../_images/clock.png

15. 並列アプリケーションを設計する

すべていい感じです。概念は理解されたと思います。 が、再度いいますが、この本の最初からやってきたことは全部おもちゃです: 計算機、ツリー、ヒースローからロンドン、などなど。 そろそろ何か楽しくてもっと役に経つことをすることです。 並列Erlangで小さいアプリケーションを書きましょう。 アプリケーションは小さく行を基にしているけれど、それでいて役立つしそれなりに拡張できるものになります。

私はいくらか考えが体系立っていない人間です。 宿題を忘れたり、アパートの周りですることを忘れたり、この本や、仕事、ミーティング、約束などなど良く忘れます。 結局タスクであふれた何十ものリストを書く羽目になって、それでもなおやり忘れたり、見直しを忘れたりします。 希望的にはあなたにもすべきことのリマインダーが必要であってほしいです。(しかしあなたの場合は私ほど欲しいとは思ってないと思いますが) なぜなら、これから、すべきことや予定をお知らせするイベントリマインダーアプリケーションを書こうとしているからです。

15.1. 問題を理解する

まず最初のステップは自分がいったい何をしようとしているのかを知ることです。 あなたは「リマインダーアプリケーションだ」と言うでしょう。私も「もちろん」と答えます。’ しかしもっと言うことはあります。どうやってアプリケーションとやり取りすればいいのでしょうか。 アプリケーションに何をしてもらいたいのでしょうか。 どうやってプログラムをプロセスの形に落とせるのでしょうか。 どのようなメッセージを送るべきかどうやって知るのでしょうか。

引用をすると「水の上を歩くのも使用からソフトウェアを開発するのも簡単だ。それが固まってさえいれば。」 というわけで、仕様を決めていきましょう。これから作る小さなソフトウェアでは次のようなことができます:

  • イベントを追加する。イベントには締切り(警告する時間)、イベント名、詳細が含まれます。
  • イベントの時間が来たら警告する
  • イベント名でイベントをキャンセルする
  • 永続的なディスク保存先を持たない。これは今回見ようとしているのアーキテクチャの概念を示すのに必要ありません。これは実際のアプリケーションを作るなら全然ダメなことですが、代わりにもし実装したくなったらその機能はどこに挿入されるかを示して、ちょっとだけその実装の手助けになる関数も示します。
  • 永続化保存先がないとした場合、実行中にコードを更新しなければいけません。
  • ソフトウェアとのやりとりはコマンドライン経由で行われますが、あとでこれは拡張できるので、他の方法でも行えます。(たとえばGUI、Webページアクセス、IMソフト、メールなど)

ここに、やることに決めたプログラムの構造を示します:

../_images/reminder-structure.png

ここで、クライアント、イベントサーバ、x、y、zはすべてプロセスです。 次にそれぞれの役目を示します:

イベントサーバ

  • クライアントからのサブスクライブを受け取る
  • イベントプロセスから出された通知を各サブスクライバに転送する
  • イベントを追加するためにメッセージを受け取る(必要なx,y,zプロセスを起動する)
  • イベントのキャンセルをするメッセージを受け取けとって、イベントプロセスを殺すことが出来る
  • クライアントから終了できる
  • シェルから自分自身のコードを再読込出来る

クライアント

  • イベントサーバをサブスクライブして、メッセージとして通知を受ける。 こうすることで、すべてイベントサーバをサブスクライブしている多くのクライアントを設計するのが簡単になります。 各クライアントは潜在的に、前述した異なる(GUI、Webページ、IMソフト、メールなど)のゲートウェイと成り得ます。
  • サーバにイベントを詳細情報とともに追加するように依頼する
  • サーバにイベントをキャンセルするように依頼する
  • サーバを(落ちたかどうか知る為に)監視する
  • 必要があればイベントサーバを終了する

x,y,z

  • 発報が待たれる通知を表す(きほ天気にはイベントサーバにリンクされたタイマーに過ぎません)
  • 時間が来たらイベントサーバにメッセージを送る
  • キャンセルのメッセージを受け取って死ぬ

すべてのクライアント(IM、メールなど、この本で実装されていないもの)はすべてイベントの通知を受けて、キャンセルはクライアントを警告するものではない、ということに注意してください。 ここで、ソフトウェアはあなたと私の為に書かれていて、1ユーザのみが実行することを想定しています。

ここに、考えられるメッセージをすべて記載したより複雑なグラフを用意しました:

../_images/reminder-bubbles-and-arrows.png

これは考えられるすべてのプロセスを示しています。すべての矢印を書いて、そのメッセージを追加することで、高次元でのプロトコルあるいは少なくともそのスケルトンを書いたことになります。

通知されるイベントごとにプロセスを立てることは過剰になって実際のアプリケーションではスケールしづらいだろうということに気づくべきです。 しかしながら、ここでつくろうとしているのはたった1人のユーザの為のものなので、これで十分です。 別の方法としては timer:send_after/2-3 というような関数を使って、たくさんのプロセスをspawnすることを避けるやり方があります。

15.2. プロトコルを決める

もう、各コンポーネントが何をすべきで、何をやり取りするか知っているので、送信されるすべてのメッセージの一覧を作って、どういうメッセージにするかを決めるのがいいでしょう。 まず最初にクライアントとイベントサーバとのやりとりを見てみましょう。

../_images/reminder-subscribe.png

ここでは2つのモニターを使うことに決めました。その理由は、クライアントとサーバには特に依存関係が見当たらないからです。 つまり、もちろんクライアントはサーバなしには動かないですが、サーバはクライアントなしでも動作出来るからです。 ここではリンクも使えますが、多くのクライアントを使って拡張したいため、サーバが死んだ時に他のクライアントがすべてクラッシュしてしまうようなことになってほしくないのです。 またクライアントが本当にシステムプロセスになってしまうことも想定していないので、サーバが死んだ時に終了を捕捉できません。 次のメッセージです:

../_images/reminder-add.png

これはイベントをイベントサーバに追加します。その確認が、何かおかしなことが起きない限り ok アトムの形で送り返されます。( TimeOut が間違ったときのフォーマットとして送られるでしょう) 逆の操作である、イベントの削除は次のように行われます:

../_images/reminder-remove.png

イベントサーバはそのあとイベントの時間が来たときに通知を送ります:

../_images/reminder-cs-done.png

あとは、サーバの終了とサーバがクラッシュした場合のたった2つだけ次のようなものが必要です:

../_images/reminder-shutdown.png

サーバが死んだ時には直接確認することはありません。 なぜならモニターがすでにそのことを警告してくれているからです。 以上がイベントサーバとクライアントの間で起こることの全てです。 ではイベントサーバとイベントプロセス簡でのメッセージについて見てみましょう。

次に行く前に注意しておくことは、イベントサーバとイベントをリンクしておくととても便利だということです。 その理由は、サーバが死んだ時にはすべてのイベントが死んでほしいからです: イベントはサーバなしに存在することがありえないからです。

ではイベントの話に戻りましょう。イベントサーバがイベントを起動させたときに、各イベントに特別な識別子を割り振ります。(イベント名です) イベントの内どれか1つ期限がきたら、そのことを伝えるメッセージを送る必要があります:

../_images/reminder-es-done.png

一方で、イベントはイベントサーバからのキャンセルの通知が来ないか見ている必要があります:

../_images/reminder-cancel.png

異常です。最後に我々のプロトコルに一つだけ必要なものがあります。 それはサーバのアップグレードをするものです:

../_images/reminder-code-change.png

返信は必要ありません。実際のプログラムでその機能を見て、ちゃんと出来ているか確認するからです。

プロトコルも決まって、プロセスの階層構造がどうなっているかの一般的なアイデアも分かったので、いよいよ実際にプロジェクトを開始出来ます。

15.3. 土台を置く

../_images/cement.png

始めるために、標準的なErlangのディレクトリ構造を置きます。このような見た目になります:

ebin/
include/
priv/
src/

ebin/ ディレクトリはコンパイルされたファイルが置かれる場所です。 include/ ディレクトリは .hrl ファイルが置かれる場所で、 .hrl ファイル自体は他のアプリケーションにインクルードされます。プライベートな .hrl ファイルは通常 src ディレクトリの中に置かれます。 priv/ ディレクトリはErlangとやり取りする必要がある実行ファイル、たとえば特定のドライバなどに使われます。 このプロジェクトでは実際にはこのディレクトリは使いません。 最後の一つは src/ ディレクトリで、すべての .erl ファイルが置かれます。

標準的なErlangプロジェクトでは、このディレクトリ構造はとても小さいです。 conf/ ディレクトリが特定の設定ファイルのために追加されますし、 doc/ ディレクトリはドキュメントに、 lib/ ディレクトリはアプリケーションを実行するのに必要なサードパーティライブラリに使われます。 世に出回っているErlangプロジェクトはそれぞれに違う名前のディレクトリを使っていたりしますが、最初に挙げた4つに関しては 標準的なOTPの慣例 の一部なのでほぼ一緒です。

15.4. イベントモジュール

src/ ディレクトリに入って、 event.erl モジュールを作りはじめましょう。これは先ほど描いたx,y,zイベントを実装してます。 このモジュールから書き始めるのは、最も依存が少ないからです: これはイベントサーバやクライアント関数の実装がなくても実行してみることが出来ます。

実際にコードを書く前に、プロトコルだけでは完全でないことについて触れなければいけません。 プロトコルはどんなデータがプロセスからプロセスに送信されるかはうまく表してくれますが、その複雑さは書いてくれません: イベントの指定の仕方がどう動作するか、参照を使うのか、名前を使うのかなどです。 たいていのメッセージは {Pid, Ref, Messages} の形でラップされます。 ここで Pid は送信元で、 Ref は一意のメッセージ識別子で、どの返信が誰から送られてきたかを知る手助けになります。 返信を探す前に多くのメッセージを送ろうとしたときに、参照がなければどの返信がどのメッセージに対して行われたのかわかりません。

では始めましょう。 event.erl のコードを走らせるこのプロセスの核は、関数 loop/1 でしょう。 これは、プロセスを覚えている方なら、つぎのようなスケルトンコードになると分かるでしょう:

loop(State) ->
receive
    {Server, Ref, cancel} ->
        ...
after Delay ->
    ...
end.

ここでは、イベントを通知するためにサポートしなければ行けないタイムアウトがErlang項になっていて、サーバがイベントのキャンセルを送る方法が示されています。 loop 内に State という変数があるのに気がつくでしょう。 State 変数はタイムアウト時間を表すデータ(秒数)や、イベントの名前( {done, Id} というようなメッセージを送るため)を含んでいる必要があります。 また State 変数は通知を送るためにイベントサーバのpidも知っている必要があります。

これがループの状態として保持されるべきものの全てです。では state レコードをファイルの先頭で宣言しましょう:

-module(event).
-compile(export_all).
-record(state, {server,
                name="",
                to_go=0}).

state が定義されたので、 loop をもうちょっと細かく定義出来るでしょう:

loop(S = #state{server=Server}) ->
    receive
        {Server, Ref, cancel} ->
            Server ! {Ref, ok}
    after S#state.to_go*1000 ->
        Server ! {done, S#state.name}
    end.

ここで1000倍しているのは to_go の値を秒からミリ秒に変換するためです。

Don’t drink too much Kool-Aid:

これから先に言語の欠点があります!変数 'Server' を関数の先頭で束縛したのは、receive節でパターンマッチをするときに使われるからです。 思い出してください、レコードはハックなのです! S#state.server という式は暗黙のうちに element(S, 2) に展開されます。これはパターンマッチするのに適切なパターンではありません。

これはafters部の S#state.to_go に対してもうまく動作します。その理由はあとで評価できる表現として残っているからです。

では loop を試してみましょう:

6> c(event).
{ok,event}
7> rr(event, state).
[state]
8> spawn(event, loop, [#state{server=self(), name="test", to_go=5}]).
<0.60.0>
9> flush().
ok
10> flush().
Shell got {done,"test"}
ok
11> Pid = spawn(event, loop, [#state{server=self(), name="test", to_go=500}]).
<0.64.0>
12> ReplyRef = make_ref().
#Ref<0.0.0.210>
13> Pid ! {self(), ReplyRef, cancel}.
{<0.50.0>,#Ref<0.0.0.210>,cancel}
14> flush().
Shell got {#Ref<0.0.0.210>,ok}
ok

確認することがたくさんあります。まず最初に、レコードをイベントモジュールから rr(Mod) でインポートしています。 その後、シェルをサーバとして(self())イベントループをspawnします。 このイベントは5秒後に発報されます。9行目の式は3秒後に、10行目は6秒後に行われました。 2回目の flush() では {done, "test"} というメッセージを受け取ったのが分かると思います。

直後に、キャンセル機能を試して見ました。(余裕を持って500秒と入力しました) 参照を作成して、メッセージを送って、同じ参照を持った返信を取得しているのを確認出来ると思います。 これで受け取った ok はこのプロセスから送られていて、システム上の他のプロセスから送られたのではないとわかります。

キャンセルメッセージが参照と一緒にラップされていて、 done メッセージそうでない理由は、単純に適当な場所からキャンセルが送信されてくる事を期待していない上に、それに対して返信したくないからです。(もしどんな場所からでも送れてしまうと、 receive でマッチしなくなってしまいます) もう1つ事前にテストしておきたいものがあります。イベントが来年起きるとしたらどうでしょうか。

15> spawn(event, loop, [#state{server=self(), name="test", to_go=365*24*60*60}]).
<0.69.0>
16>
=ERROR REPORT==== DD-MM-YYYY::HH:mm:SS ===
Error in process <0.69.0> with exit value: {timeout_value,[{event,loop,1}]}

あらら、実装の限界に当たったようです。Erlangのタイムアウト値はミリ秒でおよそ50日に制限されていることがわかりました。これはさほど重大ではありませんが、このエラーを3つの理由からお見せしました。

このエラーは、モジュールを書いていてテストしているときに、章の半ばくらいまできてケツを噛みました。 Erlangはすべてのタスクにおいて完璧というわけではなく、ここで見たエラーはErlangの実装者が想定していない方法でタイマーを使った結果です。 これは実際には問題ではありません。ワークアラウンドを実施してみましょう。 このエラーに適用することに決めた修正方法は、タイムアウト値が長すぎると分かった場合にタイムアウト値を複数に分ける関数を書く、というものです。 これは loop/1 関数にもいくつかの手助けをしてもらう必要があります。 以上から、タイムアウト時間を分割するときは基本的には49日間(制限が50日間のため)で等しく分割して、残りを等しく分割する、という方法になります。 これら分割された秒のリストの合計値は元の時間になります:

%% Because Erlang is limited to about 49 days (49*24*60*60*1000) in
%% milliseconds, the following function is used
normalize(N) ->
    Limit = 49*24*60*60,
    [N rem Limit | lists:duplicate(N div Limit, Limit)].

関数 lists:duplicate/2 は式を第1引数にとり、第2引数の値でそれを何度も再生成します。 normalize/198*24*60*60+4 を送った場合は、 [4,4233600,4233600] が返ってきます。 loop/1 関数は新しいフォーマットを受け付けるために次のような見た目になります:

%% Loop uses a list for times in order to go around the ~49 days limit
%% on timeouts.
loop(S = #state{server=Server, to_go=[T|Next]}) ->
    receive
        {Server, Ref, cancel} ->
            Server ! {Ref, ok}
    after T*1000 ->
        if Next =:= [] ->
            Server ! {done, S#state.name};
        Next =/= [] ->
            loop(S#state{to_go=Next})
        end
    end.

試してみてください。動作結果の見た目は普通ですが、今度はタイムアウト値を何年にも出来ます。 これは to_go リストの最初の要素を取り出してその時間いっぱい待機します。それが終わったら to_go リストの次の要素が適用されます。もし空であれば、タイムアウトは終わりサーバーはその通知を受けます。 そうでなければループはリストに要素がある限り続きます。これが新しい実装の内容です。

イベントプロセスが開始するたびに event:normalize(N) のような関数を手で呼び出すのはじれったいです。 特に先程のワークアラウンドは我々のコードを使うプログラマにとって気にするべきことではありません。 標準的な方法としては代わりに、 loop 関数をうまく動作させるのに必要なすべてのデータを初期化する init 関数を書くことです。 いままさにその問題にぶつかっているので、標準的な startstart_link 関数を追加します:

start(EventName, Delay) ->
    spawn(?MODULE, init, [self(), EventName, Delay]).

start_link(EventName, Delay) ->
    spawn_link(?MODULE, init, [self(), EventName, Delay]).

%%% Event's innards
init(Server, EventName, Delay) ->
    loop(#state{server=Server,
                name=EventName,
                to_go=normalize(Delay)}).

これでインターフェースがぐっと綺麗になりました。テストはしてませんが、 cancel というメッセージを1つだけ送ればよいというのはかなりいいですね。それ専用のインターフェース関数も書きましょう:

cancel(Pid) ->
    %% Monitor in case the process is already dead
    Ref = erlang:monitor(process, Pid),
    Pid ! {self(), Ref, cancel},
    receive
        {Ref, ok} ->
            erlang:demonitor(Ref, [flush]),
            ok;
        {'DOWN', Ref, process, Pid, _Reason} ->
            ok
    end.

おや!新しいトリックです!ここでプロセスが居るかどうかを確認するためにモニターを使っています。 もしプロセスがすでに死んでいたら、無駄な時間を過ごさずに ok をプロトコルで定義された通りに返します。 プロセスが参照とともに返信してきた場合には、そのプロセスはすぐに死ぬとわかります: その参照を気にする必要がない場合はメッセージを受け取るのを避けるために参照を削除します。 flush オプションも与えていることに注目してください。これはモニターを辞めるまでの間に DOWN メッセージが送られた場合に、それらをパージします。

テストしてみましょう:

17> c(event).
{ok,event}
18> f().
ok
19> event:start("Event", 0).
<0.103.0>
20> flush().
Shell got {done,"Event"}
ok
21> Pid = event:start("Event", 500).
<0.106.0>
22> event:cancel(Pid).
ok

うまく動きました!イベントモジュールで煩わしい部分として最後にのこったのは、時間を秒で入力しなければならないということです。 Erlangの日時({{Year, Day, Month}, {Hour, Minute, Second}})のような標準的なフォーマットを使えるならそのほうがずっとよさそうです。 次の関数を追加して、コンピュータの現在時刻から指定した時刻までの時間を計算するようにしましょう:

time_to_go(TimeOut={{_,_,_}, {_,_,_}}) ->
    Now = calendar:local_time(),
    ToGo = calendar:datetime_to_gregorian_seconds(TimeOut) -
           calendar:datetime_to_gregorian_seconds(Now),
    Secs = if ToGo > 0  -> ToGo;
              ToGo =< 0 -> 0
           end,
    normalize(Secs).

calendarモジュール はかなりファンキーな関数名を持っています。 上で書いたように、この関数は現在時刻とイベントが発報される時刻間の秒数を計算します。 もしイベントが過去なら、代わりに 0 を返すようにして、できるだけ早くサーバに通知するようにします。 init 関数を修正して、この関数を normalize/1 関数の代わりに呼び出すようにしましょう。 変数をより記述的にしたければ Delay 変数を DateTime と名前を付け直してもいいでしょう:

init(Server, EventName, DateTime) ->
    loop(#state{server=Server,
                name=EventName,
                to_go=time_to_go(DateTime)}).

これが終わったので一段落です。新しいイベントを始めましょう。1パイント(0.5リットル)のミルクあるいはビールを飲んで、イベントメッセージが時刻通りに飛んでくるか確認に戻ってきましょう。

15.5. イベントサーバ

イベントサーバ にとりかかりましょう。プロトコルによれば、スケルトンはこんな感じになるべきです:

-module(evserv).
-compile(export_all).

loop(State) ->
    receive
        {Pid, MsgRef, {subscribe, Client}} ->
            ...
        {Pid, MsgRef, {add, Name, Description, TimeOut}} ->
            ...
        {Pid, MsgRef, {cancel, Name}} ->
            ...
        {done, Name} ->
            ...
        shutdown ->
            ...
        {'DOWN', Ref, process, _Pid, _Reason} ->
            ...
        code_change ->
            ...
        Unknown ->
            io:format("Unknown message: ~p~n",[Unknown]),
        loop(State)
    end.

必要な返信を前と同じ {Pid, Ref, Message} の形式にラップしたことに気がついたかもしれません。 サーバは状態として2つのものを保持する必要があります: サブスクライブしているクライアントのリストとspawnしたすべてのイベントプロセスのリストです。 通知を受けたら、プロトコルによると、イベントが終わったらイベントサーバは {done, Name} を受け取って、 {done, Name, Description} を送ります。 これには必要なトラフィックをなるべく小さくして、イベントプロセスには本当に必要なものだけ気にするようにさせるという考えがあります。 で、クライアントのリストとイベントのリストの話です:

-record(state, {events,    %% list of #event{} records
                clients}). %% list of Pids

-record(event, {name="",
                description="",
                pid,
                timeout={{1970,1,1},{0,0,0}}}).

loop は先頭でレコード定義を使います:

loop(S = #state{}) ->
    receive
        ...
    end.

イベントとクライアントのリストが orddict だとよさそうです。 同時に何百ものイベントやクライアントを持つことはなさそうです。 データ構造の章を思い出すと、 orddict がこの要件にとても良く合いそうです。 これに対応するために init 関数を書きます:

init() ->
    %% Loading events from a static file could be done here.
    %% You would need to pass an argument to init telling where the
    %% resource to find the events is. Then load it from here.
    %% Another option is to just pass the events straight to the server
    %% through this function.
    loop(#state{events=orddict:new(),
                clients=orddict:new()}).

スケルトンと初期化が終わったので、メッセージを1つづつ実装していきます。 最初のメッセージはサブスクライブに関するものです。イベントが終わったときに通知しなければいけないので、すべてのサブスクライバのリストを保持したいです。 また、前述のプロトコルではそれらをモニターすると言っています。 クラッシュしたクライアントを保持して、意味なく役立たずなメッセージを送りたくないので、これは理解できます。 とにかく、こんな感じになるはずです:

{Pid, MsgRef, {subscribe, Client}} ->
    Ref = erlang:monitor(process, Client),
    NewClients = orddict:store(Ref, Client, S#state.clients),
    Pid ! {MsgRef, ok},
    loop(S#state{clients=NewClients});
../_images/rss.png

loop/1 内でこの節では、モニターを起動して、クライアント情報を orddictRef をキーとして保存しています。 この理由は単純です: モニターの EXIT メッセージを受け取って、クライアントIDが必要な時が来るからです。モニターからのメッセージには参照が含まれています。(その参照によって orddict のエントリを削除出来ます)

次に考えるべきメッセージはイベントを追加する部分のメッセージです。 エラーステータスを返すことが出来ます。唯一検証するのは受け取ったタイムスタンプの確認だけです。 {{Year,Month,Day}, {Hour,Minute,seconds}} の形を受け取るようにするのは簡単ですが、閏年でないのに2月29日のイベントを受け取ったり、存在しない日付を絶対に受け付けないようにしないといけません。 さらに、「5時間マイナス1分75秒」みたいなあり得ない日付も受け付けたくありません。 1つの関数でこれらすべてを対処できます。

最初に作るブロックは関数 calendar:valid_date/1 を使います。 これは、名前のとおり、日付が適切か否かを確認します。 悲しいことにカレンダーモジュールがおかしいのはファンキーな名前を使うことにとどまりません: {H,M,S} が正しい値か確認する関数は実際にはありません。 これも実装しなければいけません。次のようなファンキーな命名規則で作ります:

valid_datetime({Date,Time}) ->
    try
        calendar:valid_date(Date) andalso valid_time(Time)
    catch
        error:function_clause -> %% not in {{D,M,Y},{H,Min,S}} format
            false
    end;
valid_datetime(_) ->
    false.

valid_time({H,M,S}) -> valid_time(H,M,S).
valid_time(H,M,S) when H >= 0, H < 24,
                       M >= 0, M < 60,
                       S >= 0, S < 60 -> true;
valid_time(_,_,_) -> false.

これで valid_datetime/1 関数はメッセージを追加するところで使えます:

{Pid, MsgRef, {add, Name, Description, TimeOut}} ->
    case valid_datetime(TimeOut) of
        true ->
            EventPid = event:start_link(Name, TimeOut),
            NewEvents = orddict:store(Name,
                                      #event{name=Name,
                                             description=Description,
                                             pid=EventPid,
                                             timeout=TimeOut},
                                      S#state.events),
            Pid ! {MsgRef, ok},
            loop(S#state{events=NewEvents});
        false ->
            Pid ! {MsgRef, {error, bad_timeout}},
            loop(S)
    end;

もし時間が適切であれば、新しいイベントプロセスをspawnして、呼び出し元に確認メッセージを送る前に、そのデータをイベントサーバの state に保存します。 もしタイムアウト時間がおかしければ、暗黙のうちにエラーを投げたり、サーバをクラッシュするのではなく、クライアントに通知をします。 追加として名前が衝突してないかとかの他の制限を確認することも出来るでしょう。 (プロトコル仕様書を更新するのをお忘れなく!)

次にプロトコルに定義されているメッセージは、イベントのキャンセルです。 イベントのキャンセルはクライアント側では決して失敗しません。したがって、そこではコードはより簡素になります。 イベントがプロセスの state レコードに存在するかどうかを確認するだけです。 もし存在してたら、定義した event:cancel/1 関数を使ってイベントを殺して、 ok を送ります。 もし見つからなかったら、ちゃんと動作していることをユーザに伝えるだけです – イベントは動作しておらず、これはユーザが望んでいる状況です。

{Pid, MsgRef, {cancel, Name}} ->
    Events = case orddict:find(Name, S#state.events) of
                 {ok, E} ->
                     event:cancel(E#event.pid),
                     orddict:erase(Name, S#state.events);
                 error ->
                     S#state.events
                 end,
    Pid ! {MsgRef, ok},
    loop(S#state{events=Events});

いいねいいね。クライアントからイベントサーバへの自発的なやりとりはすべてカバーされました。 サーバとイベント自身の間でやり取りされる物についての処理を書いていきましょう。 扱うメッセージとしては2つあります: イベントのキャンセル(もう実装されました)とイベントのタイムアウトです。 メッセージは単純に {done, Name} です:

{done, Name} ->
    E = orddict:fetch(Name, S#state.events),
    send_to_clients({done, E#event.name, E#event.description},
    S#state.clients),
    NewEvents = orddict:erase(Name, S#state.events),
    loop(S#state{events=NewEvents});

関数 send_to_clients/2 は名前の通りの事をして、次のように定義されます:

send_to_clients(Msg, ClientDict) ->
    orddict:map(fun(_Ref, Pid) -> Pid ! Msg end, ClientDict).

これはほとんどの loop コードでこうなるべきです。 あと残っているのは異なるステータスメッセージを設定することです: クライアントが落ちた、シャットダウン、コードのアップグレードなどです。こんな感じです:

shutdown ->
    exit(shutdown);
    {'DOWN', Ref, process, _Pid, _Reason} ->
        loop(S#state{clients=orddict:erase(Ref, S#state.clients)});
    code_change ->
        ?MODULE:loop(S);
    Unknown ->
        io:format("Unknown message: ~p~n",[Unknown]),
        loop(S)

最初の事例(シャットダウン)はかなり明示的です。 kill メッセージを受け取ると、プロセスが死にます。 もしディスクに状態を保存したい時は、それを行うコードを追加することが可能です。 より安全な保存/終了セマンティクスを使いたい場合は、すべての追加、キャンセル、完了メッセージで行えます。 ディスクからイベントを読み込むのは init 関数で実行され、読み込んだ時にそれらをspawnします。

'DOWN' メッセージの動作もかなり単純です。これはクライアントが死んだことを意味しているので、 state 内のクライアントのリストから削除します。

Unknown メッセージは、実際の製品アプリケーションではロギングモジュールを使うことになると思いますが、デバッグの目的で io:format/2 で表示されるだけです。

コードの変更メッセージまで来ました。私はかなり面白い部分だと思っていますので、専用の節を作りたいと思います。

15.6. 熱いコードがお好き

ホットコードローディングをするために、Erlangにはコードサーバと呼ばれるものがあります。 コードサーバは基本的にETSテーブルを管理しています。(インメモリデータベーステーブルで、VMネイティブ) コードサーバはメモリ上に1つのモジュールを2つのバージョンで保存でき、両方共同時に実行できます。 モジュールの新しいバージョンは c(Module) でコンパイルした時に自動的にロードされる、 l(Module) でロードする、あるいは code module の中の関数でロードするなどの方法があります。

理解して欲しい概念として、Erlangはローカル呼び出しと外部呼出しの両方があるということがあります。 ローカル呼び出しはexportされていない関数で行われる関数呼び出しです。 一方、外部呼出しはexportされた関数でのみ行われ、 Module:Function(Args) の形式をしています。

VMに2つのバージョンのモジュールがあるとき、すべてのローカル呼び出しはプロセスで現在実行中のバージョンで行われます。 しかしながら、外部呼出しは常にコードサーバ内にある最新版で行われます。 それから、もしローカル呼び出しが外部呼出しの中で行われた場合、それらは最新版で行われます。

../_images/hot-code-loading.png

Erlang内のすべてのプロセス/アクターが自身の状態を変更するために再帰呼び出しをする必要があるとして、アクターは外部再帰呼び出しをすることで全体として最新版をロードすることが出来ます。

Note

プロセスがまだモジュールの第1版を動かしているときに第3版をロードしたら、そのプロセスはVMによって殺されます。このとき、そのプロセスはsupervisorなしの孤児プロセスと認識されたか、自身をアップグレードする方法としてそうされたということです。もし誰も一番古いバージョンを動かしていなかったら、それは単純に捨てられて、新しいバージョンが代わりに保持されます。

モジュールが新バージョンになったときはいつでもメッセージを送るようなシステムモジュールに自分自身を束縛する方法が幾つかあります。 これを行うことで、モジュールの再読込をこのようなメッセージを受け取った時だけ行うようにすることができます。そして、それは MyModule:Upgrade(CurrentState) のような関数で行うことができ、状態のデータ構造を新しいバージョンの仕様によって変更出来ます。 この「サブスクライブ」の処理はOTPフレームワークによって自動的に行われます。それについてはすぐあとで触れます。 いまつくっているリマインダーアプリケーションでは、コードサーバは使わず、代わりにシェルから独自の code_change メッセージをを使って、とても基本的な再読込を行います。 ホットコードローディングを知るにはこれで十分です。 とは言っても、とりあえず一般的な例を見てみましょう:

-module(hotload).
-export([server/1, upgrade/1]).

server(State) ->
    receive
        update ->
            NewState = ?MODULE:upgrade(State),
            ?MODULE:server(NewState);  %% loop in the new version of the module
        SomeMessage ->
            %% do something here
            server(State)  %% stay in the same version no matter what.
    end.

upgrade(OldState) ->
    %% transform and return the state here.

ご覧のとおり、 ?MODULE:loop(S) がこのパターンに当てはまります。

15.7. メッセージを隠せと言ったろう?

メッセージを隠蔽すること!もし他人に自分のコードやプロセス上で何かさせるつもりなら、メッセージをインターフェース関数の中に隠さなければいけません。 次の例はイベントモジュール内で使ったものです:

start() ->
    register(?MODULE, Pid=spawn(?MODULE, init, [])),
    Pid.

start_link() ->
    register(?MODULE, Pid=spawn_link(?MODULE, init, [])),
    Pid.

terminate() ->
    ?MODULE ! shutdown.

私はサーバモジュールを登録(register)することに決めました。 その理由は今のところ同時に一つだけしか稼働するつもりがないからです。 もしリマインダーアプリケーションを多人数でも使えるように拡張するつもりなら、代わりに グローバルモジュールgprocライブラリ を使って名前を登録するのがおすすめです。 このサンプルアプリケーションには、これで十分です。

私たちが書いた最初のメッセージは次に抽象化すべきです: どのようにサブスクライブするかです。 上述した小さなプロトコルあるいは仕様はモニターを呼び出しているので、そこに追加されています。 どこでも、もしサブスクライブメッセージにより返された参照が DOWN メッセージの中にあれば、クライアントはサーバが落ちたとわかります。

subscribe(Pid) ->
    Ref = erlang:monitor(process, whereis(?MODULE)),
    ?MODULE ! {self(), Ref, {subscribe, Pid}},
    receive
        {Ref, ok} ->
            {ok, Ref};
        {'DOWN', Ref, process, _Pid, Reason} ->
            {error, Reason}
    after 5000 ->
        {error, timeout}
    end.

次はイベントの追加です:

add_event(Name, Description, TimeOut) ->
    Ref = make_ref(),
    ?MODULE ! {self(), Ref, {add, Name, Description, TimeOut}},
    receive
        {Ref, Msg} -> Msg
    after 5000 ->
        {error, timeout}
    end.

クライアントに受信されうる {error, bad_timeout} メッセージが転送されるように選択肢たことを注意してください。 また erlang:error(bad_timeout) を上げることによってクライアントをクラッシュさせることにも決めました。 クライアントをクラッシュさせるかエラーメッセージを転送するかはコミュニティでもまだ議論されている内容です。 ここでは代わりのクラッシュ関数を示します:

add_event2(Name, Description, TimeOut) ->
    Ref = make_ref(),
    ?MODULE ! {self(), Ref, {add, Name, Description, TimeOut}},
    receive
        {Ref, {error, Reason}} -> erlang:error(Reason);
        {Ref, Msg} -> Msg
    after 5000 ->
        {error, timeout}
    end.

それからイベントキャンセルもあります。これは名前を引数に取ります:

cancel(Name) ->
    Ref = make_ref(),
    ?MODULE ! {self(), Ref, {cancel, Name}},
    receive
        {Ref, ok} -> ok
    after 5000 ->
        {error, timeout}
    end.

最後はクライアント向けに小さく細かに設定されたもものです。その関数は一定期間すべてのメッセージを蓄積します。もしメッセージが見つかったら、それらは取り出され関数は出来るだけ早く返します。

listen(Delay) ->
    receive
        M = {done, _Name, _Description} ->
            [M | listen(0)]
    after Delay*1000 ->
        []
    end.

15.8. 試運転

これでアプリケーションをコンパイル出来るようになったので、テスト実行をしてみましょう。 物事をより単純にするため、プロジェクトをビルドするのに特定のErlangのMakefileを書きます。 Emakefile という名前のファイルを開いて、プロジェクトのベースディレクトリに置いてください。 ファイルにはErlang項を書いて、Erlangコンパイラがいい感じに .beam ファイルを焼きあげてくれるようにレシピを渡します:

../_images/oven.png
{'src/*', [debug_info,
           {i, "src"},
           {i, "include"},
           {outdir, "ebin"}]}.

これはコンパイラに debug_info をファイルに追加するように指定していて(このオプションはあまり指定しません)、ファイルを src/include/ ディレクトリから探して出力を ebin/ にするようにしています。

これで、コマンドラインに入って erl -make を実行することで、ファイルはすべてコンパイルされて、 ebin/ ディレクトリの中に置かれます。 erl -pa ebin/ を実行してErlangシェルを起動します。 -pa <directory> オプションはErlang VMにモジュールを検索するパスを追加するように指定しています。

他の方法としてはシェルを普通に起動してから make:all([load]) とするやり方です。 これは現在のディレクトリから 'Emakefile' と名前がついたファイルを探して、(変更があれば)再コンパイルして新しいファイルを読み込みます。

さて何千ものイベントを追跡できるようになりました( DateTime 変数を適当な文字列に置き換えて下さい):

1> evserv:start().
<0.34.0>
2> evserv:subscribe(self()).
{ok,#Ref<0.0.0.31>}
3> evserv:add_event("Hey there", "test", FutureDateTime).
ok
4> evserv:listen(5).
[]
5> evserv:cancel("Hey there").
ok
6> evserv:add_event("Hey there2", "test", NextMinuteDateTime).
ok
7> evserv:listen(2000).
[{done,"Hey there2","test"}]

いいねいいねー。いくつかインターフェース関数を書いたので、クライアントを書くのも十分簡潔にできるでしょう。

15.9. 監視を追加する

より安定したアプリケーションにするために、 前の章 で書いたのと同じように他の ‘restarter’ を書くべきです。 sup.erl という名前のファイルを開きます。ここでsupervisorはこんな感じになります:

-module(sup).
-export([start/2, start_link/2, init/1, loop/1]).

start(Mod,Args) ->
    spawn(?MODULE, init, [{Mod, Args}]).

start_link(Mod,Args) ->
    spawn_link(?MODULE, init, [{Mod, Args}]).

init({Mod,Args}) ->
    process_flag(trap_exit, true),
    loop({Mod,start_link,Args}).

loop({M,F,A}) ->
    Pid = apply(M,F,A),
    receive
        {'EXIT', Pid, Reason} ->
            io:format("Process ~p exited for reason ~p~n",[Pid,Reason]),
            loop({M,F,A});
        {'EXIT', _From, shutdown} ->
            exit(shutdown) % will kill the child too
    end.

これはなんとなく ‘restarter` に似ていますが、こっちのほうがもうちょっと一般的です。 これはどんなモジュールでも start_link 関数を持っている限り引数にとることが出来ます。 これは、supervisor自身がシャットダウン終了シグナルによって終了しない限り、監視しているプロセスを個別に再起動します。

1> c(evserv), c(sup).
{ok,sup}
2> SupPid = sup:start(evserv, []).
<0.43.0>
3> whereis(evserv).
<0.44.0>
4> exit(whereis(evserv), die).
true
Process <0.44.0> exited for reason die
5> exit(whereis(evserv), die).
Process <0.48.0> exited for reason die
true
6> exit(SupPid, shutdown).
true
7> whereis(evserv).
undefined

ご覧のとおり、supervisorを殺すとその子供も死にます。

Note

より発展的で柔軟性のあるsupervisorをOTP supervisorの章でご覧に入れます。 それらが一般的に人々が監視ツリーと言ったときに指すものです。 ここでお見せした supervisor は一番基本的な形のものに過ぎず、実際と比較すると製品環境には必ずしも適していません。

15.10. 名前空間(あるいはそれがない)

../_images/gentleman.png

Erlangはフラっとなモジュール構造をしている(階層がない)ので、アプリケーションで衝突が起きることがしばしば有ります。 その一例としては、ほぼすべてのプロジェクトで最低1回は定義されるよな、頻繁に使われる user モジュールです。 これはErlangと一緒に提供される user モジュールを破壊します。 何かクラッシュをテストしたかったら code:clash/0 関数で試すことができます。

こういったことから、すべてのモジュールには接頭辞としてプロジェクトの名前をつけるのが一般的です。 この場合、リマインダーアプリケーションのモジュール名は reminder_evserv, reminder_sup, reminder_event といった具合にすべきでしょう。

プログラマによっては、アプリケーション自身にちなんでモジュールを追加することに決めることもあります。 これは彼ら独自のアプリケーションを使うときによくある呼び出しをラップするような場合です。 例としては、アプリケーションをsupervisorと一緒にアプリケーションを起動する、サーバをサブスクライブする、イベントの追加やキャンセルをするといった具合です。

他の名前空間にも注意することが大事です。たとえば絶対に衝突してはいけない登録された名前、データベース表などです。

とても基本的な並列Erlangアプリケーションについてはこれでおしまいです。 これでたくさんの並列プロセスを深く考えることなく使えることが示されました: supervisor、クライアント、サーバ、タイマープロセスなど(それらを何千と持てます) 同期する必要もなければ、ロックすることもなく、メインループもありません。 メッセージパッシングによって、アプリケーションを担当範囲とタスクを分割して数モジュールに区切ることが簡潔になりました。

evserv.erl 内の基本的な呼び出しは、Erlang VM外のイベントサーバイベントサーバとやり取りすることが出来るようなクライアントを作るために使われて、プログラムを本当に役に立つものにしています。

けれども、こういったことを擦る前に、OTPフレームワークについて読むことをおすすめします。 次からの数章は、OTPフレームワークを構成しているものをいくつかご紹介します。 これらによってずっと堅牢でエレガントなアプリケーションを書くことが出来ます。 Erlangの力の大部分はこれらを使うことによって得られるのです。 これらは細心の注意を払って、エンジニア力を結集して作られたツールなので、自尊心の高いErlangプログラマは知るべきです。