Pipeline

Goの並行パターン:パイプラインとキャンセル (Go Concurrency Patterns: Pipelines and cancellation)

Goの並行パターン:パイプラインとキャンセル #

Go Concurrency Patterns: Pipelines and cancellation by Sameer Ajmani

はじめに #

Goの並行性に関する基本要素によって、I/Oや複数のCPIを効率的に使うことができるストリーミングデータパイプラインを 簡単に構築することができます。この記事ではそのようなパイプラインの例を紹介し、操作が失敗したときに発生する 繊細な事柄にハイライトを当て、また失敗に綺麗に対応するテクニックを紹介します。

パイプラインとはなにか #

Goにおいて、パイプラインの厳密な定義はありません。パイプラインは数ある並行プログラミングの種類の一つに過ぎません。 正式な定義ではないですが、パイプラインとはチャンネルによって接続された一連の ステージ を挿します。 そこでは、各ステージでは同じ関数を実行するゴルーチンのまとまりになっています。 各ステージではゴルーチンは次の役割を果たします。

  • 上流 から 流入 チャンネル経由で値を受け取る
  • そのデータに対してある関数を実行し、通常は新しい値を生成する
  • 下流流出 チャンネル経由で値を送信する

各ステージでは、任意の数の流入と流出のチャンネルを持っています。ただし最初と最後のステージは例外で、 それぞれ流出と流入のチャンネルのみが存在します。最初のステージは時々 ソース あるいは プロデューサー と呼ばれ、 最後のステージは シンク あるいは コンシューマー と呼ばれます。

パイプラインの考え方とそのテクニックを説明するために単純なパイプラインの例から始めてみましょう。 あとでより現実的な例を紹介します。

数字を平方する #

3つのステージからなるパイプラインを考えてみましょう。

最初のステージ gen は整数のリストからチャンネルに変換する関数です。このチャンネルがリスト内の整数を出すことになります。 gen 関数は整数をチャンネルに送信するゴルーチンを起動し、すべての値が送信されたらチャンネルを閉じます。

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

2番めのステージは sq で、チャンネルから整数を受信して、受信した整数それぞれの平方を出すチャンネルを返します。 流入のチャンネルが閉じて、すべての値を下流に送った後に、流出のチャンネルを閉じます。

...