26. バケツいっぱいのソケット

../_images/bucket.png

これまで、Erlang自身で面白いことをしてきましたが、かろうじて外の世界とのやり取りを、あちこちのテキストファイルの読み書きだけでは行なっていました。 ローカルでのやり取りも面白いですが、そろそろ外へ飛び出して、外の世界とのやり取りを始めてみましょう。

この章では、ソケットを使った3つのコンポーネントを扱います。IOリスト、UDPソケット、TCPソケットの3つです。 IOリストは話題としては極端に複雑なものではありません。単純に、ソケットや他のErlangドライバ越しに送る文字列を効率的に作る賢い方法というだけです。

26.1. IOリスト

このガイドの始めの方で、テキストを扱う際には、文字列(整数のリスト)かバイナリ(データを保持しているバイナリデータ)のどちらも使えるという話をしました。 たとえば「Hello World」という文字列をネットワーク越しに送るときに "Hello World" という文字列としても送れますし、 <<"Hello World">> というバイナリとしても送れます。 記法も似ていますし、結果も同様になります。

2者の違いは、どのように組み立てるかというところにあります。 文字列は、整数の単方向リストのような形になっています。各文字は、文字自身と、リストの残りの部分へのリンクを保存しなければいけません。 その上、たとえば真ん中でも終端でも、リストに要素を追加したくなったら、修正をする場所までリスト全体を走査して、それから要素を追加します。 しかし、これは先頭に文字列を追加する場合には当てはまりません:

A = [a]
B = [b|A] = [b,a]
C = [c|B] = [c,b,a]

上に示したように、先頭に追加する場合は、 ABC に何が入っていても、再書き込みが必要ありません。 C の表現としては [c,b,a] でも [c|B] でも [c,|[b|[a]]] のどれでも可能です。 最後の表現では、 A の形は宣言された時と同じであるとわかるでしょう。 B についても同様です。 追加の場合に関する例は次のとおりです:

A = [a]
B = A ++ [b] = [a] ++ [b] = [a|[b]]
C = B ++ [c] = [a|[b]] ++ [c] = [a|[b|[c]]]

再書き込みしているのが分かりましたか。 B を作成するときに、 A を再書き込みしなければいけません。 C を作成するときは、 B を再書き込みしなければいけません。( B が含んでいる [a|...] に関する再書き込みも含みます) 同様の方法で D を追加しようとすると、 C を再書き込みする必要があるでしょう。 長い文字列に関しては、これはまったく非効率な方法で、Erlang VMによって掃除されるたくさんのゴミを生成してしまいます。

バイナリに関しては、まったく問題はないです:

A = <<"a">>
B = <<A/binary, "b">> = <<"ab">>
C = <<B/binary, "c">> = <<"abc">>

この場合、バイナリは自分の長さを知っていて、データは一定時間で結合することができます。 いいですね、リストよりもずっと良いです。 また、バイナリはリストに比べて小さくまとまっています。 これらの理由から、ここから先はテキストを扱う際にはなるべくバイナリを使うことにします。

一方で、いくつか欠点もあります。 バイナリは、特定の方法で処理されることを想定していて、バイナリを修正したり、分割したり、といった操作は依然としてコストが掛かります。 さらに、時々文字列とバイナリと個々の文字をお互い変換しながら使うコードを扱うこともあるでしょう。 絶え間なく、型をまたいで変換することは面倒です。

これらの例では、 IOリスト が救世主となります。 IOリストは奇妙なデータ構造の型です。 IOリストは、バイト(0から255の整数)、バイナリ、他のIOリストのいずれかのリストになります。 これはつまり、IOリストを受け取る関数は、 [$H, $e, [$l, <<"lo">>, " "], [[["W","o"], <<"rl">>]] | [<<"d">>]] というようなものも受け取れるということです。 これを受け取るとき、Erlang VMはリストを、 Hello World という文字の並びを取得するために必要な作業として、flattenします。

このようなIOリストを受け取る関数とはどのような関数でしょうか。 データを出力しなければいけない関数はたいていそれにあたります。 io モジュールや file 、TCPやUDPソケットにある関数はどんな関数でもIOリストを扱うことができます。 2つ3つ例を挙げると、 unicode モジュールのうちのいくつかや re (正規表現)モジュールのすべての関数、といったライブラリ関数もIOリストを扱えます。

先のHello World IOリストを、シェルで io:format("~s~n", [IoList]) に渡してみましょう。 問題なく動作すると思います。

../_images/brotocol.png

全体としてみると、IOリストは、出力する文字列を動的に生成するときに、イミュータブルなデータ構造に関する問題を避けつつ生成できる非常に賢い方法です。

26.2. TCPとUDP: 兄弟プロトコル

Erlangで使える最初のソケットとしてはUDPプロトコルに基づいたものがあります。 UDPはIP層の上に作られたプロトコルで、IP層の上にポート番号といったいくつかの抽象化がされています。 UDPはステートレスプロトコルと呼ばれています。 UDPポートから取得されたデータは細かく分けられ、タグが外され、セッションはなく、あなたが受信した断片は送られたときと同じ順番で届いているか保証はないのです。 事実、誰かがパケットを送信して、それをあなたが受信するかどうかもまったく保証されないのです。 これらの理由から、UDPはパケットが小さく、失っても影響が小さく、複雑なやり取りがあまりない場合、あるいは待ち時間が短くなる必要がある場合に使われる傾向があります。

これは、TCPのようなステートフルプロトコルとは逆です。ステートフルプロトコルというのは、パケットの喪失や、パケットの順序の並び替え、複数の送信者と受信者間の独立したセッションの管理などを行うプロトコルです。 TCPは信頼がおける情報交換方法ですが、遅くなったり、立ち上げに時間が掛かるというリスクがあります。 UCPは速いですが、TCPほど信頼が置けません。 必要に応じて、注意して選択してください。

とにかく、ErlangでUDPを扱うのは比較的簡単です。 与えられたポート越しにソケットを用意して、そのソケットがデータの送受信の両方を行えます:

../_images/udp-ports.png

あまり良くないアナロジーを使うと、UDPは家に大量の郵便受けがあり(各郵便受けがポートです)、メッセージがちょっとだけ書かれた紙切れを各郵便受けで受け取るようなものです。 ここでいうメッセージには「あなたがこのパンツが似合いますね」から「これは家の中にあった紙切れです!」まで、どんな内容のものも含まれます。 いくつかのメッセージが一片の紙切れに書くには長すぎるときは、たくさんの紙切れとなって郵便受けに入れられています。 それを意味が通るように並び替えて、誰かの家まで運転していって、返信として紙切れを郵便受けに入れるのはすべてあなたの仕事です。 メッセージが純粋に有益なもの(「もしもし、ドアの鍵が開いてますよ」)であったり、とても短いもの(「何着てるの?Ronより」)だった場合には問題がなく、すべてのやり取りは1つの郵便受けで事足ります。 しかし、メッセージが複雑だった場合、セッションごとに異なるポートを使いたくなるのではないでしょうか。 UDPではだめだ!TCPを使いましょう!

TCPの場合は、プロトコルはステートフルで、接続ベースです。 メッセージを送信できるようになる前に、ハンドシェイクを行わなければいけません。 つまり、誰かが郵便受けを取得して(UDPでのアナロジーで使っていたものと同様)、「よう、こちらIPアドレス 94.25.12.37だけど、ちょっと話さない?」というようなメッセージを送信します。これに対して、あなたが「いいよ、メッセージにN番とタグと付けて、そこから番号を増やしていこう」というような具合で返信します。 その時点から、あなたとIPアドレス 94.25.12.37がお互いにやり取りをする間は、紙切れを順番に並べ、無くなっているメッセージを相手にもう一回送るように伝え、それらのメッセージに返信し、などを有意義にやりとりすることができます。

この方法では、1つの郵便受け(つまりポート)を使ってすべてのやり取りを問題なく行うことができます。 これがTCPの良いところです。 多少のオーバーヘッドは掛かりますが、すべてが順序良く、正しく配信されるようにしてくれます。

このようなアナロジーが好きでなくとも、残念がる必要はありません。なぜなら、いまから、本題に入って、TCPとUDPの各ソケットのErlangでの使い方を見ていくからです。 やり方は単純です。

26.3. UDPソケット

UDPには基本的な操作が少ししかありません。ソケットを用意する、メッセージを送信する、メッセージを受け取る、接続を閉じる、だけです。 起こりうる事象は次の図で表せるでしょう:

../_images/udp.png

何を差し置いても、まず最初の操作はソケットを開くことです。 これは gen_udp:open/1-2 を呼び出すことで行えます。 最も簡素な形式は {ok, Socket} = gen_udp:open(PortNumber) です。

ポート番号は1から65535までの整数です。 0から1023はwell-knownポートとして知られています。 たいていの場合、あなたが使っているOSでは、管理者権限がなければwell-knownポートで待ち受けることはできないでしょう。 1024から49151までのポートは予約済み(registered)ポートです。 これらは通常は特別な権限なしに自由に使えますが、いくつかはよく知られたサービスで使われているポートです。 残りのポートは動的ポートあるいはプライベートポートと呼ばれています。 これらはよく一時的なポートとして使われます。 私たちのテストでは、8789のような、いくらか安全で、あまり使われることのないポート番号を使いましょう。

先に進む前に、 gen_udp:open/2 についてまだ終わってませんでした。 この2番目の引数はオプションのリストで、どのような型のデータ( list または binary )を受信したいか、あるいは、どのような形で受信したいか、たとえばメッセージ( {active, true} )あるいは関数呼び出しの結果( {active, false} )などを指定します。 さらに、ソケットをIPv4( inet4 )とIPv6( inet6 )のどちらにするか、UDPソケットがブロードキャストに使えるようにするか( {broadcast, true | false} )、バッファのサイズなどに関してもオプションがあります。 もっと多くのオプションがありますが、とりあえずいまは単純なものだけを扱って、残りを学ぶかどうかについては読者のみなさんにお任せしましょう。 UDPの機能についてはすぐに複雑な話題になりやすいですし、あいにく本書はErlangについての本であってTCPやUDPの本ではありませんからね。

それでは、ソケットを開いてみましょう。まずはErlangシェルを起動します:

1> {ok, Socket} = gen_udp:open(8789, [binary, {active,true}]).
{ok,#Port<0.676>}
2> gen_udp:open(8789, [binary, {active,true}]).
{error,eaddrinuse}

最初のコマンドでソケットを開き、バイナリデータを返すように指示したあと、アクティブであるように指定しました。 #Port<0.676> という新しいデータ構造が返されていることがお分かりですか。 これは、たったいま開いたソケットを表しています。 これはPidとほぼ同様の使い方をすることができます。つまり、クラッシュした際に失敗がソケットに伝搬するようにリンクを設定することすらできるのです! 2つ目の関数呼び出しでは、同じソケットを再度開こうとしていますが、これはできません。 それが {error, eaddrinuse} が戻されている理由です。 幸いにも、最初のソケット Socket はまだ開いています。

さて、2つ目のErlangシェルを開きましょう。 こちらのシェルでは2つ目のUDPソケットを、異なるポート番号で開きます:

1> {ok, Socket} = gen_udp:open(8790).
{ok,#Port<0.587>}
2> gen_udp:send(Socket, {127,0,0,1}, 8789, "hey there!").
ok

おや、新しい関数です! 2つ目の関数呼び出しで、メッセージの送信に gen_udp:send/4 という関数が使われています。(なんと機能がわかりやすい関数名でしょう) この関数の引数は、左から順番に、 gen_udp:send(OwnSocket, RemoteAddress, RemotePort, Message) となります。 RemoteAddress は文字列、ドメイン名を含むアトム(”example.org”)、IPv4アドレスを表す4要素のタプル、IPv6アドレスを表す8要素のタプルのいずれかです。 その後、受信者のポート番号を指定します。(どの郵便受けに私たちの紙切れを入れるかということです) そして、メッセージは、文字列、バイナリ、IOリストのいずれかとなります。

メッセージは送信されたのでしょうか。 最初のErlangシェルに戻って、データをフラッシュしてみましょう:

3> flush().
Shell got {udp,#Port<0.676>,{127,0,0,1},8790,<<"hey there!">>}
ok

すばらしい。 ソケットを開いたプロセスはメッセージを {udp, Socket, FromIp, FromPort, Message} の形式で受信します。 これらのフィールドを使って、メッセージがどこから来て、どのソケットを通ったか、そしてどんなメッセージだったのかを知ることができます。 ここまでで、ソケットを開いて、データを送信して、そのデータをアクティブモードモードで受信しました。 パッシブモードではどうなるでしょうか。 これを調べるためには、最初のErlangシェルでソケットを閉じて、新しいソケットを開きましょう:

4> gen_udp:close(Socket).
ok
5> f(Socket).
ok
6> {ok, Socket} = gen_udp:open(8789, [binary, {active,false}]).
{ok,#Port<0.683>}

ここで、ソケットを閉じ、 Socket 変数の束縛を解き、それから再度ソケットを開いて Socket 変数に束縛しました。今度はパッシブモードです。 再びメッセージを送る前に、次のコマンドを試してみましょう:

7> gen_udp:recv(Socket, 0).

あなたのErlangシェルは固まってしまうと思います。 ここで使った関数は recv/2 です。 この関数は、パッシブソケットをポーリングして、メッセージを待つときに使われます。 ここで 0 は欲しいメッセージの長さを表しています。 面白いことに、この長さは gen_udp に完全に無視されます。 gen_tcp にも同様の関数がありますが、こちらでは意味があります。 とにかく、私たちがメッセージを送らなければ、 recv/2 は二度と戻って来ません。 2つ目のErlangシェルに戻って、新しいメッセージを送ってみましょう:

3> gen_udp:send(Socket, {127,0,0,1}, 8789, "hey there!").
ok

最初のErlangシェルに {ok,{{127,0,0,1},8790,<<"hey there!">>}} と戻り値として表示されたことでしょう。 待ち時間を無限にしたくない場合はどうしたらいいでしょうか。 タイムアウト値を設定するだけです:

8> gen_udp:recv(Socket, 0, 2000).
{error,timeout}

これでUDPに関してはほとんどおしまいです。本当ですよ!

26.4. TCPソケット

TCPソケットはインターフェースこそUDPソケットと大部分が同じですが、その動作が決定的に異なる部分があります。 最大の違いは、クライアントとサーバがまったく異なるものである、ということです。 クライアントは次のような振る舞いをします:

../_images/tcp-client.png

一方で、サーバは次のようなスキームに従います:

../_images/tcp-server.png

ぱっと見おかしくないですか。 クライアントの動作は gen_udp で見たものと似ています。ポートに接続して、送信して受信して、終わりです。 しかし、サーバの場合、1つ新しいモードが加わりました。LISTENです。 これはTCPがセッションを準備するやり方に起因しています。

まず最初に、新しいErlangシェルを立ちあげて、listenソケットと呼ばれるものを gen_tcp:listen(Port, Options) で開きます:

1> {ok, ListenSocket} = gen_tcp:listen(8091, [{active,true}, binary]).
{ok,#Port<0.661>}

listenソケットは、接続要求を待つ責任しかありません。 私が gen_udp では同じようなオプションを使っていたのを見たことでしょう。 あれは、たいていのオプションがすべてのIPソケットに対して同様になるからです。 TCPの場合は、さらに特定用途のオプションがあります。たとえば接続バックログ( {backlog, N} )、keepaliveソケット( {keepalive, true | false} )、パケットのパッケージング( {packet, N} 、ここで N は各パケットヘッダが切り取られ、パースされる長さです)などがあります。

一旦、listenソケットが開くと、あらゆる(2つ以上の)プロセスがlistenソケットを取得し、’accepting’状態になり、あるクライアントがそれに話しかけるまでロックされます:

2> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket, 2000).
** exception error: no match of right hand side value {error,timeout}
3> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket).
** exception error: no match of right hand side value {error,closed}

ちくしょう。 タイムアウトしてクラッシュしました。 listenソケットは、それが紐付くシェルプロセスが落ちた時に閉じてしまいました。 もう一度試してみましょう。今度はタイムアウトは2秒(2000ミリ秒)です:

4> f().
ok
5> {ok, ListenSocket} = gen_tcp:listen(8091, [{active, true}, binary]).
{ok,#Port<0.728>}
6> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket).

今度はプロセスがロックされました。 いいですね! 2つ目のシェルを開いてみましょう:

1> {ok, Socket} = gen_tcp:connect({127,0,0,1}, 8091, [binary, {active,true}]).
{ok,#Port<0.596>}

こちらのソケットはいつもどおりのオプションで、タイムアウトを無制限にしたくなければ、最後に Timeout という引数を追加することもできます。 最初のシェルに戻ってみると、 {ok, SocketNumber} という戻り値を確認できることでしょう。 その時点から、acceptソケットとクライアントソケットは、gen_udpと同様に、1対1のやり取りができるようになります。 2つ目のシェルで、最初のシェルにメッセージを送ってみましょう:

3> gen_tcp:send(Socket, "Hey there first shell!").
ok

そして最初のシェルで次のコマンドを実行します:

7> flush().
Shell got {tcp,#Port<0.729>,<<"Hey there first shell!">>}
ok

両ソケットともに同様にメッセージを送信でき、 gen_tcp:close(Socket) で閉じることができます。 acceptソケットを閉じるときはソケットだけを閉じ、listenソケットを閉じる場合は関連するacceptソケットもすべて閉じます。

これでErlangでのTCPソケットに関する話はおしまいです! でも本当におしまいなのでしょうか。

ええ、もちろん、これ以上お話できることはあります。 あなたが自分でソケットを操作する経験を積んでいくと、ソケットの所有権のようなものがあることに気がつくことでしょう。

これはつまり、UDPソケット、TCPクライアントソケット、TCP acceptソケットのすべてが、存在しているあらゆるプロセスからメッセージを取得することができますが、取得したメッセージはソケットを起動したプロセスのみが読み取ることができます:

../_images/socket-owner.png

これはあまり実用的ではありませんね。 この状況では、たとえ私たちの要件にないものだとしても、メッセージを渡すために所有者プロセスを常に生かしておかなければいけないということです。 次のようなことができたら素敵ではないですか。

1. プロセスAがソケットを起動する
2. プロセスAがリクエストを送信する
3. プロセスAがプロセスBを起動し、
   ソケットを起動する
4a. ソケットの所有権を              4b. プロセスBがリクエストを処理する
    プロセスBに与える
5a. プロセスAがリクエストを送る     5b. プロセスBがリクエストを処理しつつける
6a. プロセスAがプロセスCを生成し、  6b. ...
    ソケットを起動する
    ...

ここで、 A は大量のクエリを捌く担当となり、新しく生成された各プロセスはリプライ、その処理などの担当となります。 このように A が新しいプロセスにタスクを処理する権限を移譲するというのが賢いでしょう。 ここで手が込んでいる部分は、ソケットに所有権を譲る部分です。

ここに技があります。 gen_tcpとgen_udpは共に controlling_process(Socket, Pid) と呼ばれる関数を持っています。 この関数は、現在のソケットの所有者が呼ばなくてはいけません。 これで、そのプロセスはErlangに「わかる?この Pid に僕のソケットを譲るよ。もう僕には無理だ。」ということを伝えます。 このときから、その関数内の Pid がソケットからメッセージを受け取って読み取れるプロセスとなります。 以上です。

26.5. inetでもっと制御する

さて、これでソケットを開き方、ソケットへのメッセージの送り方、所有権の変更の仕方などを理解しました。 また、パッシブモード、アクティブモードの両方でのメッセージの待ち受け方も理解しました。 UDPの例に戻って、アクティブモードからパッシブモードに切り替えようとしたときに、私はソケットを再起動して、変数をフラッシュしました。 これはあまり実用的ではなく、特にTCPを使って同様のことを行いたいときには、生きているセッションを切らなければいけないので、使い物になりません。

幸いにも、 inet というモジュールがあり、これはgen_tcpとgen_udpのソケットの両方で共有されたすべての操作を担当しています。 今挙げた、アクティブモードとパッシブモードの切り替えに関する問題に対しては、 inet:setopts(Socket, Options) という名前の関数があります。 オプションのリストには、ソケットを用意する上で必要などのような項でも含めることができます。

Note

注意して下さい! inet という名前のモジュールと inets という名前のモジュールがあります。 ここで私たちが使いたいモジュールは inet です。 inets はたくさんの事前定義のサービスやサーバ(FTP、トリビアルFTP(TFTP)、HTTPなど)を含んだOTPアプリケーションです。

これらの違いは、 inetsinet 上に作られたサービスだ、と覚えるか、あるいは inet + s(ervices) と覚えれば理解しやすいです。

Erlangシェルを起動してTCPサーバとします:

1> {ok, Listen} = gen_tcp:listen(8088, [{active,false}]).
{ok,#Port<0.597>}
2> {ok, Accept} = gen_tcp:accept(Listen).

そして2つ目のErlangシェルでは次のコマンドを実行します:

1> {ok, Socket} = gen_tcp:connect({127,0,0,1}, 8088, []).
{ok,#Port<0.596>}
2> gen_tcp:send(Socket, "hey there").
ok

最初のシェルに戻ると、ソケットは受け入れられているはずです。 何か受け取ったかを確認するためにフラッシュします:

3> flush().
ok

もちろん、受け取っていません。 いまはパッシブモードになっているからです。 これを修正してみましょう:

4> inet:setopts(Accept, [{active, true}]).
ok
5> flush().
Shell got {tcp,#Port<0.598>,"hey there"}
ok

やりました! アクティブソケットとパッシブソケットに対してすべての制御を行えるようになりました。力を手中に収めました。 どのようにアクティブモードとパッシブモードを使い分けたらいいのでしょうか。

../_images/stop2.png

多くの懸念点があります。 一般的に、メッセージをすぐにでも受け取りたいのであれば、パッシブモードのほうがずっと速いです。 Erlangでは処理をおこなうためにプロセスのメールボックスを無駄に使う必要はなく、そのメールボックスの中身を確認したり、メッセージを取得することなども必要ありません。 recv を使うとより効率的です。 しかしながら、 recv はプロセスをイベント駆動なものからアクティブポーリングへと変更します。あるソケットと他のErlangコードの間の仲介役を書かなければならない場合、このことによって状況が少々複雑になります。

仲介役のコードを書く場合には、アクティブモードに切り替えるというのはいい方法かもしれません。 パケットがメッセージとして送られた場合、受信状態(つまりgen_serverの handle_info 関数)で待って、メッセージを処理しなければなりません。 スピードとは別に、この方法の欠点は、アクセス数制限です。

外部からくるすべてのパケットが闇雲にErlang VMに受け入れられ、メッセージに変換される場合、VMの外部からメモリを溢れさせて、VMを殺すことが容易に可能となってしまうため、そのような制限が設けられました。 パッシブモードには、メッセージがいつどのようにErlang VMに取り込まれ、ブロックし、キューにため、メッセージを下位層の実装に渡すというタスクを委譲するか、を制限できる利点があります。

もしアクテイブモードにセマンティクスが必要で、パッシブモードに安全性が必要だとしたらどうでしょうか。 inet:setupts/2 を使ってパッシブモードとアクテイブモードを素早く切り替えることはできますが、競合条件になる危険がかなりあります。 代わりに、 {active, once} というオプションで設定できる アクティブワンス(active once) と呼ばれるモードがあります。 これがどのような働きをするか見てみましょう。

先程のサーバ側のシェルを残しておいて下さい:

6> inet:setopts(Accept, [{active, once}]).
ok

クライアント側のシェルに移って、 send/2 を2回呼び出してみましょう:

3> gen_tcp:send(Socket, "one").
ok
4> gen_tcp:send(Socket, "two").
ok

サーバ側のシェルに戻って:

7> flush().
Shell got {tcp,#Port<0.598>,"one"}
ok
8> flush().
ok
9> inet:setopts(Accept, [{active, once}]).
ok
10> flush().
Shell got {tcp,#Port<0.598>,"two"}
ok

お分かりでしょうか。 {active, once} を2回目に訊くまで、 "two" というメッセージは変換されませんでした。これはつまり、ソケットはパッシブモードに戻っていたということです。 したがってアクティブワンスモードで、安全にアクティブモードとパッシブモードの切り替えができます。 良いセマンティクスですし、安全でもありますね。

inet には他にも素敵な関数があります。 セマンティクスを読み、現在のホストの情報を取得し、ソケットを検査するなどのことが出来ます。

これでソケットについての話はほぼおしまいです。 そろそろ実践編です。

Note

インターネットの荒野には、たくさんのプロトコルを扱うライブラリがあります。HTTP、0mq、RAW UNIXソケットなどです。 これらすべてに関するライブラリが手に入ります。 しかしながら、標準のErlangのディストリビューションでは、主な2つのオプションであるTCPソケットとUDPソケットに関するライブラリが付いてきます。 いくつかHTTPサーバやHTTPをパースするコードも付いてきますが、これらは手に入るものの中では最も効率が良いものではありません。

../_images/take-a-break3.png

26.6. Sockserv、再登場

この章ではこれ以降新しいコードはあまりご紹介しません。 代わりに、前の章のProcess Questで書いた sockserv サーバをもう一度おさらいしようと思います。 これは完全に実行可能なサーバで、どのようにしてgen_server内のOTP監視ツリーでTCP接続を捌くのかを観ていきたいと思います。

TCPサーバを素直に実装すると次のようになるでしょう:

-module(naive_tcp).
-compile(export_all).

start_server(Port) ->
    Pid = spawn_link(fun() ->
        {ok, Listen} = gen_tcp:listen(Port, [binary, {active, false}]),
        spawn(fun() -> acceptor(Listen) end),
        timer:sleep(infinity)
    end),
    {ok, Pid}.

acceptor(ListenSocket) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    spawn(fun() -> acceptor(ListenSocket) end),
    handle(Socket).

%% Echoing back whatever was obtained
handle(Socket) ->
    inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, <<"quit", _/binary>>} ->
            gen_tcp:close(Socket);
        {tcp, Socket, Msg} ->
            gen_tcp:send(Socket, Msg),
            handle(Socket)
    end.

これがどのように動作するかを理解するために、次の図が役に立つかもしれません:

../_images/sequential-server.png

start_server 関数はlistenソケットを開き、アクセプターを生成し、それを永遠にアイドル状態にさせておきます。 アイドル化は、listenソケットがそれを開いたプロセスに束縛され、プロセスが接続を処理する間は生かしておけるようにするため、必要です。 各アクセプタープロセスは受け入れる接続を待ちます。 接続が来たら、アクセプタープロセスは同様の新しいプロセスを起動して、listenソケットをそのプロセスと共有します。 新しく立ち上げたプロセスが待受処理をしている間に、アクセプタープロセスはなんらかの処理を行います。 各ハンドラは "quit" で始まるメッセージを受け取って接続が閉じられるまでは、すべてのメッセージを繰り返します。

Note

<<"quit", _/binary>> というパターンは、バイナリ文字列で始めが q, u, i, t の文字で、その後特に気にする必要がないバイナリデータ( _ )が続く、ということを意味します。

Erlangシェルで、 naive_tcp:start_server(8091) というコマンドを入力し、サーバを起動します。 それからtelnetクライアントをlocalhostで開き(念押しになりますが、telnetクライアントは技術的にはRAW TCPのためではなく、サーバをテストするために自分でクライアントを書かなくとも良いクライアントになるから使うのです)、次のような出力が見られると思います:

$ telnet localhost 8091
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hey there
hey there
that's what I asked
that's what I asked
stop repeating >:(
stop repeating >:(
quit doing that!
Connection closed by foreign host.

すばらしいですね。 Poople Inc. という社名の会社を起業して、このようなサーバが載ったソーシャルネットワークサービスをいくつか立ちあげましょう。 モジュール名がそうなっているという以外でも、これは素直な実装です。 コードは単純ですが、並行性が考慮されていません。 もしリクエストが1つ1つ来たら、この素直なサーバが無事に動作するでしょう。 しかし、もし15人が同時にサーバにアクセスしてきて待ち行列ができてしまったらどうなるでしょうか。

その時は1度に1つのリプライしかできませんし、この実装では各プロセスはまず接続を待って、準備をして、新しいアクセプターを生成しなければいけません。 待ち行列内の15番目のリクエストでは、サーバと接続できるかどうかを確認するだけでも、他の14本の接続がすべて確立されていなければなりません。 あなたがプロダクションサーバを扱っている場合、たとえば、おそらく1秒間に500から1000の要求があることでしょう。 今の状態では非実用的です。

私たちは、この直列なワークフローを変更して:

../_images/sequential-accept.png

より並行なものにする必要があります:

../_images/parallel-accept.png

すでに待機状態で待っているアクセプターをたくさん持っておけば、要求に対する応答の多くの遅延が解消できます。 ここでは、他のデモ実装を見ていくのではなく、前の章の sockserv-1.0.1 から学んでいきます。 本物のOTPコンポーネントや実世界の例に基づいたものを調べるほうがいいでしょう。 事実、socksrvの汎用形が、 cowboy (cowboyはsockservより信頼性が高いのは疑いようがないのですが)や etorrent というTorrentクライアントで使われているものと同様です。

このProcess Questのsockservを構築するために、トップダウンで実装していきます。 私たちが構成に必要なのは、多くのワーカを持ったスーパバイザです。 上図のような並行性を考えると、スーパバイザはlistenソケットを持ち、それをすべてのワーカで共有する形になります。そして各ワーカは受け入れ処理を行うのです。

すべてのワーカで物を共有できるようなスーパバイザはどのように書いたらいいのでしょうか。 通常の監視ではそれを実現する方法はありません。たとえ one_for_oneone_for_allrest_for_one の監視のどれを使っていても、すべての子プロセスは各々独立しているのです。 これを受けて反射的にグローバルな状態を持ちたくなります。単にlistenソケットを持ち、ハンドラにそれを渡すだけの登録済みプロセスです。 このような思いつきの実装を堪えて、賢くなりましょう。 フォースを使うのです。(ここでフォースとは スーパバイザの章 を読み返すことができる能力のことを指します) 2分あげますので、この問題を解決する実装方法を考えてみて下さい。(2分は自分で計ってくださいね)

秘訣は simple_one_for_one スーパバイザを使うことにあります。 simple_one_for_one スーパバイザは子プロセスの仕様をすべての子プロセスで共有するので、listenソケットを、それにアクセスするすべての子プロセスに突っ込めがいいだけです!

では、輝かしいスーパバイザをご紹介しましょう:

%%% The supervisor in charge of all the socket acceptors.
-module(sockserv_sup).
-behaviour(supervisor).

-export([start_link/0, start_socket/0]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    {ok, Port} = application:get_env(port),
    %% Set the socket into {active_once} mode.
    %% See sockserv_serv comments for more details
    {ok, ListenSocket} = gen_tcp:listen(Port, [{active,once}, {packet,line}]),
    spawn_link(fun empty_listeners/0),
    {ok, {{simple_one_for_one, 60, 3600},
        [{socket,
         {sockserv_serv, start_link, [ListenSocket]}, % pass the socket!
         temporary, 1000, worker, [sockserv_serv]}
        ]}}.

start_socket() ->
    supervisor:start_child(?MODULE, []).

%% Start with 20 listeners so that many multiple connections can
%% be started at once, without serialization. In best circumstances,
%% a process would keep the count active at all times to insure nothing
%% bad happens over time when processes get killed too much.
empty_listeners() ->
    [start_socket() || _ <- lists:seq(1,20)],
    ok.

では、ここで何が行われているのでしょうか。 標準の start_link/0init/1 関数があります。 sockservに simple_one_for_one 再起動戦略があり、子プロセスの仕様に ListenSocket が渡されているのが分かります。 start_socket/0 で起動したすべての子プロセスは 、この変数をデフォルトで引数として持っています。 すばらしい!

これだけでは十分ではありません。 アプリケーションには要求をなるべく早く処理してもらいたいのです。 このことが、 spawn_link(fun empty_listeners/0) の呼び出しを追加した理由です。 empty_listeners/0 関数は20個のハンドラをロックされ受信接続を待った状態で起動します。 これを単純な理由で spawn_link/1 の中に入れました。スーパバイザプロセスはその init/1 の中にあって、どのようなメッセージにも応答することができないのです。 自分自身をinit関数の中から呼びだそうとした場合、プロセスはデッドロック状態になり、永久に止めることができなくなります。 単純にこのことから、外部プロセスが必要になります。

Note

上のスニペットで、 {packet, line} というオプションをgen_tcpに渡したことにお気づきかもしれません。 このオプションは、受信したパケットすべてを改行ごとに分割し、それに基づいてキューに貯めるためのものです。(行末も受信した文字列の一部です。) これは私たちの例でtelnetクライアントとうまくやり取りするときに便利です。

ようやく、ややこしい部分はすべて書き終わりました。 これでワーカ自身の実装に集中できます。

前の章のProcess Questの節を思い出すと、このように事が進んでいました:

  1. ユーザがサーバに接続する
  2. サーバがキャラクター名を訊く
  3. ユーザはキャラクター名を送る
  4. サーバがステータスを提案する
    1. ユーザが提案されたステータスを拒否した場合は4に戻る
    2. ユーザが提案されたステータスを受け入れた場合は6に進む
  5. ゲームはプレーヤーにイベントを送る。7になるまで続ける。
  6. ユーザはサーバに quit を送るかソケットが強制的に閉じられる

つまり、2種類の入力をサーバプロセスにおこなうことになります。Process Questアプリケーションからやってくる入力値とユーザからやってくる入力値です。 ユーザからくるデータは、ソケットからやってきて、gen_serverの handle_info/2 関数で処理されます。 Process Questからやってくるデータは、私たちが制御する方法で送られ、 handle_cast で処理される投入されたデータはそこで解釈されます。 まず、サーバを起動しなければいけません:

-module(sockserv_serv).
-behaviour(gen_server).

-record(state, {name, % player's name
                next, % next step, used when initializing
                socket}). % the current socket

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         code_change/3, terminate/2]).

一番最初はかなり標準的なgen_serverコールバックモジュールです。 ここで唯一特別なのは、キャラクター名、ソケット、 next というフィールドです含む状態です。 next の部分はcatch-allフィールドのような、サーバの状態に関連する一時的な情報を保存する場所です。 gen_fsmは特に問題はなく使われていると思います。

実際のサーバの起動には次のようなコードを書きます:

-define(TIME, 800).
-define(EXP, 50).

start_link(Socket) ->
    gen_server:start_link(?MODULE, Socket, []).

init(Socket) ->
    %% properly seeding the process
    <<A:32, B:32, C:32>> = crypto:rand_bytes(12),
    random:seed({A,B,C}),
    %% Because accepting a connection is a blocking function call,
    %% we can not do it in here. Forward to the server loop!
    gen_server:cast(self(), accept),
    {ok, #state{socket=Socket}}.

%% We never need you, handle_call!
handle_call(_E, _From, State) ->
    {noreply, State}.

上で定義さている2つのマクロ( ?TIME?EXP )が各アクション間の遅延(800ミリ秒)と次のレベルに到達するために必要な経験値(50、各レベルごとに倍増していきます)の基準値を設定する特別なパラメータです。

start_link/1 関数がソケットを引数に取っていることに気がつくでしょう。 これは sockserv_sup から渡されたlistenソケットです。

乱数種に関する最初の部分は、プロセスが後ほどキャラクタのステータスを生成する上で必要な乱数種が、適切に与えられるようにするためのものです。 そうしないと、あるデフォルト値が多くのプロセスで使われてしまい、これは望んだ状況になっていません。 乱数を使うライブラリからではなく、 init/1 関数内でおkの初期化を行った理由は、乱数種はプロセスレベルで行われていて(ちくしょう!ミュータブルな状態だ!)、ライブラリを呼び出すたびに新しい乱数種を作りたくないからです。

どのような場合でも、ここで本当に重要な部分はメッセージを自分自身に投入しているところです。 その理由は gen_tcp:accept/1-2 が操作をブロックしていて、これはすべての init 関数が同期だという事実に紐付いています。 もし接続の受け入れに30秒掛かった場合、プロセスを起動するスーパバイザも30秒ロックされます。 したがって、メッセージを自分自身に投入して、listenソケットを状態の state フィールドに追加します。

Don’t Drink Too Much Kool-Aid:

他人の書いたコードを読むと、 random:seed/1now() の結果を使って呼んでいるのをしばしば見るかもしれません。 now() は単調な時間を返す(常に増加していて、決して二度と同じ値になることはない)ため、便利な関数です。 しかし、Erlangで使われている乱数アルゴリズムにとっては悪い乱数種です。 こうした理由から、 crypto:rand_bytes(12) (R14B03以上では crypto:strong_rand_bytes(12) )を使って、12桁の安全に暗号化された乱数のバイト列を生成したほうがいいでしょう。 <<A:32, B:32, C:32>> とすることで、12バイトを3つの整数に変換することができます。

その接続を受け入れる必要があります。いろいろといじってみましょう:

handle_cast(accept, S = #state{socket=ListenSocket}) ->
    {ok, AcceptSocket} = gen_tcp:accept(ListenSocket),
    %% Remember that thou art dust, and to dust thou shalt return.
    %% We want to always keep a given number of children in this app.
    sockserv_sup:start_socket(), % a new acceptor is born, praise the lord
    send(AcceptSocket, "What's your character's name?", []),
    {noreply, S#state{socket=AcceptSocket, next=name}};

接続を受け入れて、(常に新しい接続を扱うために20個のアクセプターを用意しておくために)代わりのアクセプターを起動し、それからacceptソケットを ListenSocket の代わりとして保存し、次にソケットを通じて受け取るメッセージは’next’フィールドに入っている名前(name)に関するものだと記しておきます。

次に進む前に、次のような形で send 関数を通じてクライアントに質問を送信します:

send(Socket, Str, Args) ->
    gen_tcp:send(Socket, io_lib:format(Str++"~n", Args)),
    inet:setopts(Socket, [{active, once}]),
    ok.

なんとも巧妙です! メッセージを受信したあとは常にリプライしなければいけないと強く設定したため、関数内にアクティブワンスのルーティンを書き、また改行も追加しました。 単に怠けただけです。

手順1と2を終えたので、次はソケットから来るユーザの入力を待たなければいけません:

handle_info({tcp, _Socket, Str}, S = #state{next=name}) ->
    Name = line(Str),
    gen_server:cast(self(), roll_stats),
    {noreply, S#state{name=Name, next=stats}};

文字列 Str は何が入っているかまったくわかりません。しかし、状態の中にある next フィールドが私たちが受け取ったものはなんでも名前だと教えてくれるので、問題ありません。 デモアプリケーションではユーザがtelnetを使うことを想定していたので、受け取るテキストは必ず行末を含んでいます。 次のように定義された line/1 関数は行末を取り除きます:

%% Let's get rid of the white space and ignore whatever's after.
%% makes it simpler to deal with telnet.
line(Str) ->
    hd(string:tokens(Str, "\r\n ")).

一度名前を受け取ったら、それを保存して、プレーヤーのステータスを生成するために自分自身にメッセージ( roll_stats )を投入し、次の手順に進みます。

Note

サンプルファイルを見ると、メッセージ全体をパターンマッチする代わりに、短い ?SOCK(Var) というマクロを使っているのが分かると思います。 このマクロは ~define(SOCK(Msg), {tcp, _Port, Msg}) と定義されていて、私のような怠け者が素早く少ないタイプ量で文字列をパターンマッチする方法です。

ステータス一覧は handle_cast 節に戻ってきます:

handle_cast(roll_stats, S = #state{socket=Socket}) ->
    Roll = pq_stats:initial_roll(),
    send(Socket,
         "Stats for your character:~n"
         "  Charisma: ~B~n"
         "  Constitution: ~B~n"
         "  Dexterity: ~B~n"
         "  Intelligence: ~B~n"
         "  Strength: ~B~n"
         "  Wisdom: ~B~n~n"
         "Do you agree to these? y/n~n",
         [Points || {_Name, Points} <- lists:sort(Roll)]),
    {noreply, S#state{next={stats, Roll}}};
../_images/dice.png

pq_stats モジュールはステータス一覧を展開する関数を含んでいて、すべての節はステータスを表示するためだけに使われます。 ~B 書式パラメータは整数を表示したいということを意味しています。 状態の next の部分はここで上書きされます。 ユーザにステータスを受け入れるかどうかを聞いているので、ユーザが応えるのを待たなければならず、その後そのステータスを捨てて新しいものを生成するか、またはすぐ後で触れるProcess Questのキャラクターに渡すかしなければいけません。

今度は handle_info 関数内で、ユーザの入力を聞いてみましょう:

handle_info({tcp, Socket, Str}, S = #state{socket=Socket, next={stats, _}}) ->
    case line(Str) of
        "y" ->
            gen_server:cast(self(), stats_accepted);
        "n" ->
            gen_server:cast(self(), roll_stats);
        _ -> % ask again because we didn't get what we wanted
            send(Socket, "Answer with y (yes) or n (no)", [])
    end,
    {noreply, S};

キャラクターを直接関数節の中で起動したくなる衝動に駆られるでしょうが、私はそうしないことに決めました。なぜなら handle_info はユーザの入力を扱うもので、 handle_cast はProcess Questに関するものを扱うものだからです。 責任範囲の区別です! ユーザがステータスを拒否したら、単純に roll_stats を再度呼び出します。 何も新しいことはありません。 ユーザがステータスを受け入れたら、Process Questのキャラクターを起動して、Process Questからイベントが送られてくるのを待つことができます:

%% The player has accepted the stats! Start the game!
handle_cast(stats_accepted, S = #state{name=Name, next={stats, Stats}}) ->
    processquest:start_player(Name, [{stats,Stats},{time,?TIME},
                                     {lvlexp, ?EXP}]),
    processquest:subscribe(Name, sockserv_pq_events, self()),
    {noreply, S#state{next=playing}};

これらが私がゲーム用に定義した、通常の関数呼び出しです。 プレーヤーを起動して、 sockserv_pq_events イベントハンドラを使ってイベントをサブスクライブします。 次の状態は playing です。この状態は、取得されるすべてのメッセージはゲームからだけはなく他のものからも受け取るという状態です:

%% Events coming in from process quest
%% We know this because all these events' tuples start with the
%% name of the player as part of the internal protocol defined for us
handle_cast(Event, S = #state{name=N, socket=Sock}) when element(1, Event) =:= N ->
    [case E of
        {wait, Time} -> timer:sleep(Time);
        IoList -> send(Sock, IoList, [])
     end || E <- sockserv_trans:to_str(Event)], % translate to a string
    {noreply, S}.

これがどのように動作するかを細かく説明はしません。 sockserv_trans:to_str(Event) はあるゲームイベントをIOリストのリストか、イベント間の遅延を表す {wait, Time} タプルに変換する、ということだけ知っておいてください。(敵が落としたアイテムが何かを表示する直前に executing a ... というメッセージを表示します。)

従うべき手順のリストを思い出すと、メッセージは1つを除いてすべて変換していました。 その例外はユーザが辞めたい時にその旨を伝えるメッセージです。 次の節を handle_info の先頭に追加します:

handle_info({tcp, _Socket, "quit"++_}, S) ->
    processquest:stop_player(S#state.name),
    gen_tcp:close(S#state.socket),
    {stop, normal, S};

キャラクターを停止して、ソケットを閉じ、プロセスを終了します。 やりました。 他にゲームを辞める理由としては、TCPソケットがクライアントによって閉じられた、という場合があります:

handle_info({tcp_closed, _}, S) ->
    {stop, normal, S};
handle_info(E, S) ->
    io:format("unexpected: ~p~n", [E]),
    {noreply, S}.

また、不明なメッセージを処理する節も追加しました。 ユーザが予期しないなにかを入力した場合にクラッシュさせたくはありません。 あと terminate/2 関数と code_change/3 関数だけがまだ実装されていません:

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

terminate(normal, _State) ->
    ok;
terminate(_Reason, _State) ->
    io:format("terminate reason: ~p~n", [_Reason]).

以上すべてを実装すれば、このファイルのコンパイルを試して、先ほどのリリース内にある対応するbeamファイルをいまコンパイルしたものに置き換えて、ちゃんと動作するか確認してみましょう。 ちゃんとコピーしていれば(あるいは私がやってみる場合も)、動くはずです。

26.7. これからどこに向かおう

もし受け入れてくれるなら、次の課題は、さらにいくつかの好きなコマンドをクライアントに追加することです。たとえば、しばらくの間アクションをキューに貯めて、サーバをレジュームさせたら一度にそれらのコマンドを表示する「一時停止」のような機能を追加するのはどうでしょうか。 あるいは、あなたがイケてると思うなら、これまでのレベルやステータスを sockserv_serv モジュールに記録して、クライアント側か履歴を取得するコマンドを追加するのはどうでしょうか。 私はいつもは読者に演習を出すのは嫌いなのですが、ときどきあちこちに課題を出してみたくなる誘惑に駆られます。なので、この演習は楽しんで下さい!

あるいは、既存のサーバ実装のソースコードを読んでみたり、自分でサーバなどを実装してみるのも良い演習ですね。 アマチュアがWebサーバを書いてみるなどという事をする言語というのは珍しいのですが、Erlangはその数少ない中の1つです。 数少ない実践で、才能となるでしょう。 Erlangが外界とやり取りすることは、便利なソフトウェアを書くことに向けた長い道のりの1歩に過ぎません。