コンテンツにスキップ

第11章 K-平均法クラスタリング

はじめに

  • K-平均法アルゴリズムは、データセット内のクラスターを検出する方法です。(データをグループに分ける方法)
  • マップパターンレデュースパターンにより容易に並列化できますが、コードに手を加えて二つのパターンを融合することで、同期の回数を減らすことができます。
  • 並列したK-平均法実装には、大量のデータを処理する重い型のリダクションが含まれます。
  • 今回は、そのようなK-平均法アルゴリズムについてと、それを並列化するインテルCilk PlusやインテルTBBで実装する方法を説明します。

11.1 アルゴリズム

K-平均法の標準アルゴリズムは、まず、初期クラスターを作成した後、クラスターの更新を繰り返します。更新ごとに、図11.1の二つのステップを実行します。 図11.1 1. 各クラスターの中心を計算します。 2. 各ポイントを最も近い中心のクラスターに再割り当てします。


場合によっては、クラスターが空になることがあります。 そのままにしておくと、ポイントがないため、クラスターの中心を計算することができないため、アルゴリズムがうまく動作しなくなります。
これを回避するための一般的な修復方法は、現在のクラスターから最も遠いポイントを空のクラスターに割り当てることです。 空クラスターの修復


二つのステップにおいて、それぞれポイントまたはクラスターを並列しますが、通常、並列スラックを提供できることが多いポイントのみを今回は使用していきます。

クラスターの中心は、ポイント数で割ったポイントの合計なので、一つ目のステップでは、並列リダクションを使用して計算できます。

二つ目のステップでは、ポイントを再割り当てするマップパターンを並列に行います。

さらに、ここでは別の効率の問題もあります。アルゴリズムは、ポイントに対して二つのパターン(中心を計算するレデュースパターンとポイントを再割り当てするマップパターン)を交互に使用します。これらの操作の融合は多くの場合利点があり、メモリーの走査が二回ではなく一回で済みます。

中心の計算には二つのステップ(ポイントの合計を計算するステップとクラスターのサイズで合計を割るステップ)が必要なため、反復ステップを以下のように拡張します。

  • 合計: 各クラスターのポイントの合計を計算します(リダクション)
  • 分割: そのクラスターのポイント数で各クラスターの合計を割ります
  • 再割り当て: 最も近い中心に各ポイントを再割り当てします(マップ)

フュージョンによってこれらのステップを最適化する場合、そのまま融合するにはリダクションとマップの順序が異なっており、しかも分割ステップがその間に入っています。

K-平均法アルゴリズムにおいて、この問題はこれらのステップをループ構造にすることで解決できます。最初の合計ステップはループから取り出せるため、アルゴリズムは次のようになります。

合計
do {
    分割
    再割り当て(マップ)
    合計(リダクション)
}  while(ポイントを別のクラスターに移動);
これにより、マップパターンとリダクションパターンを融合できるようになりました。変更後のループはオリジナルのループよりも一回多く合計ステップを実行するため、オーバーヘッドは多少増えますが、融合によるメリットのほうがはるかに重要です。

11.2 K-平均法とCilk Plus

リスト11.1は、トップレベルのCilk Plusのコードです。全体の構造を見てみましょう。

  • 入力はnポイントの配列points
  • クラスター数はk
  • 配列centroid[0:k]はクラスターの中心を格納
  • 配列cluster_id[0:n]は各ポイントの中心のインデックスを格納
  • distance2(p,q)はポイントpとq間のユークリッド距離を返すとする
リスト11.1
Cilk PlusによるK- 平均法クラスタリング

void compute_k_means( size_t n, const point points[], size_t k, cluster_id id[], point centroid[] ) {

    // 初期クラスターを作成して合計を計算
    elementwise_reducer<sum_and_count> sum(k); 
    sum.clear();
    cilk_for( size_t i=0; i<n; ++i ) {
        id[i] = i % k;  
        // ループから取り出した「合計ステップ」
        sum[id[i]].tally(points[i]);
    }

    // クラスターが変更されなくなるまでループ
    cilk::reducer_opadd<size_t> change;
    do {
        // 空のクラスターを修復
        repair_empty_clusters( n, points, id, k, centroid, sum );

        // 「分割ステップ」:合計から中心を計算
        centroid[0:k] = sum.get_array()[0:k].mean();

        sum.clear();
        change.set_value(0);
        cilk_for( size_t i=0; i<n; ++i ) {
            // 「再割り当てステップ」:ポイント[i]に最も近い中心へのインデックスを検知
            cluster_id j = __sec_reduce_min_ind(distance2(centroid[0:k],points[i]));
            if( j!=id[i] ) {
                // 別の中心の方が近くなった場合
                id[i] = j;
                ++change;
            }
            // 「合計ステップ」
            sum[j].tally(points[i]);
        } 
    } while( change.get_value()!=0 );
}
  • 14行目から34行目のdo-whileループは、前節の回転ステップ構造を使用して分割、再割り当て、合計ステップを繰り返します。
  • 23行目から33行目のcilk_forは、再割り当てステップと合計ステップのマップ/レデュースを融合したものです。
    • 再割り当てステップは、配列セクションxの最小値のインデックスを返す、配列表記操作__sec_reduce_min_index(x)を用いて表されます。
  • 変数sumはハイパーオブジェクト(タスク独自のコピー)です。
    • コードのシリアルバージョンでは、sum_and_countオブジェクトの配列になります。
    • j番目のオブジェクトは、j番目のクラスターに関する情報、特にポイントの合計とポイント数を記録します。

リスト11.2はsum_and_countの定義です。

リスト11.2
クラスターのポイントの平均値を計算する型sum_and_count

struct sum_and_count {
    sum_and_count() : sum(), count(0) {}
    point sum;
    size_t count;
    void clear() {
        sum = point();
        count = 0;
    }
    void tally( const point& p ) {
        sum += p;
        ++count;
    }
    point mean() const {
        return sum/count;
    }
    void operator+=( const sum_and_count& other ) {
        sum += other.sum;
        count += other.count;
    };
};
ここで、point型について次の三つを仮定します。

  • point()はポイント加算の単位元を構築する(要は初期化)
  • q+=pでは、ポイントqをポイントqとpの合計にセットする
  • p/nはqのnコピーが追加されたときに合計がpになるポイントqを返す(q*n=pとなるq)

最後のメソッドoperator+=はシーケンシャルのK-平均法アルゴリズムには必要ありません。並列実装で二つのsum_and_countオブジェクトの情報をマージするものです。つまり、最終的に各スレッドの結果を統合して、正しい値を得るために使われます。

11.2.1 ハイパーオブジェクト

コードは二つのハイパーオブジェクトchange,sumを使用します。

changeとsumはcilk_forループの並行反復によって更新されるため、どちらもハイパーオブジェクトです。

  • changeはクラスター割り当ての変更をカウントする
    • 加算の操作をスレッド間で正しく集約するためのレデューサーであるreducer_opaddテンプレートクラスを使って実装される
  • sumは各クラスターの合計ポイントを累積する
    • reducer_opaddの標準機能では完全に表現できないため、カスタム実装が必要

カスタム実装: sumはk個のクラスターを処理するためにk個のsum_and_countオブジェクトを持つローカルビューを持つ必要があるが、 kはランタイム値、つまり、ユーザーが指定してプログラムの実行時に決まるため、レデューサーを構築するには、プログラム実行中にkを記憶してスレッド・ローカル・ビューを構築するときに使用する必要がある。解決方法として、kを記憶するモノイドという事前定義テンプレートcilk::reducerを使うことができる。

リスト11.3に完全な実装を示します。

リスト11.3
CilkPlusにおける配列を要素単位で合計するハイパーオブジェクトの定義
各構造体の名前は変更できます
identityメソッドとreduceメソッドはcillk::reducer<Monoid>で内部的に使用されるため名前を変更できません

template<typename T>
class elementwise_reducer {
    struct View {
        T* array;
        View( size_t k ) : array( new T[k] ) {}
        ~View() {delete[] array;}
    };

    struct Monoid: cilk::monoid_base<View> {
        const size_t k;
        void identity(View* p) const {new(p) View(k);}
        void reduce(View* left, View* right) const {
            left->array[0:k] += right->array[0:k];
        }
        Monoid( size_t k_ ) : k(k_) {}
    };
    cilk::reducer<Monoid> impl;
public:
    elementwise_reducer( size_t k ) : impl(Monoid(k), k) {}
#if __linux__||__APPLE__
    void clear() {[]( T& x) {x.clear();}(impl.view().array[0:impl.monoid().k]);}
#else
    void clear() {impl.view().array[0:impl.monoid().k].clear();}
#endif
    T* get_array() {return impl.view().array;}
    operator sum_and_count*() {return get_array();}
};
レデューサーは、以下の三つで構成されています。

  • ハイパーオブジェクトのビューを実装するView
  • 操作のビューを定義するMonoid
  • パブリック・インターフェイスを提供するelementwise_reducerラッパー

Viewクラスは、各タスクが並行処理で使用するローカルなデータを表します。具体的には、k個のT型(sum_and_count型など)の配列を持ち、その配列が各タスクごとに独立して保持されます。この配列の構築と破棄に対する責任を負います。

Monoidクラスは、ビューの操作を定義します。ビューのメモリを割り当てたりする際はデフォルトのcilk::reducerを用いますが、kをビュー・コンストラクターに渡す必要がある初期化(identityメソッド)では、デフォルトをオーバーライドします。また、二つのビューを合計する(reduceメソッド)方法を指定します。

elementwise_reducerクラスは、Cilk Plusのレデューサーをカプセル化して操作しやすくするラッパーです。実際のレデューサーはimplというメンバー変数に含まれ、パブリックメソッドを通じて操作されます。例えば、clearメソッドは要素単位でビューの各要素にclear()メソッドを適用します。
implのコンストラクターの引数は二つです。最初の引数はMonoidです。二番目の引数は最初に生成されるビューの構築に使われます。

11.3 K-平均法とTBB

TBBのK-平均法アルゴリズムの基本的な構造はCilk Plusと同じです。主な違いは次の通りです。

  • 配列表記文はシリアルループとして記述されます。配列表記はベクトル化のためではなく表記を簡潔にするために使用されていたため、パフォーマンスが損なわれるのではなく、表記が簡潔でなくなるだけです。
  • ハイパーオブジェクトはスレッド・ローカル・ストレージ(TLS)に置換されます。ローカルビュー のグローバルビューへのマージは明示的なループを使用して行われます。
  • 並列ループの反復のタイリング(ブロッキングの意味) は明示的であるため、スレッド・ローカル・ルックアップ(スレッド独自のデータを取得すること)はタイル(ブロックの意味)ごとに一回行えます。

リスト11.4にリスト11.2のsum_and_countのスレッド・ローカル・ビューを保持するtls_type型の宣言を示します。
スレッド・ローカル・ストレージは、スレッド・ローカル・ビューのコレクションを実装するtbb::enumerable_thread_specificテンプレートのインスタンスを使用して実装されます。
tls.local()を呼び出すことで、そのスレッドのローカルなビューを取得します。もし、そのスレッドにまだローカルビューが存在しない場合、新しいビューが自動的に作成されます。

リスト11.4
TBBにおけるスレッドローカルビューのt1s_type型の宣言

class view {
    view( const view& v );            // コピー構築を拒否
    void operator=( const view& v );  // 割り当てを拒否
public:
    sum_and_count* array;
    size_t change;
    view( size_t k ) : array(new sum_and_count[k]), change(0) {}
    ~view() {delete[] array;}
};

typedef tbb::enumerable_thread_specific<view> tls_type;
リスト11.4を詳しく見ていきます。
view( const view& v );            // コピー構築を拒否
void operator=( const view& v );  // 割り当てを拒否
これは、クラスviewのインスタンスがコピーされたり、代入されたりすることを禁止するために宣言されています。これは、各スレッドが独自のローカルビューを保持し、それが誤ってほかのスレッドにコピーされないようにするためです。

view( size_t k ) : array(new sum_and_count[k]), change(0) {}
~view() { delete[] array; }
コンストラクターview(size_t k)は、配列arrayを動的に生成し、そのサイズは引数kに基づいています。配列にはsum_and_count型のオブジェクトが格納され、これによって各クラスターの合計とデータ数を保持します。
change変数は0に初期化されています。
デストラクター~view()は、new演算子で動的に確保したメモリをdelete[]で開放しています。

typedef tbb::enumerable_thread_specific<view> tls_type;
これは、各スレッドが独自のview型のインスタンスを持つことを意味します。また、tbb::enumerable_thread_specific< view > をtls_typeという名前で操作できるようにしています。

新しいビューを作成する方法はenumerable_thread_specificがどのように構築されるかに依存います。以下の三つの方法があります。

  • enumerable_thread_specific<> a;
  • enuerable_thread_specific b(x) ;//xはT型と仮定
  • enumerable_thread_specificc(f);//fはファンクターと仮定

aのローカルビューはデフォルトコンストラクタによって生成されます。
bのローカルビューはT型の変数xのコピーとなります。
cのローカルビューはT(f())を使用して構築されます。ファンクターfが呼び出され、その結果をもとにローカルビューが構築されます。
今回は最後の方法が使用されます。

view(size_t k) : array(new sum_and_count[k]), change(0) {}
このコンストラクタは k という引数を受け取り、sum_and_count 型の配列を k 個生成します。つまり、k によってビューの配列サイズが決定されることになります。
以下のようにtls_typeのインスタンスを宣言します。
tls_type tls([&] { return k; });
ラムダ式 [&] { return k; } は、k という変数の値を返す無名関数です。この無名関数は enumerable_thread_specific のファンクター f として渡されます。
これにより、k を使って各スレッドごとに view 型のローカルビューが初期化されます。

enumerable_thread_specificがビューのSTLコンテナーのように動作するため、スレッドはSTL規則に従ってすべてのビューにアクセスすることができます。ただし、通常は各スレッドが自分のローカルビューにアクセスすることが一般的です。これにより、各スレッドが独自のデータを管理しつつ、必要に応じて他のスレッドのビューにアクセスすることも可能になります。
例えば、リスト11.5はすべてのビューのchangeの合計をグローバルビューに累積してローカル値をリセットします。K-平均法の例は、このコードとリスト11.6のポイントの合計を累積するコードに依存します。
リスト11.5
ローカルビューの変更の検出tls変数はtbb::enumerable_thread_specific<view>です

void reduce_local_counts_to_global_count( tls_type& tls, view& global ) {
    global.change = 0;
    for( auto i=tls.begin(); i!=tls.end(); ++i ) {
        view& v = *i;
        global.change += i->change;
        v.change = 0;
    }
}
リスト11.5は、forループ内で各スレッドのローカルビューchangeの値をグローバルビューに加算し、各ローカルビューのchangeは次の反復に備えてリセットされます。
リスト11.6
ローカルビューからグローバル合計への累積各ローカルビューはK-平均法アルゴリズムの次の反復に備えてクリアされます

void reduce_local_sums_to_global_sum( size_t k, tls_type& tls, view& global ) {
    for( auto i=tls.begin(); i!=tls.end(); ++i ) {
        view& v = *i;
        for( size_t j=0; j<k; ++j ) {
            global.array[j] += v.array[j];
            v.array[j].clear();
        }
    }
}
リスト11.6は、forループ内で各スレッドのローカルビューのarrayをグローバルビューのarrayに加算していき、次の反復に備えてローカルをリセットしています。

リダクションにスレッド・ローカル・ストレージを使用することは、Cilk Plusのレデューサーと比較して二つの制限があります。

  • リダクション操作は可換(commutative)かつ結合(associative)でなければいけない。Cilk Plusのレデューサー・ハイパーオブジェクトは結合性のみ必要。
  • ローカルビューのレデュースにシリアルループを使用すると、スパンが本質的に Ω(P)(Pはスレッド数) であるため、スケーラビリティーのボトルネックになることがある。

後者の制限は、TBBのparallel_reduceテンプレートを使用してローカルビューを並列にレデュースすることで対処できます。ただし、K-平均法クラスタリングでは、ハードウェア・スレッドより多くのポイントがある限り、その分、ローカルビューのリダクション処理をシリアルで実行するオーバーヘッドは相対的に小さくなるため、この制限に対処する必要はありません。


TBBの例では、tbb::parallel_forの明示的なタイル形式も使用されています。以下のパターンを使用して、nポイントすべての反復を並列に行います。

tbb::parallel_for{
    blocked range<size_t> (0, n),
    [...]( tbb: blocked_ range<size_t> r ){
        view& v = tls.local () ;
        for( size_t i=r.begin(); i!=r.end(); ++i){
            ...ポイントiを処理
        }
    }
}
blocked range (0, n)によって、全データポイントをparallel_forがサブ範囲に分割します。 ファンクター部分では、スレッド・ローカル・ストレージを使って、各スレッドがローカルビューを取得し、各スレッドが独立してデータを処理します。

TBBにはCilk Plusの__sec_reduce_min_indのようなリダクション操作がないため、リスト11.7のreduce_min_ind補助ルーチンを使用します。
リスト11.7
指定したポイントに最も近いインデックスを検出するルーチンリスト11.1の25行目の_sec_reduce_min_indと等価なシリアル表現

int reduce_min_ind( const point centroid[], size_t k, point value ) { 
    int min = -1; //最も近いクラスターのインデックスを保持する変数minを初期化(どのクラスターも選ばれていない)
    float mind = std::numeric_limits<float>::max(); //最小距離を保持する変数mindを初期化
    for( int j=0; j<k; ++j ) {
        float d = distance2(centroid[j],value);
        if( d<mind  ) {
            mind = d;
            min = j;
        }
    }
    return min;
}
リスト11.7は、与えられたポイントに最も近いクラスターのインデックスを見つけるための操作を行います。


以上により、K-平均法の残りは簡単に記述でき、TBBコードはリスト11.8のようになる。
TBBによるK-平均法クラスタリング

void compute_k_means( size_t n, const point points[], size_t k, cluster_id id[], point centroid[] ) {

    tls_type tls([&]{return k;}); 
    view global(k);

    // Create initial clusters and compute their sums.
    tbb::parallel_for(
        tbb::blocked_range<size_t>(0,n),
        [=,&tls,&global]( tbb::blocked_range<size_t> r ) {
            view& v = tls.local();
            for( size_t i=r.begin(); i!=r.end(); ++i ) {
                id[i] = i % k;  
                // Peeled "Sum step"
                v.array[id[i]].tally(points[i]);
            }
        }
    );

    // Loop until ids do not change
    size_t change;
    do {
        // Reduce local sums to global sum
        reduce_local_sums_to_global_sum( k, tls, global );

        // Repair any empty clusters
        repair_empty_clusters( n, points, id, k, centroid, global.array );

        // "Divide step": Compute centroids from global sums
        for( size_t j=0; j<k; ++j ) {
            centroid[j] = global.array[j].mean();
            global.array[j].clear();
        }

        // Compute new clusters and their local sums
        tbb::parallel_for(
            tbb::blocked_range<size_t>(0,n),
            [=,&tls,&global]( tbb::blocked_range<size_t> r ) {
                view& v = tls.local();
                for( size_t i=r.begin(); i!=r.end(); ++i ) {
                    // "Reassign step": Find index of centroid closest to points[i]
                    cluster_id j = reduce_min_ind(centroid, k , points[i]); 
                    if( j!=id[i] ) {
                        id[i] = j;
                        ++v.change;
                    }
                    // "Sum step" 
                    v.array[j].tally(points[i]);
                }
            }
        );

        // Reduce local counts to global count
        reduce_local_counts_to_global_count( tls, global );
    } while( global.change!=0 );
}

11.4 まとめ

  • K-平均法アルゴリズムは、収束するまで2つの並列ステップ(中心の計算とポイントの再割り当て)を繰り返すシリアルループである。最初の反復の一部を取り出すことにより、2つの並列ステップをこの(マップとリダクションを融合した)並列走査に置き換えることができる。
  • Cilk Plus実装では、適切な組込みレデューサーが利用できない場合にカスタム・レデューサー・ハイパーオブジェクトを定義してリダクションを実行する方法を説明した。
  • TBB実装では、リダクションが可換かつ結合可能な場合に動作するスレッド・ローカル・ストレージを使用した。