Xangoのサンプルスクリプト

昨日のエントリに軽くブックマークが付いているようなので、サンプルスクリプトをさらしておく。
サンプルの割りに依存ライブラリが多いとか、やたら長いとかはまぁ勘弁。とりあえず Xango::Broker::Push のほうを。

#!/usr/bin/env perl

#sub Xango::DEBUG { 1 }

package MyHandler;

use strict;
use warnings;
use POE;
use Readonly;
use HTML::LinkExtractor;
use Encode;
require Encode::Detect;

Readonly::Scalar my $MAX_DEPTH => 5;
Readonly::Scalar my $MAX_REDIRECT => 5;

sub spawn {
    my $class = shift;
    my $self = bless {}, $class;

    POE::Session->create(
        heap => $self,
        object_states => [
            $self => [ qw(_start _stop apply_policy handle_response) ]
        ]
    );

    return $self;
}

sub _start { $_[KERNEL]->alias_set('handler') }
sub _stop { $_[KERNEL]->alias_remove('handler') }

sub apply_policy {
    my $job = $_[ARG0];

    return $job->uri->host eq 'localhost';
}

sub handle_response {
    my($kernel, $obj, $job) = @_[KERNEL, OBJECT, ARG0];
    my $depth = $job->notes('depth') || 0;
    my $redirect = $job->notes('redirect') || 0;
    my $response = $job->notes('http_response');
    my $content = decode('Detect', $response->content);

    if ($response->code =~ /^30[12]$/) {
        return if $redirect >= $MAX_REDIRECT;
        my $uri = URI->new_abs($response->header('location'), $job->uri);
        my $new_job = Xango::Job->new(uri => $uri, redirect => $redirect + 1);
        $kernel->post('broker', 'enqueue_job', $new_job);
        return;
    }

    return if $depth >= $MAX_DEPTH;
    return unless $response->is_success;

    # do something

    my $base = $job->uri;
    my $lx = HTML::LinkExtractor->new;
    $lx->parse(\$content);
     for my $link (@{$lx->links}) {
         if (defined $link->{href}) {
             my $uri = URI->new_abs($link->{href}, $base);
             next unless $uri->scheme =~ /^https?$/;
             my $new_job = Xango::Job->new(uri => $uri, depth => $depth + 1);
             $kernel->post('broker', 'enqueue_job', $new_job);
         }
     }
}

package main;

use strict;
use warnings;
use POE;
use Xango;
use Xango::Job;
use Xango::Broker::Push;

binmode STDOUT, ':utf8';

my $handler = MyHandler->spawn;
my $broker = Xango::Broker::Push->spawn(
    Alias => 'broker',
    HandlerAlias => 'handler',
    HttpCompArgs => [
        Agent => 'Test Crawler based Xango'
    ]
);
my @jobs = (
    Xango::Job->new(uri => URI->new('http://localhost/'), depth => 0),
);
POE::Kernel->post($broker->alias, 'enqueue_job', $_) for @jobs;
POE::Kernel->run;

実行すると localhost にすごい勢いで容赦ないリクエストが飛ぶ。あと、多分ジョブが投入されなくなってから 30 秒したらプロセスが終了する。

一応、簡単な解説というかメモ。

spawn

POEのセッションを作る。別に spawn でなくてもいい気がするけど慣習っぽいので。
とりあえずここでは object_states でイベント名を Array Reference で指定しているが、他にも packages_states とかinline_states とかあるらしい。そこらへんは POE::Session を参照。

_start, _stop

セッションの開始、終了時に呼ばれるイベントハンドラ。とりあえずエイリアスの設定だけ。
ここで設定するエイリアスは Broker を作るときに指定する HandlerAlias と対応する。

apply_policy

ジョブを処理するかどうかを判定するために呼ばれる。真を返せば処理されるし、偽を返せば処理されない。
ここではホスト部localhost のものだけ許可。
同期的に呼ばれるので、あまり重い処理を書かないほうが良い。

handle_response

HTTPレスポンスに対する処理を行う。3xx によるリダイレクト処理もここで処理。
$_[ARG0]->notes('http_response') で HTTP::Response のオブジェクトが取得できるので、あとはよしなに。
とりあえず HTML::LinkExtractor つかってリンク抽出して broker に enqueue_job しているだけ。

全体的なメモ

  • いちいち $_[KERNEL] とかやるのがいやなら POE::Sugar::Args がいいらしい
  • キューにジョブを追加したい場合は $kernel->post('broker', 'enqueue_job', $job) として、Broker のセッションにメッセージを Post する
  • URI以外の情報も Xango::Job の生成時に指定しておけば $job->notes で取り出せる

あと、Broker は訪問済みのURLの除去とか、ホスト毎に delay を掛けてジョブの投入といったことは一切やってくれない。なので、実際作るときは Handler から Broker に直接 enqueue_job を Post するんじゃなくて JobManager な感じのセッションにジョブを送って、JobManager で alarm 使って適当なタイミングで Broker に enqueue_job を Post するって感じになるかな、多分。んで、大規模な場合は JobManager は DB つかってジョブ管理するとか。