文字列ソーティング #3

今度は単語単位のソートにしてベンチマーク。元データは同じく新聞記事約 30MBytes (単語数 5,727,663)。
ついでに MSD Radix Sort も加えてみた。
で、きむらさんのところを見て、そういや、プロファイルとってないなと思ったので、g++ -O2 -pg でコンパイルして gprof で結果を見てみた。suffix_compare が strcmp の代わりの functor なんだけど普通にやるとインライン展開されちゃうから無理矢理別ファイルに分けてリンクしてある。

algorithm time[sec]
quicksort 32.56
multikey quicksort 9.45
merge sort 20.46
MSD radix sort 15.74
std::sort 29.61
std::stable_sort 21.17
==> mergesort <==
 time   seconds   seconds    calls   s/call   s/call  name
 83.27     17.75    17.75 122506116     0.00     0.00  suffix_compare<unsigned char, int>::operator()(int, int) const
 10.73     20.03     2.29  1009070     0.00     0.00  int* std::merge<int*, int*, int*, suffix_compare<unsigned char, int> >(int*, int*, int*, int*, int*, suffix_compare<unsigned char, int>)
  2.35     20.53     0.50                             main

==> msd_radix_sort <==
 time   seconds   seconds    calls   s/call   s/call  name
 71.39     18.63    18.63        1    18.63    25.32  void _msd_radix_sort<unsigned char, int>(unsigned char const*, int*, int, int, int)
 22.43     24.48     5.85 62083188     0.00     0.00  suffix_compare<unsigned char, int>::operator()(int, int) const
  3.18     25.31     0.83   346637     0.00     0.00  void insertion_sort<int, suffix_compare<unsigned char, int> >(int*, unsigned int, suffix_compare<unsigned char, int>)

==> multikey_quicksort <==
 time   seconds   seconds    calls   s/call   s/call  name
 81.37     16.93    16.93        1    16.93    20.27  void multikey_quicksort<unsigned char, int>(unsigned char const*, int*, int, int)
 14.93     20.03     3.11  9933763     0.00     0.00  suffix_compare<unsigned char, int>::operator()(int, int) const
  2.50     20.55     0.52                             main

==> quicksort <==
 time   seconds   seconds    calls   s/call   s/call  name
 87.77     28.60    28.60 178720117     0.00     0.00  suffix_compare<unsigned char, int>::operator()(int, int) const
  9.04     31.55     2.95        1     2.95    31.58  void quicksort<unsigned char, int>(unsigned char const*, int*, int)
  1.57     32.06     0.51                             f()

==> stl_sort <==
 time   seconds   seconds    calls   s/call   s/call  name
 90.18     28.31    28.31 163298909     0.00     0.00  suffix_compare<unsigned char, int>::operator()(int, int) const
  5.74     30.11     1.80   598058     0.00     0.00  int* std::__unguarded_partition<int*, int, suffix_compare<unsigned char, int> >(int*, int*, int, suffix_compare<unsigned char, int>)
  1.56     30.60     0.49                             main

==> stl_stable_sort <==
 time   seconds   seconds    calls   s/call   s/call  name
 84.90     18.58    18.58 125713536     0.00     0.00  suffix_compare<unsigned char, int>::operator()(int, int) const
  9.85     20.73     2.16   818254     0.00     0.00  int* std::merge<int*, int*, int*, suffix_compare<unsigned char, int> >(int*, int*, int*, int*, int*, suffix_compare<unsigned char, int>)
  2.33     21.24     0.51                             main

MSD Radix Sort と Multikey Quicksort 以外について、比較関数の呼び出し回数と実行時間について並べてみる。

algorithm 呼び出し回数 合計実行時間[sec] 平均実行時間[nsec]
Quicksort 178,720,117 28.60 160
Merge sort 122,506,116 17.75 144
std::sort 163,298,909 28.31 173
std::stable_sort 125,713,536 18.58 148

要するにマージソートの方が、比較コスト、比較回数の両方で優れているということだな。

一応ソースを各種アルゴリズムの実装コードも書いておく。なんか変なところがあれば、指摘頼みます。

// -*- c++ -*-

#ifndef SUFSORT_H
#define SUFSORT_H

#include <vector>
#include <utility>
#include <functional>
#include <algorithm>

#include <boost/scoped_array.hpp>

template<typename Character, typename Int>
struct suffix_compare : public std::binary_function<Int, Int, bool>
{
    suffix_compare(const Character* _text, Int _offset = 0) : text(_text), offset(_offset) { }

    bool operator()(Int i, Int j) const {
        if (i == j) return false;
    
        for (;;) {
            if (text[offset + j] == 0) return false;
            if (text[offset + i] == 0) return true;
            if (text[offset + i] < text[offset + j]) return true;
            if (text[offset + i] > text[offset + j]) return false;
            i++;
            j++;
        }
    }

    const Character* text;
    Int offset;
};

template<typename T, typename Int>
inline void vecswap(T i, T j, Int n)
{
    while (n-- > 0) std::swap(*i++, *j++);
}

template<typename Character, typename Int>
inline Character ref(const Character* text, Int* ip, Int index, Int depth)
{
    return text[ip[index] + depth];
}

template<typename T, typename Comparator>
void insertion_sort(T* a, size_t n, Comparator cmp)
{
    for (size_t i = 1; i < n; i++) {
        size_t j;
        T x = a[i];
        for (j = i; j > 0 && cmp(x, a[j - 1]); j--) {
            a[j] = a[j - 1];
        }
        a[j] = x;
    }
}

/**
 * Implementation of Multikey QuickSort.
 * 
 * @param text  text array which is terminated zero.
 * @param ip    index pointer
 * @param n     size of ip
 * @param depth recursion depth
 */
template<typename Character, typename Int>
void multikey_quicksort(const Character* text, Int* ip, Int n, int depth)
{
    int a, b, c, d, r, v;
    if (n <= 1) return;

    if (n < 10) {
        insertion_sort(ip, n, suffix_compare<Character, Int>(text, depth));
        return;
    }

    for (;;) {
        a = rand() % n;
        std::swap(ip[0], ip[a]);

        v = ref(text, ip, 0, depth);
        a = b = 1;
        c = d = n - 1;
        for (;;) {
            while (b <= c && (r = ref(text, ip, b, depth) - v) <= 0) {
                if (r == 0) { std::swap(ip[a], ip[b]); a++; }
                b++;
            }
            while (b <= c && (r = ref(text, ip, c, depth) - v) >= 0) {
                if (r == 0) { std::swap(ip[c], ip[d]); d--; }
                c--;
            }
            if (b > c) break;
            std::swap(ip[b], ip[c]);
            b++;
            c--;
        }

        r = std::min(a, b - a);
        vecswap(ip, ip + b - r, r);
        r = std::min(d - c, n - d - 1);
        vecswap(ip + b, ip + n - r, r);
        r = b - a;

        if (a - d  - 1 != 0) {
            break;
        }
        else {
            depth++;
        }
    } 

    multikey_quicksort(text, ip, r, depth);
    if (text[ip[r] + depth]) multikey_quicksort(text, ip + r, a + n - d - 1, depth + 1);
    r = d - c;
    multikey_quicksort(text, ip + n - r, r, depth);
}

/**
 * Implementation of Two-Stage Sort.
 */
template<typename Character, typename Int>
void two_stage_sort(const Character* text, Int* sa, Int n)
{
    Int ALPHABET_SIZE = static_cast<Int>(std::numeric_limits<Character>::max()) 
        - std::numeric_limits<Character>::min() + 1;
    boost::scoped_array<Int> count(new Int[ALPHABET_SIZE]);
    boost::scoped_array<Int> countA(new Int[ALPHABET_SIZE]);
    boost::scoped_array<Int> countB(new Int[ALPHABET_SIZE]);
    boost::scoped_array<Int> bucketA(new Int[ALPHABET_SIZE]);
    boost::scoped_array<Int> bucketB(new Int[ALPHABET_SIZE]);

    std::fill(count.get(), count.get() + ALPHABET_SIZE, 0);
    std::fill(countA.get(), countA.get() + ALPHABET_SIZE, 0);
    std::fill(countB.get(), countB.get() + ALPHABET_SIZE, 0);

    for (Int i = 0; i < n  - 1; i++) {
        count[text[i]]++;
        if (text[i] > text[i + 1]) countA[text[i]]++;
    }
    count[text[n - 1]]++;

    bucketA[0] = 0;
    bucketB[0] = 0;
    for (Int i = 1; i < ALPHABET_SIZE; i++) {
        bucketA[i] = bucketA[i - 1] + count[i - 1];
        bucketB[i] = bucketA[i] + countA[i];
    }

    std::fill(sa, sa + n, 0);
    for (Int i = 0; i < n - 1; i++) {
        if (text[i] <= text[i + 1]) {
            sa[bucketB[text[i]] + countB[text[i]]] = i;
            countB[text[i]]++;
        }
    }
    sa[bucketB[text[n - 1]] + countB[text[n - 1]]] = n - 1;
    countB[text[n - 1]]++;

    for (Int i = 0; i < ALPHABET_SIZE; i++) {
        multikey_quicksort(text, sa + bucketB[i], countB[i], 1);
    }

    for (Int i = 0; i < n; i++) {
        if (sa[i] > 0 && text[sa[i] - 1] > text[sa[i]]) {
            sa[bucketA[text[sa[i] - 1]]++] = sa[i] - 1;
        }
    }
}

template<typename Character, typename Int>
void mergesort(const Character* text, Int* index, Int n)
{
    boost::scoped_array<Int> work(new Int[n / 2 + 1]);
    _mergesort(text, index, n, work.get());
}

template<typename Character, typename Int>
void _mergesort(const Character* text, Int* index, Int n, Int* work)
{
    if (n <= 10) {
        insertion_sort(index, n, suffix_compare<Character, Int>(text, 0));
        return;
    }

    Int middle = n / 2;
    _mergesort(text, index, middle, work);
    _mergesort(text, index + middle, n - middle, work);
    std::copy(index, index + middle, work);
    std::merge(work, work + middle, index + middle, index + n, index, suffix_compare<Character, Int>(text, 0));
}

template<typename Character, typename Int>
void quicksort(const Character* text, Int* index, Int size)
{
    suffix_compare<Character, Int> cmp(text);

    Int i = 0;
    Int j = size - 1;

    Int x = index[size / 2];

    for (;;) {
        while (cmp(index[i], x)) ++i;
        while (cmp(x, index[j])) --j;
        if (i >= j) break;
        std::swap(index[i++], index[j--]);
    }

    if (i > 1) quicksort(text, index, i);
    if (size - j > 1) quicksort(text, index + j + 1, size - j - 1);
}

template<typename Character, typename Int>
void msd_radix_sort(const Character* text, Int* sa, Int n, Int alphabet_size)
{
    _msd_radix_sort(text, sa, n, alphabet_size, 0);
}

template<typename Character, typename Int>
void _msd_radix_sort(const Character* text, Int* sa, Int n, Int alphabet_size, Int depth)
{
    if (n < 100) {
        insertion_sort(sa, n, suffix_compare<Character, Int>(text, depth));
        return;
    }
    
    boost::scoped_array<Int> count(new Int[alphabet_size]);
    boost::scoped_array<Int> positions(new Int[alphabet_size]);

    std::fill(count.get(), count.get() + alphabet_size, 0);

    for (Int i = 0; i < n; i++) {
        count[text[sa[i] + depth]]++;
    }

    positions[0] = count[0];
    for (Int i = 1; i < alphabet_size; i++) {
        positions[i] = (count[i] += count[i - 1]);
    }

    Int current = 0;
    Int start = 0;
    Int i = 0;
    for (;;) {
        if (positions[current] <= i) {
            if (current && start + 1 < count[current] && count[current] - count[current - 1] > 1) {
                _msd_radix_sort(text, 
                                sa + count[current - 1],
                                count[current] - count[current - 1], 
                                alphabet_size, depth + 1);
            }
            if (current == alphabet_size - 1) break;
            start = i = count[current++];
        } else {
            Character key = text[sa[i] + depth];
            if (key == current) {
                i++;
            } else {
                --positions[key];
                std::swap(sa[positions[key]], sa[i]);
            }
        }
    }
}

#endif // SUFSORT_H