Hadoop StreamingのReducerへの入力の構造化をPHPでやってみた

(今回の記事は、id:naoyaさんの記事をパクってにインスパイアされて、PHPでやってみたものです。話の内容としては概ね同じです。)

前回の記事の最後で、「HadoopStreamingはReducerへの入力が構造化されていないという問題点があります。」という事を書きました。

前回の小説の文を分かち書きしたものの単語数カウントの例で言うと、Reducerに渡ってくる入力は次のような形になります。

単語1	1
単語1	1
単語2	1
 :

今回は、単純にハッシュ上にカウントしていくだけなので大した問題にはなりませんが、もっと複雑な処理をReducerにやらせようとすると、PHPプロセスのメモリ不足になってしまう可能性があります。

PHPでHadoopStreamingの入出力を汎用的に扱う

という訳で、id:naoyaさんの作ったフレームワークを参考に、PHPで同じようなものを作ってみました。

やっつけで作ったのでテストも無い、コメントすら入っていない適当な作りですが(^^;ゞ、とりあえず目的の動作はしてくれたので、GitHubにコードを上げてみました。
http://github.com/stellaqua/php-hadoop-streaming-frontend/tree/master

このフレームワークを使ってMap関数を書き直してみます。

#!/usr/bin/php
<?php
require_once(dirname(dirname(__FILE__)).'/lib/HadoopStreaming/Mapper.php');

class Mapper extends HadoopStreaming_Mapper
{
    public function map ( $key, $value )
    {
        foreach ( preg_split('/\s+/', $value) as $word ) {
            if ( $word !== '' ) {
                $this->emit($word, 1);
            }
        }
    }
}

$mapper = new Mapper();
$mapper->run();
?>

HadoopStreaming_Mapperクラスを継承したクラスを作ってmapメソッドを定義すると、引数としてkeyとvalueが渡ってきて処理できるようになります。

処理結果は、keyとvalueを引数にしてemitメソッドを呼び出すと、タブ区切りで出力してくれます。

続いてReduce関数。

#!/usr/bin/php
<?php
require_once(dirname(dirname(__FILE__)).'/lib/HadoopStreaming/Reducer.php');

class Reducer extends HadoopStreaming_Reducer
{
    public function reduce ( $key, $values )
    {
        $count = 0;
        while ( $values->has_next_value ) {
            $count++;
            $values->next();
        }
        $this->emit($key, $count);
    }
}

$reducer = new Reducer();
$reducer->run();
?>

こちらもMap関数の場合と同じ感じですが、ポイントはreduceメソッドの第2引数として、イテレータが渡されるというところです。

reduceメソッドはkey毎に呼び出されるので、サンプルのようにwhile文でイテレータを回してやれば、前回の記事のようにハッシュで結果を保持しておくような必要が無くなります。

今回の例では使っていませんが、イテレータにcurrent_key/current_valueというプロパティが定義してあるので、key/valueの値を使った処理も、もちろん書く事ができます。

とりあえず、今回のサンプルを動かす為だけに作ったので、何か足りていない機能があったり、バグがあったりするかもしれませんが、それは追々対応していこうと思います。

メモリ使用量の違いについて

今回の例は規模が小さいのであまり参考にならないかもしれませんが、前回の方法と今回のフレームワークを使った方法で、メモリ使用量にどのくらい差が出てくるのか調べてみました。

メモリ使用量は、memory_get_usage()関数をmap.php/reduce.phpの最後に直接仕込んで測定しました。

まず、前回のSTDIN直の方法の場合。

  • map:341360
  • map:341368
  • reduce:1775480
  • reduce:1810488

そして、今回のフレームワークを使った場合。

  • map2:360208
  • map2:360184
  • reduce2:259200
  • reduce2:259024

Map関数側は処理内容にほとんど違いが無いので差が無いですが、Reduce関数側は結構差が出てきてますね。*1

で、結論

前回・今回でHadoopを使って分散処理を体験してみた訳ですが、実際に分散処理の恩恵を受ける為には、GBytesクラス以上のデータを扱う場合になると思うので、そうそう必要になる事は無さそうです。

ただ、MapReduceの考え方自体は、集合知プログラミングで扱うような計算量の多い処理をいかに分割するかというヒントとして、非常に役に立ちそうです。

図ったかのように、集合知プログラミングシリーズの次の章の最初のサンプルが、"フィード中の単語を数える"なので(笑)、早速、Hadoopと連携して処理させるコードを書いてみて、またシリーズの続きを進めていきたい思います。

*1:結果がそれぞれ2つずつになっているのは、MapperとReducerがそれぞれ2つのworkerに分割処理されたからだと思います。シングルノードではありますが、一応はきちんと分散処理してくれてますね!