Problem 160

Hadoop はドキュメントが少なすぎ。ソフトウェア自体は良くできているっぽいのにもったいない。
まぁ、ということで、Project Euler の Problem 160 を解くプログラムでも。マジックナンバー多すぎだけど気にしない。まぁ、examples の PiEstimater でも読んどいたほうが良いと思うけど。
しっかし、JobConf の設定も面倒だが、Generics の型パラメータが煩雑だ。こういうのは Scala の得意分野な気もするが、どうだろう。

package projecteuler;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Problem160 extends Configured implements Tool {
    private static class FactMapper extends MapReduceBase implements
            Mapper<LongWritable, LongWritable, LongWritable, LongWritable> {

        @Override
        public void map(LongWritable key, LongWritable val,
                OutputCollector<LongWritable, LongWritable> out,
                Reporter reporter) throws IOException {

            long low = key.get();
            long high = val.get();
            long x = 1;

            for (long i = low; i <= high; i++) {
                long y = i;
                while (y % 2 == 0)
                    y /= 2;
                while (y % 5 == 0)
                    y /= 5;

                x = (x * y) % 100000;
            }

            out.collect(new LongWritable(0), new LongWritable(x));
        }
    }

    private static class FactReducer extends MapReduceBase implements
            Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

        private JobConf conf;

        private long x = 1;

        @Override
        public void configure(JobConf conf) {
            this.conf = conf;
        }

        @Override
        public void reduce(LongWritable key, Iterator<LongWritable> values,
                OutputCollector<LongWritable, LongWritable> out,
                Reporter reporter) throws IOException {

            while (values.hasNext()) {
                LongWritable value = values.next();
                x = (x * value.get()) % 100000;
            }
        }

        @Override
        public void close() throws IOException {
            Path tmpDir = new Path("tmp-fact");
            Path outDir = new Path(tmpDir, "out");
            Path outFile = new Path(outDir, "reduce-out");

            FileSystem fs = FileSystem.get(conf);
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
                    outFile, LongWritable.class, LongWritable.class,
                    CompressionType.NONE);

            writer.append(new LongWritable(0), new LongWritable(x));
            writer.close();
        }
    }

    private long launch(int nMaps, long n) throws IOException {
        long f = 0;
        for (long x = 2; x < n; x *= 2) {
            f += n / x;
        }
        for (long x = 5; x < n; x *= 5) {
            f -= n / x;
        }
        long r = power(2, f);

        JobConf conf = new JobConf(getConf(), Problem160.class);
        conf.setJobName("fact");
        conf.setInputFormat(SequenceFileInputFormat.class);

        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(LongWritable.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);

        conf.setMapperClass(FactMapper.class);
        conf.setReducerClass(FactReducer.class);

        conf.setNumReduceTasks(1);

        Path tmpDir = new Path("tmp-fact");
        Path inDir = new Path(tmpDir, "in");
        Path outDir = new Path(tmpDir, "out");
        FileSystem fs = FileSystem.get(conf);
        fs.delete(tmpDir);

        if (!fs.mkdirs(inDir)) {
            throw new IOException("Mkdir failed to creaete " + inDir.toString());
        }

        conf.setInputPath(inDir);
        conf.setOutputPath(outDir);
        conf.setNumMapTasks(nMaps);

        long nPerMap = n / nMaps;

        for (int i = 0; i < nMaps; i++) {
            Path file = new Path(inDir, "part" + i);
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
                    file, LongWritable.class, LongWritable.class,
                    CompressionType.NONE);

            long start = nPerMap * i + 1;
            long end = Math.min(start + nPerMap - 1, n);

            writer.append(new LongWritable(start), new LongWritable(end));
            writer.close();
        }

        JobClient.runJob(conf);
        Path inFile = new Path(outDir, "reduce-out");
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, conf);
        LongWritable k = new LongWritable();
        LongWritable v = new LongWritable();
        reader.next(k, v);
        reader.close();

        return (v.get() * r) % 100000;
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: Problem160 nMaps n");
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        int nMaps = Integer.parseInt(args[0]);
        long n = Long.parseLong(args[1]);

        System.out.println("f(n) = " + launch(nMaps, n));

        return 0;
    }

    public long power(long b, long n) {
        long x = 1;
        while (n != 0) {
            if ((n & 0x01) != 0) {
                x = (x * b) % 100000;
            }
            b = (b * b) % 100000;
            n = n >> 1;
        }
        return x;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Problem160(), args);
        System.exit(res);
    }
}