Parallel k-NN Join in Apache Flink
published on Monday, June 22, 2015
시작하며
최근 6개월간은 남는 시간을 거의 Apache Flink에 기여하는 데 보내고 있다. 처음에는 덜 중요한 기능 추가로 기여를 하기 시작해서 근래에는 다양한 컴포넌트에 기여를 하고있다. 예전부터 오픈소스 활동을 하면서 새로이 배운 내용들을 블로그에 잘 정리해 둬야겠다고 생각하고 있었는데, 이제서야 글을 하나 올린다. 이 글은 Flink 기계 학습 라이브러리 flink-ml에 k-Nearest Neighbors Join을 추가하면서 공부한 내용을 정리한 것이다.
이 글에 적은 k-NN, k-NN Join의 정의와 MapReduce 모델에서의 k-NN Join 구현은 Zhang, C.의 논문[1]에서 가져왔다.
k-Nearest Neighbors Join
k-Nearest Neighbors는 특정 데이터와 거리가 가까운 k개의 데이터의 분류를 통해 특정 데이터의 분류를 결정하는 알고리즘이다. 데이터 사이의 거리는 보통 유클리드 거리(Euclidean Distance)가 사용되지만 때에 따라 코사인 거리(Cosine Distance)나 기타 다른 거리 척도가 사용되기도 한다. 거리 척도를 바탕으로 임의의 두 데이터 $r$과 $s$에 대해 두 데이터 사이의 거리를 $d(r, s)$라 하자. 그러면 임의의 데이터 $r$과 데이터 집합 $S$에 대해서 $\text{knn}(r,S)$를 정의할 수 있다.
\begin{equation} \text{knn}(r,S) = \text{set of }k\text{ nearest neighbors of }r\text{ from }S \end{equation}
이미 분류가 알려진 데이터 집합 $S$로 부터 거리가 가까운 k개의 데이터를 찾아낸 후, 새로운 데이터 $r$의 분류를 결정한다. 새로운 데이터 1개가 아닌 데이터 집합에 대해서도 k-NN 알고리즘을 적용할 수 있는데, 그것을 k-NN Join이라 한다.
Join 연산은 일반적으로 서로 다른 두 집합에 대해 어떤 조건이 일치하는 데이터를 조합해 하나의 집합으로 표현한 것을 말한다. k-NN Join은 조합 조건이 k-NN인 경우라고 볼 수 있다. 다시 말해, 두 집합 $R$, $S$에 대해서 k-NN Join은 $R$의 모든 데이터 각각에 대해 $S$에서의 k-NN을 계산하는 것을 말한다. 이를 바탕으로 k-NN Join을 아래와 같이 정의할 수 있다.
\begin{equation} \text{knnJ}(R, S) = \{(r, \text{knn}(r,S))\mid\text{for all}\ r \in R \} \end{equation}
기본적인 k-NN Join은 $O(d|S||R|)$의 시간 복잡도를 갖는데, 시간 복잡도를 개선하기 위해 R-Tree, KD-Tree 등의 자료구조를 사용하거나, 병렬 처리를 수행하거나, 결과의 정확도를 약간 희생하는 근사 알고리즘 등으로 시간 복잡도를 개선할 수 있다. 여기서는 병렬 처리를 수행하는 부분에 대해서 보다 자세하게 알아볼 것이다.
Parallel k-Nearest Neighbors Join in MapReduce
MapReduce 모델에서 Join 연산은 대부분 shuffle-reduce 단계에서 수행되는데 k-NN Join역시 마찬가지로 shuffle-reduce를 수행하면서 k-NN Join을 수행한다. MapReduce 모델에서의 k-NN Join 알고리즘을 개략적으로 기술하면 아래와 같다.
Phase 1
-
데이터 집합 $R$과 $S$를 둘 다 $n$개의 집합으로 나눈다. 이를 각각 $R_1,R_2,\cdots,R_n$과 $S_1,S_2,\cdots,S_n$라 하자.
-
나누어진 데이터 $n$개의 집합 $R_1,R_2,\cdots,R_n$과 $S_1,S_2,\cdots,S_n$을 가지고 cross product 연산을 수행한다. 연산의 결과로 $(R_1,S_1),(R_1,S_2),\cdots,(R_1,S_n),(R_2,S_1),\cdots,(R_n,S_n)$, 총 $n^2$개의 조합을 갖게 된다.
-
Hadoop MapReduce는 보통의 경우 입력을 1개만 받을 수 있어서 cross product 연산을 구현하기 위해서 특별한 구현이 필요하다. (MapReduce-based Join 구현을 응용하거나, 또는 직접 HDFS API로 파일 열어서 Block Offset으로 파일을 읽어들이거나 하는 식의 구현을 사용)
-
-
각각의 조합 $(R_i,S_j)$에 대해, $(r,s,d(r,s))$를 계산한다. ($r\in R_i, s\in S_j$)
Phase 2
-
Phase 1의 결과를 $r$을 기준으로 묶은 후, 각각의 묶음에서 $d(r,s)$를 기준으로 정렬(local sort)한다.
-
Grouping 연산은 shuffle-reduce 단계를 통해 수행할 수 있다.
-
정렬 대신에 Priority Queue 같은 자료구조를 사용할 수도 있다.
-
-
정렬된 레코드들에서 상위 $k$의 데이터를 선택해 $\text{knnJ}(R,S)$에 포함시킨다.
Parallel k-Neareset Neighbors Join in Apache Flink
위의 알고리즘을 Apache Flink에서 제공하는 기능들을 바탕으로 간단하게 k-NN Join을 구현해 보았다.
import org.apache.flink.api.scala._
import org.apache.flink.ml.common._
import org.apache.flink.ml.math.Vector
import org.apache.flink.metrics.distances.EuclideanDistanceMetric
import org.apache.flink.util.Collector
import org.apache.flink.api.java.utils.DataSetUtils
val env = ExecutionEnvironment.getEnvironment
val R: DataSet[Vector] = env.fromElements(...)
val S: DataSet[Vector] = env.fromElements(...)
val partitioner = FlinkMLTools.ModuloKeyPartitioner
val k = 10 // 이웃의 수 k
val n = 10 // 병렬 처리를 위한 분할 집합의 수 n
// 두 Vector의 거리 측정을 위한 척도
val metric = EuclideanDistanceMetric()
// R을 n개의 집합으로 나눈다
// Vector 데이터 타입은 비교가 안되기 때문에 groupBy 연산을 수행할 수 없다.
// 그렇기 때문에 R에 속한 각각의 데이터에 고유한 번호를 붙인다
val RWithIndex = DataSetUtils.zipWithIndex(R)
val RBlocks = FlinkMLTools.block(RWithIndex, n, Some(partitioner))
// S를 n개의 집합으로 나눈다
val SBlocks = FlinkMLTools.block(S, n, Some(partitioner))
// 두 집합에 대해서 cross product을 계산
val crossed = SBlocks.cross(RBlocks).mapPartition {
(iter, out: Collector[(Vector, Vector, Long, Double)]) => {
for ((SetS, SetR) <- iter) {
for (r <- SetR.values; s <- SetS.values) {
// Phase 1 결과 계산 (S vector, R key, R vector, distance)
out.collect(s, r._1, r._2, metric.distance(s, r._2))
}
}
}
}
// 입력 Vector Key 기준으로 묶은 뒤 각 묶음 내에서 거리 순으로 정렬
val result = crossed.groupBy(1).sortGroup(3, Order.ASCENDING).reduceGroup {
(iter, out: Collector[Vector, Array[Vector]]) => {
if (iter.hasNext) {
val head = iter.next()
val key = head._2
val neighbors: ArrayBuffer[Vector] = ArrayBuffer(head._1)
// 거리 순으로 정렬된 것에서 k개 추출
for ((vector, _, _, _) <- iter.take(k - 1)) {
neighbors += vector
}
out.collect(key, neighbors.toArray)
}
}
}