Xangoのサンプルスクリプト
昨日のエントリに軽くブックマークが付いているようなので、サンプルスクリプトをさらしておく。
サンプルの割りに依存ライブラリが多いとか、やたら長いとかはまぁ勘弁。とりあえず Xango::Broker::Push のほうを。
#!/usr/bin/env perl #sub Xango::DEBUG { 1 } package MyHandler; use strict; use warnings; use POE; use Readonly; use HTML::LinkExtractor; use Encode; require Encode::Detect; Readonly::Scalar my $MAX_DEPTH => 5; Readonly::Scalar my $MAX_REDIRECT => 5; sub spawn { my $class = shift; my $self = bless {}, $class; POE::Session->create( heap => $self, object_states => [ $self => [ qw(_start _stop apply_policy handle_response) ] ] ); return $self; } sub _start { $_[KERNEL]->alias_set('handler') } sub _stop { $_[KERNEL]->alias_remove('handler') } sub apply_policy { my $job = $_[ARG0]; return $job->uri->host eq 'localhost'; } sub handle_response { my($kernel, $obj, $job) = @_[KERNEL, OBJECT, ARG0]; my $depth = $job->notes('depth') || 0; my $redirect = $job->notes('redirect') || 0; my $response = $job->notes('http_response'); my $content = decode('Detect', $response->content); if ($response->code =~ /^30[12]$/) { return if $redirect >= $MAX_REDIRECT; my $uri = URI->new_abs($response->header('location'), $job->uri); my $new_job = Xango::Job->new(uri => $uri, redirect => $redirect + 1); $kernel->post('broker', 'enqueue_job', $new_job); return; } return if $depth >= $MAX_DEPTH; return unless $response->is_success; # do something my $base = $job->uri; my $lx = HTML::LinkExtractor->new; $lx->parse(\$content); for my $link (@{$lx->links}) { if (defined $link->{href}) { my $uri = URI->new_abs($link->{href}, $base); next unless $uri->scheme =~ /^https?$/; my $new_job = Xango::Job->new(uri => $uri, depth => $depth + 1); $kernel->post('broker', 'enqueue_job', $new_job); } } } package main; use strict; use warnings; use POE; use Xango; use Xango::Job; use Xango::Broker::Push; binmode STDOUT, ':utf8'; my $handler = MyHandler->spawn; my $broker = Xango::Broker::Push->spawn( Alias => 'broker', HandlerAlias => 'handler', HttpCompArgs => [ Agent => 'Test Crawler based Xango' ] ); my @jobs = ( Xango::Job->new(uri => URI->new('http://localhost/'), depth => 0), ); POE::Kernel->post($broker->alias, 'enqueue_job', $_) for @jobs; POE::Kernel->run;
実行すると localhost にすごい勢いで容赦ないリクエストが飛ぶ。あと、多分ジョブが投入されなくなってから 30 秒したらプロセスが終了する。
一応、簡単な解説というかメモ。
spawn
POEのセッションを作る。別に spawn でなくてもいい気がするけど慣習っぽいので。
とりあえずここでは object_states でイベント名を Array Reference で指定しているが、他にも packages_states とかinline_states とかあるらしい。そこらへんは POE::Session を参照。
_start, _stop
セッションの開始、終了時に呼ばれるイベントハンドラ。とりあえずエイリアスの設定だけ。
ここで設定するエイリアスは Broker を作るときに指定する HandlerAlias と対応する。
apply_policy
ジョブを処理するかどうかを判定するために呼ばれる。真を返せば処理されるし、偽を返せば処理されない。
ここではホスト部が localhost のものだけ許可。
同期的に呼ばれるので、あまり重い処理を書かないほうが良い。
handle_response
HTTPレスポンスに対する処理を行う。3xx によるリダイレクト処理もここで処理。
$_[ARG0]->notes('http_response') で HTTP::Response のオブジェクトが取得できるので、あとはよしなに。
とりあえず HTML::LinkExtractor つかってリンク抽出して broker に enqueue_job しているだけ。
全体的なメモ
- いちいち $_[KERNEL] とかやるのがいやなら POE::Sugar::Args がいいらしい
- キューにジョブを追加したい場合は $kernel->post('broker', 'enqueue_job', $job) として、Broker のセッションにメッセージを Post する
- URI以外の情報も Xango::Job の生成時に指定しておけば $job->notes で取り出せる
あと、Broker は訪問済みのURLの除去とか、ホスト毎に delay を掛けてジョブの投入といったことは一切やってくれない。なので、実際作るときは Handler から Broker に直接 enqueue_job を Post するんじゃなくて JobManager な感じのセッションにジョブを送って、JobManager で alarm 使って適当なタイミングで Broker に enqueue_job を Post するって感じになるかな、多分。んで、大規模な場合は JobManager は DB つかってジョブ管理するとか。