Boost.Asioによる非同期関数呼び出しと、非同期ノンブロッキングFuture

C++ Advent Calendar 2012 に、参加させていただきました。
この記事は19日目になります。
当初はboost asioのio_service周りの性能評価をネタに考えていたのですが、別のネタができたので、今回はこちらを紹介したいと思います。

本記事は、イベント駆動アプリケーションの開発に役立つ、非同期(ノンブロッキング)関数呼び出しと、非同期ノンブロッキングFutureによる結果の受け取りについてです。

はじめに

高性能な分散システムやサーバアプリケーションの開発をするには、マルチスレッド化イベント駆動型アーキテクチャが必須となります。
Boost.Asioは、このようなアプリケーション開発に適したC++のライブラリです。

イベント駆動の大変なところ

イベント駆動アプリケーションの開発において障害になるのは、コールバック地獄です。

イベントハンドラで別の非同期関数を呼び出し、その結果をまた別のハンドラで受け取り、さらにそのハンドラ内で別の非同期関数を呼び出して・・・といったように
コールバックのコールバックのコールバック関数を書くことになります。

void callbackC(std::shared_ptr<boost::asio::io_service> loop) {
 
}

void callbackB(std::shared_ptr<boost::asio::io_service> loop) {
  loop->post(std::bind(callbackC, loop));
}

void callbackA(std::shared_ptr<boost::asio::io_service> loop) {
  loop->post(std::bind(callbackB, loop));
}

この例は単純ですが、実際は必要な引数の受け渡しや、エラー処理等が入ってきます。このように非同期関数の呼び出しごとに、処理の流れが飛んでしまうのでわかりづらくなってしまいます。

そこで、このような非同期関数呼び出しと、非同期関数の結果受け取りを簡単に書けるようなラッパーを作ってみました。

ソースコード全体はこちら

非同期ノンブロッキングFuture

void callbackA(std::shared_ptr<boost::asio::io_service> loop) {
  // 非同期関数呼び出し
  Future<int> f = async(*loop, []() { return 100; });

  // 非同期関数の結果を受け取るコールバックをアタッチする
  f.attachCallback(
    [](Future<int> f) {
      try {
        cout << f.get() << endl;
      }
      catch(std::exception& e) {
        // エラー処理
      }
    }
  );
}

使い方は、std::futureと似ています。
async()に、boost::asio::io_serviceと、呼び出したい関数を投げると、それがio_service::post()に投げられ非同期に実行されます。
非同期関数の実行が終了したら、自動的にFutureクラスにアタッチしたコールバック関数が呼び出され、そこで結果を取得できます。
非同期関数内で例外が発生した場合は、結果受け取り側でf.get()を呼び出した時点でその例外が再スローされるようになっています。これにより、エラー処理も簡単にかけると思います。

ポイントは、イベントハンドラ内で呼び出した非同期関数の結果受け取りを、ブロッキングで待たない事です。
callbackAで呼び出している関数はすべて、すぐに制御を戻すノンブロッキング関数です。
イベントハンドラ内でブロッキング待機すると、その間そのスレッドは他のイベントを受け取れなくなるので、ブロッキング待機は使えないのです。

今後の発展

非同期ノンブロッキングFutureが本当に役立つのは、非同期なネットワークやファイルI/Oの待受けだと思います。
Boost_Asioをイベント駆動フレームワークとした、RPCライブラリがあればいいなと思っていて、その場合、RPC呼び出しは以下のようにかけるようになると思います。

void callbackA(std::shared_ptr<boost::asio::io_service> loop) {
  Future<int> f = RPC呼び出し(*loop, "rpc_func_a", arg1, arg2, ...);
  f.attachCallback(
    [](Future<int> f) {
      try {
        cout << f.get() << endl;
      }
      catch(std::exception& e) {
        // エラー処理
      }
    }
  );
}

おわりに

イベント駆動アプリケーションにおいて、非同期関数の結果をノンブロッキングに受け取るため、非同期ノンブロッキングFutureを紹介しました。
地味な話題でしたが、特にイベント駆動な分散システムを実装したことがある方には、やりたいことが分かっていただけると思いますw

ファイルシステム操作のスループット (mdtest)

ファイルシステム操作のスループット: mdtest(http://sourceforge.net/projects/mdtest/)を使用
8 clientからの並列{file|directory} create->stat->delete
各クライアント1000個,合計8000個の{file|directory}をcreate->stat->delete

  • *_shared
    • マウントポイント直下に8000{file|directory}を並列にcreate->stat->deleteした場合
  • *_unique
    • マウントポイント直下に,クライアントごとにディレクトリを作成してから,それぞれのディレクトリに1000個ずつ{file|directory}をcreate->stat->deleteした場合

ditributed striped volumeについては,ファイルの作成・削除に失敗する為,測定できなかった.原因は調査中です.

考察

  • ディレクトリ操作が全体的に遅い
    • ディレクトリの作成は,すべてのglusterfsサーバで行われる為.
  • ファイル作成,削除はディレクトリ操作と比べて速い
  • replicateやstripeをすると,ファイル create/stat/deleteの性能は低下する

I/O throuput (dd)

write測定方法: dd if=/dev/zero of=file bs=1M count=4k conv=fdatasync
read測定方法: echo 1 > /proc/sys/vm/drop_caches後に dd if=file of=/dev/null bs=1M count=4k

各サーバ1つずつbrickをvolumeに登録.8 bricksで1 volumeを構成.

  • ext3
  • dist_remote
    • distributed volumes リモートbrickへのファイルI/O
  • dist_local
    • distributed volumes ローカルbrickへのファイルI/O
  • dist_rep_*
    • distributed replicated volume (4 distributed x 2 replicated = 8bricks)
  • dist_str_*
    • distributed striped volume (4 distributed x 2 striped = 8bricks)
  • rep_8
    • replicated volumesで8ノードへ複製作成
  • stripe_8
    • striped volumesで8ノードへストライピング

考察

  • マウントポイントと実際にファイルの実体を書くサーバが一致してると速い
  • ネットワークが1Gib(実測で100MiB/sec)なので,ネットワークがボトルネックとなっている.(10Gibの環境で評価が必要)