このブログの更新は Twitterアカウント @m_hiyama で通知されます。
Follow @m_hiyama

メールでのご連絡は hiyama{at}chimaira{dot}org まで。

はじめてのメールはスパムと判定されることがあります。最初は、信頼されているドメインから差し障りのない文面を送っていただけると、スパムと判定されにくいと思います。

参照用 記事

Java BlockingQueueで遊ぶ:パイプラインごっこ

java.util.concurrentパッケージにBlockingQueue<E>(Eは型パラメータ)というクラスがあるんですね。おー、これは便利。

BlockingQueueはキュー(先入れ先出しバッファ)ですが、バッファが空のときや満杯のときはスレッドを停止して待たせること(ブロッキング)を自動的にやってくれます。ということは、Unixのパイプと同じ機能を持ちますね。

僕は、stdinとstdout、getcharとputcharを使うプログラムがとっても気にいっていたので、BlockingQueueで再現してみます。

まずは、1文字単位入出力のインターフェース:

// InOut.java
interface In {
  int getc(); // -1 means EOF
}

interface Out {
  void putc(char c);
  void end();
}

Outにあるend()ってメソッドが気になるかもしれませんが、後で説明します。

次にパイプです。

// Pipe.java
import java.util.concurrent.*;

class PipeException extends RuntimeException {
  public PipeException(String msg) {
    super(msg);
  }
}

class Pipe implements In, Out {
  private BlockingQueue&lt;Integer> queue;

  public Pipe() {
    queue = new ArrayBlockingQueue&lt;Integer>(8);
  }

  public int getc(){
    Integer c;
    try {
      c = queue.take();
    } catch (InterruptedException e) {
      throw new PipeException(e.getMessage());
    }
    if (c == null) throw new PipeException("Can not get");
    return c.intValue();
  }

  public void putc(char c) {
    try {
      queue.put((int)c);
    } catch (InterruptedException e) {
      throw new PipeException(e.getMessage());
    }
  }

  public void end() {
    try {
      queue.put(-1);
    } catch (InterruptedException e) {
      throw new PipeException(e.getMessage());
    }
  }
}

件のBlockingQueueを使って整数キューを作りました。データは文字ですが、EndOfFileを表す-1を受け入れる必要があるので整数です。

さてこの EOF=-1 ですが、ファイルの終わりにホントに入っているわけではありません。また、プロセスが終了したときにEOFを出力するわけでもありません。単なる信号です。Unixの場合は、ファイルIOやプロセス管理のメカニズムがうまいことEOFを作り出してくれますが、それをシミュレートするのが面倒だったので、インターフェースOutにend()ってメソッドを入れました。end()を呼ぶことはEOFを出力することになります。

パイプでつなぐフィルターを2つ作っておきます。

// Filter.java
abstract class Filter implements Runnable {
  protected In in;
  protected Out out;

  public Filter(In in, Out out) {
    this.in = in;
    this.out = out;
  }

  public abstract void run(); // runnable

  public void start() {
    (new Thread(this)).start();
  }
}

class Tolower extends Filter {
  public Tolower(In in, Out out) {
    super(in, out);
  }

  public void run() {
    int c;
    while ((c = in.getc()) != -1) {
      out.putc(Character.toLowerCase((char)c));
    }
    out.end();
  }
}

class Word extends Filter {
  public Word(In in, Out out) {
    super(in, out);
  }

  public void run() {
    boolean inWord = false;
    int c;
    while ((c = in.getc()) != -1) {
      if (Character.isLetterOrDigit((char)c)) {
        inWord = true;
        out.putc((char)c);
      } else {
        if (inWord) {
          out.putc('\n');
        }
        inWord = false;
      }
    }
    out.end();
  }
}

フィルターを書いたことがある人はなつかしい感じがするでしょ :-) Unixのプロセスによりフィルターを実現するときとほとんど同じですが、先ほど説明したend()の呼び出しが加わっています。これは、パイプの実装を手抜きしたペナルティとしてのオマジナイです。

データの湧きだし元(ソース)と吐き出し口(シンク)も作っておきます。それぞれ、ファイルとコンソール(本物のstdout)です。

// Device.java
import java.io.*;

class FileIn implements In {
  private Reader reader;

  public FileIn(String filename) throws FileNotFoundException {
    reader = new FileReader(filename);
  }
  
  public int getc() {
    try {
      return reader.read();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}

class StdOut implements Out {
  public void putc(char c) {
    System.out.print(c);
  }

  public void end() {
    System.out.flush();
  }
}

これで準備は終わりです。試しに、

  • word < file.txt | tolower

というコマンドラインと同じことをするプログラムを作ってみましょう。[追記](下のアーカイブ内のソースでは、PipelineTest→PipelineDemoとリネームしてます。)[/追記]

// PipelineTest.java
import java.io.*;

class PipelineTest {
  public static void main(String[] args) {
    if (args.length < 1) {
      System.err.println("No args.");
      System.exit(1);
    }
    String filename = args[0];

    FileIn fileIn = null;
    try {
      fileIn = new FileIn(filename);
    } catch (FileNotFoundException e) {
      System.err.println("File not found.");
      System.exit(1);
    }
    StdOut stdOut = new StdOut();
    Pipe pipe = new Pipe();
    Filter word = new Word(fileIn, pipe);
    Filter tolower = new Tolower(pipe, stdOut);
    
    word.start();
    tolower.start();
  }
}

適当なテキストファイルを引数にして java -cp . PipelineTest file.txt を実行すると、テキストファイル中の語(英数字Unicodeでletterと呼ばれている文字と数字の並びに過ぎません)が、英字はすべて小文字にして列挙されます。

パイプラインは、Producer-Consumerパターンと呼ばれる並列デザインパターンの典型例であり、BlockingQueueはProducer-Consumerパターンを直接的にサポートします。BlockingQueueにより、他の並列デザインパターンも実現できますが、それはまたにしましょう。

続きがあります。→Java BlockingQueueでもっと遊ぶ:変形パイプラインから圏論へ