Concurrency

Goの並行パターン:コンテキスト (Go Concurrency Pattern: Context)

Goの並行パターン:コンテキスト #

Go Concurrency Pattern: Context by Sameer Ajmani

はじめに #

Goで書かれたサーバでは、サーバに来たリクエストはそれぞれそれ自身のゴルーチンで処理されます。 リクエストハンドラはしばしばデータベースやRPCサービスといったバックエンドにアクセスするために追加のゴルーチンを起動します。 リクエストの処理を行っているゴルーチンは、通常エンドユーザのアイデンティティや認可トークン、リクエストの期限などリクエスト固有の値へのアクセス権が必要です。 リクエストがキャンセルされたりタイムアウトした場合には、それらのゴルーチンが使っていたリソースをシステムが再度要求することができるように、そのリクエストの処理を行っているすべてのゴルーチンは素早く終了すべきです。

Googleで私たちは、APIの境界をまたぐリクエスト固有の値やキャンセルのシグナル、期限などを、 あるリクエストの処理に関与するすべてのゴルーチンに投げることを容易にする、 context パッケージというパッケージを開発しました。 パッケージは golang.org/x/net/context に公開されています。 1 この記事ではそのパッケージの使い方と実際に動作する例を紹介したいと思います。

コンテキスト(Context) #

context パッケージの核となっているのは Context 型です。

// ContextはAPIの境界を越えて期限とキャンセルシグナルとリクエスト固有の値を保持します。
// メソッドは複数のゴルーチンから同時に呼び出されても安全です。
type Context interface {
    // Doneはこのコンテキストがキャンセルされたりタイムアウトした場合にcloseされます。
    Done() <-chan struct{}

    // ErrはDoneチャンネルが閉じた後なぜこのコンテキストがキャンセルされたかを知らせます。
    Err() error

    // Deadlineは設定されている場合にはいつこのContextがキャンセルされるかを返します。
    Deadline() (deadline time.Time, ok bool)

    // Valueはkeyに紐付いた値を返し、設定がない場合はnilを返します。
    Value(key interface{}) interface{}
}

(この説明は要約されたもので、 godoc が正式なものです。)

Done メソッドは、 Context の代わりに動作する関数に対するキャンセルシグナルとして振る舞うチャンネルを返します。チャンネルが閉じられたときに、関数は処理を中断して戻るべきです。 Err メソッドはなぜその Context がキャンセルされたかを示すエラーを返します。 パイプラインとキャンセル の記事では Done チャンネルのイディオムについてより詳細に議論しています。

Context には Done チャンネルが受信専用であるのと同様の理由で Cancel メソッドがあり ません 。 キャンセルシグナルを受け取る関数は通常シグナルを送る関数ではありません。 特に、親の操作が子の操作を行うためのゴルーチンを起動したときに、それらの子の操作が親の操作をキャンセルできるべきではありません。 代わりに WithCancel 関数(あとで説明します)で新しい Context の値をキャンセルする方法を提供します。

...

Goの並行パターン:パイプラインとキャンセル (Go Concurrency Patterns: Pipelines and cancellation)

Goの並行パターン:パイプラインとキャンセル #

Go Concurrency Patterns: Pipelines and cancellation by Sameer Ajmani

はじめに #

Goの並行性に関する基本要素によって、I/Oや複数のCPIを効率的に使うことができるストリーミングデータパイプラインを 簡単に構築することができます。この記事ではそのようなパイプラインの例を紹介し、操作が失敗したときに発生する 繊細な事柄にハイライトを当て、また失敗に綺麗に対応するテクニックを紹介します。

パイプラインとはなにか #

Goにおいて、パイプラインの厳密な定義はありません。パイプラインは数ある並行プログラミングの種類の一つに過ぎません。 正式な定義ではないですが、パイプラインとはチャンネルによって接続された一連の ステージ を挿します。 そこでは、各ステージでは同じ関数を実行するゴルーチンのまとまりになっています。 各ステージではゴルーチンは次の役割を果たします。

  • 上流 から 流入 チャンネル経由で値を受け取る
  • そのデータに対してある関数を実行し、通常は新しい値を生成する
  • 下流流出 チャンネル経由で値を送信する

各ステージでは、任意の数の流入と流出のチャンネルを持っています。ただし最初と最後のステージは例外で、 それぞれ流出と流入のチャンネルのみが存在します。最初のステージは時々 ソース あるいは プロデューサー と呼ばれ、 最後のステージは シンク あるいは コンシューマー と呼ばれます。

パイプラインの考え方とそのテクニックを説明するために単純なパイプラインの例から始めてみましょう。 あとでより現実的な例を紹介します。

数字を平方する #

3つのステージからなるパイプラインを考えてみましょう。

最初のステージ gen は整数のリストからチャンネルに変換する関数です。このチャンネルがリスト内の整数を出すことになります。 gen 関数は整数をチャンネルに送信するゴルーチンを起動し、すべての値が送信されたらチャンネルを閉じます。

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

2番めのステージは sq で、チャンネルから整数を受信して、受信した整数それぞれの平方を出すチャンネルを返します。 流入のチャンネルが閉じて、すべての値を下流に送った後に、流出のチャンネルを閉じます。

...

Goの並行パターン:タイムアウトと進行(Go Concurrency Patterns: Timing out, moving on)

Goの並行パターン:タイムアウトと進行 #

Go Concurrency Patterns: Timing out, moving on by Andrew Gerrand

並行プログラミングにはイディオムがあります。良い例はタイムアウトです。 Goのチャンネルではタイムアウトを直接はサポートしていませんが、その実装は容易です。 たとえば、ch チャンネルから値を受信したいけれど、1秒以上は待ちたくないという状況を考えてみましょう。 まずシグナル用のチャンネルを作り、そのチャンネルに送信する前に1秒待つゴルーチンを起動します。

timeout := make(chan bool, 1)
go func() {
    time.Sleep(1 * time.Second)
    timeout <- true
}()

その後、select構文を使って chtimeout を待つようにします。 もし1秒待っても ch から何も来なければ、 timeout のケースが選択され、chからの読み込みは破棄されます。

select {
case <-ch:
    // chから読み込む
case <-timeout:
    // chからの読み込みはタイムアウト
}

timeout チャンネルは1つの値をバッファし、タイムアウトのゴルーチンがそのチャンネルに値を送り、終了できるようになっています。 このゴルーチンは、chから値が受け取られたかを知りません。(あるいは気にしていません) つまりこのゴルーチンはchからの読み込みがタイムアウトより前に起こったとしても、永遠には存在しえません。 timeout チャンネルは最終的にガベージコレクタによって回収されます。

(この例ではゴルーチンとチャンネルの機構をデモするために time.Sleep を使いました。 実際のプログラムでは time.After という、チャンネルを返し、決まった時間のあとにそのチャンネルに値を送る関数を使うべきでしょう。)

このパターンの他の例を見てみましょう。この例では複数のレプリケーションされたデータベースから同時に読み込むプログラムを扱っています。 このプログラムでは、値は1つだけ必要で最初に来た値だけを取得すべきです。

Query 関数はデータベース接続のスライスと問い合わせの文字列を引数に取ります。 この関数は各データベースに並列に問い合わせ、最初に受信した結果を返します。

func Query(conns []Conn, query string) Result {
    ch := make(chan Result, 1)
    for _, conn := range conns {
        go func(c Conn) {
            select {
            case ch <- c.DoQuery(query):
            default:
            }
        }(conn)
    }
    return <-ch
}

この例では、クロージャーがノンブロッキングに送信します。これは、 default ケース付きの select 構文内の送信操作を使うことで実現しています。 もし送信が出来なければ、直ちに default ケースが選択されます。 送信をノンブロッキングにすることで、ループ内で立ち上げられたゴルーチンが1つも無駄に生存しないことが保証されます。 しかしながら、親の関数が値を受信する前に結果が来れば、チャンネルのバッファの準備ができていないため送信は失敗する可能性があります。

...