clean up WalLogAggregateProcess.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/5122e98a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/5122e98a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/5122e98a

Branch: refs/heads/master
Commit: 5122e98adfe89d60d50e8c6468ebd868c2951b88
Parents: a2e10fb
Author: DO YUNG YOON <steams...@apache.org>
Authored: Mon Aug 13 12:43:32 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Mon Aug 13 17:59:40 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/wal/WalLog.scala  |   4 +-
 .../wal/process/FeatureIndexProcess.scala       | 146 ++++++++++
 .../wal/process/WalLogAggregateProcess.scala    |  25 +-
 .../wal/process/params/AggregateParam.scala     |  22 ++
 .../wal/process/params/FeatureIndexParam.scala  |  20 ++
 .../wal/transformer/DefaultTransformer.scala    |   3 +
 .../s2jobs/wal/transformer/ExtractDomain.scala  |  20 ++
 .../wal/transformer/ExtractServiceName.scala    |  17 ++
 .../s2jobs/wal/transformer/Transformer.scala    |  15 +
 .../s2jobs/wal/udafs/S2EdgeDataAggregate.scala  | 290 -------------------
 .../s2graph/s2jobs/wal/udafs/WalLogUDAF.scala   | 290 +++++++++++++++++++
 .../s2graph/s2jobs/wal/udfs/WalLogUDF.scala     | 216 ++++++++++++++
 .../s2graph/s2jobs/wal/utils/UrlUtils.scala     |  57 ++++
 .../s2graph/s2jobs/wal/TransformerTest.scala    |  26 ++
 .../process/WalLogAggregateProcessTest.scala    |   9 +-
 15 files changed, 842 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index dc628ec..f4a670b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -1,7 +1,7 @@
 package org.apache.s2graph.s2jobs.wal
 
 import org.apache.s2graph.core.JSONParser
-import org.apache.s2graph.s2jobs.wal.process.AggregateParam
+import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
 import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{LongType, StringType, StructField, 
StructType}
@@ -36,6 +36,8 @@ object WalLogAgg {
   }
 }
 
+case class DimVal(dim: String, value: String)
+
 case class WalLogAgg(from: String,
                      logs: Seq[WalLog],
                      maxTs: Long,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
new file mode 100644
index 0000000..7d04334
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
@@ -0,0 +1,146 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.google.common.hash.Hashing
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.process.params.FeatureIndexParam
+import org.apache.s2graph.s2jobs.wal.transformer._
+import org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import play.api.libs.json.{JsObject, Json}
+
+import scala.collection.mutable
+
+object FeatureIndexProcess {
+  def extractDimValues(transformers: Seq[Transformer]) = {
+    udf((rows: Seq[Row]) => {
+      val logs = rows.map(WalLog.fromRow)
+      // TODO: this can be changed into a Map to count how many times each 
dimVal exists in the sequence of walLogs,
+      // then change this to mutable.Map.empty[DimVal, Int] and aggregate.
+      val distinctDimValues = mutable.Set.empty[DimVal]
+
+      logs.foreach { walLog =>
+        walLog.propsKeyValues.foreach { case (propsKey, propsValue) =>
+          transformers.foreach { transformer =>
+            transformer.toDimValLs(walLog, propsKey, 
propsValue).foreach(distinctDimValues += _)
+          }
+        }
+      }
+
+      distinctDimValues.toSeq
+    })
+  }
+
+  def buildDictionary(ss: SparkSession,
+                      allDimVals: DataFrame,
+                      param: FeatureIndexParam,
+                      dimValColumnName: String = "dimVal"): DataFrame = {
+    import ss.implicits._
+
+    val rawFeatures = allDimVals
+      .select(col(param._countColumnName), 
col(s"$dimValColumnName.dim").as("dim"), 
col(s"$dimValColumnName.value").as("value"))
+      .groupBy("dim", "value")
+      .agg(countDistinct(param._countColumnName).as("count"))
+      .filter(s"count > ${param._minUserCount}")
+
+    val ds: Dataset[((String, Long), String)] =
+      rawFeatures.select("dim", "value", "count").as[(String, String, Long)]
+        .map { case (dim, value, uv) =>
+          (dim, uv) -> value
+        }
+
+
+    implicit val ord = Ordering.Tuple2(Ordering.String, Ordering.Long.reverse)
+
+    val rdd: RDD[(Long, (String, Long), String)] = WalLogUDF.appendRank(ds, 
param.numOfPartitions, param.samplePointsPerPartitionHint)
+
+    rdd.toDF("rank", "dim_count", "value")
+      .withColumn("dim", col("dim_count._1"))
+      .withColumn("count", col("dim_count._2"))
+      .select("dim", "value", "count", "rank")
+  }
+
+  def toFeatureHash(dim: String, value: String): Long = {
+    Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
+  }
+
+  def collectDistinctFeatureHashes(ss: SparkSession,
+                                   filteredDict: DataFrame): Array[Long] = {
+    import ss.implicits._
+
+    val featureHashUDF = udf((dim: String, value: String) => 
toFeatureHash(dim, value))
+
+    filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), 
col("value")))
+      .select("featureHash")
+      .distinct().as[Long].collect()
+  }
+
+  def filterTopKsPerDim(dict: DataFrame,
+                        maxRankPerDim: Broadcast[Map[String, Int]],
+                        defaultMaxRank: Int): DataFrame = {
+    val filterUDF = udf((dim: String, rank: Long) => {
+      rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank)
+    })
+
+    dict.filter(filterUDF(col("dim"), col("rank")))
+  }
+}
+
+case class FeatureIndexProcess(taskConf: TaskConf) extends 
org.apache.s2graph.s2jobs.task.Process(taskConf) {
+
+  import FeatureIndexProcess._
+
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): 
DataFrame = {
+    val countColumnName = taskConf.options.getOrElse("countColumnName", "from")
+    val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
+    val samplePointsPerPartitionHint = 
taskConf.options.get("samplePointsPerPartitionHint").map(_.toInt)
+    val minUserCount = taskConf.options.get("minUserCount").map(_.toLong)
+    val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s =>
+      val json = Json.parse(s).as[JsObject]
+      json.fieldSet.map { case (key, jsValue) => key -> jsValue.as[Int] }.toMap
+    }
+    val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt)
+    val dictPath = taskConf.options.get("dictPath")
+
+    numOfPartitions.map { d => 
ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString) }
+
+    //    val maxRankPerDimBCast = 
ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty))
+
+    val param = FeatureIndexParam(minUserCount = minUserCount, countColumnName 
= Option(countColumnName),
+      numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = 
samplePointsPerPartitionHint,
+      maxRankPerDim = maxRankPerDim, defaultMaxRank = defaultMaxRank, dictPath 
= dictPath
+    )
+
+    val edges = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) 
{ case (prev, cur) =>
+      prev.union(inputMap(cur))
+    }
+
+    //TODO: users are expected to inject transformers that transform (WalLog, 
propertyKey, propertyValue) into Seq[DimVal].
+    val transformers = Seq(DefaultTransformer())
+    val dimValExtractUDF = extractDimValues(transformers)
+    val dimValColumnName = "dimVal"
+
+    val rawFeatures = edges
+      .withColumn(dimValColumnName, explode(dimValExtractUDF(col("logs"))))
+
+    val dict = buildDictionary(ss, rawFeatures, param, dimValColumnName)
+
+    dict
+    //TODO: filter topKs per dim, then build valid dimValLs.
+    // then broadcast valid dimValLs to the original dataframe, and filter out 
invalid dimVals.
+
+    //    dictPath.foreach { path => 
dict.write.mode(SaveMode.Overwrite).parquet(path) }
+    //
+    //    val filteredDict = filterTopKsPerDim(dict, maxRankPerDimBCast, 
defaultMaxRank.getOrElse(Int.MaxValue))
+    //    val distinctFeatureHashes = collectDistinctFeatureHashes(ss, 
filteredDict)
+    //    val distinctFeatureHashesBCast = 
ss.sparkContext.broadcast(distinctFeatureHashes)
+
+    //    filteredDict
+  }
+
+
+  override def mandatoryOptions: Set[String] = Set.empty
+}
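
FeatureIndexProcess hashes each (dim, value) pair into a 64-bit key via murmur3
before broadcasting valid features. A minimal, self-contained sketch of that
scheme (the sample pair below is illustrative only, not taken from real data):

    import com.google.common.hash.Hashing

    // same hashing scheme as FeatureIndexProcess.toFeatureHash above
    def toFeatureHash(dim: String, value: String): Long =
      Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()

    // illustrative (dim, value) pair; any dictionary entry hashes the same way
    val hash: Long = toFeatureHash("friends:name", "1")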

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
index 0c9829c..2b42e20 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
@@ -1,29 +1,10 @@
 package org.apache.s2graph.s2jobs.wal.process
 
 import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
 import org.apache.s2graph.s2jobs.wal.{WalLog, WalLogAgg}
 import org.apache.spark.sql._
 
-object AggregateParam {
-  val defaultGroupByKeys = Seq("from")
-  val defaultTopK = 1000
-  val defaultIsArrayType = false
-  val defaultShouldSortTopItems = true
-}
-
-case class AggregateParam(groupByKeys: Option[Seq[String]],
-                          topK: Option[Int],
-                          isArrayType: Option[Boolean],
-                          shouldSortTopItems: Option[Boolean]) {
-
-  import AggregateParam._
-
-  val groupByColumns = groupByKeys.getOrElse(defaultGroupByKeys)
-  val heapSize = topK.getOrElse(defaultTopK)
-  val arrayType = isArrayType.getOrElse(defaultIsArrayType)
-  val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems)
-}
-
 object WalLogAggregateProcess {
   def aggregate(ss: SparkSession,
                 dataset: Dataset[WalLogAgg],
@@ -64,8 +45,8 @@ class WalLogAggregateProcess(taskConf: TaskConf) extends 
org.apache.s2graph.s2jo
     val maxNumOfEdges = 
taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
     val arrayType = 
taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(false)
     val sortTopItems = 
taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(false)
-
-    taskConf.options.get("parallelism").foreach(d => 
ss.sqlContext.setConf("spark.sql.shuffle.partitions", d))
+    val numOfPartitions = taskConf.options.get("numOfPartitions")
+    numOfPartitions.foreach(d => 
ss.sqlContext.setConf("spark.sql.shuffle.partitions", d))
 
     implicit val ord = WalLog.orderByTsAsc
     val walLogs = 
taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, 
cur) =>
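
The "parallelism" option above is replaced by "numOfPartitions". A minimal
sketch of wiring the renamed option through TaskConf (it mirrors the updated
test further below; a SparkSession and an "edges" DataFrame are assumed to
exist):

    import org.apache.s2graph.s2jobs.task.TaskConf
    import org.apache.s2graph.s2jobs.wal.process.WalLogAggregateProcess

    // "numOfPartitions" is applied to spark.sql.shuffle.partitions in execute()
    val taskConf = new TaskConf(name = "agg", `type` = "agg", inputs = Seq("edges"),
      options = Map("maxNumOfEdges" -> "10", "numOfPartitions" -> "4"))

    val job = new WalLogAggregateProcess(taskConf)
    // val aggregated = job.execute(spark, Map("edges" -> edges))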

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
new file mode 100644
index 0000000..89c36f4
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
@@ -0,0 +1,22 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+
+object AggregateParam {
+  val defaultGroupByKeys = Seq("from")
+  val defaultTopK = 1000
+  val defaultIsArrayType = false
+  val defaultShouldSortTopItems = true
+}
+
+case class AggregateParam(groupByKeys: Option[Seq[String]],
+                          topK: Option[Int],
+                          isArrayType: Option[Boolean],
+                          shouldSortTopItems: Option[Boolean]) {
+
+  import AggregateParam._
+
+  val groupByColumns = groupByKeys.getOrElse(defaultGroupByKeys)
+  val heapSize = topK.getOrElse(defaultTopK)
+  val arrayType = isArrayType.getOrElse(defaultIsArrayType)
+  val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems)
+}
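
Every AggregateParam field is optional; unspecified fields fall back to the
defaults defined in the companion object. A small sketch of how that resolves:

    import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam

    val param = AggregateParam(groupByKeys = None, topK = Some(100),
      isArrayType = None, shouldSortTopItems = None)

    assert(param.groupByColumns == Seq("from")) // defaultGroupByKeys
    assert(param.heapSize == 100)               // explicit topK wins
    assert(!param.arrayType)                    // defaultIsArrayType
    assert(param.sortTopItems)                  // defaultShouldSortTopItems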

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
new file mode 100644
index 0000000..ddf7037
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
@@ -0,0 +1,20 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+object FeatureIndexParam {
+  val defaultMinUserCount = 0L
+  val defaultCountColumnName = "from"
+}
+
+case class FeatureIndexParam(minUserCount: Option[Long],
+                             countColumnName: Option[String],
+                             samplePointsPerPartitionHint: Option[Int],
+                             numOfPartitions: Option[Int],
+                             maxRankPerDim: Option[Map[String, Int]],
+                             defaultMaxRank: Option[Int],
+                             dictPath: Option[String]) {
+  import FeatureIndexParam._
+  val _countColumnName = countColumnName.getOrElse(defaultCountColumnName)
+  val _minUserCount = minUserCount.getOrElse(defaultMinUserCount)
+  val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty)
+  val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
new file mode 100644
index 0000000..2839d6a
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
@@ -0,0 +1,3 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+case class DefaultTransformer() extends Transformer

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
new file mode 100644
index 0000000..9b09d71
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
@@ -0,0 +1,20 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.wal.utils.UrlUtils
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+
+case class ExtractDomain(urlDimensions: Set[String],
+                         hostDimName: String = "host",
+                         domainDimName: String = "domain",
+                         keywordDimName: String = "uri_keywords") extends 
Transformer {
+  override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: 
String): Seq[DimVal] = {
+    if (!urlDimensions(propertyKey)) Nil
+    else {
+      val (_, domains, kwdOpt) = UrlUtils.extract(propertyValue)
+
+      domains.headOption.toSeq.map(DimVal(hostDimName, _)) ++
+        domains.map(DimVal(domainDimName, _)) ++
+        kwdOpt.toSeq.map(DimVal(keywordDimName, _))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
new file mode 100644
index 0000000..7ad7dfd
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
@@ -0,0 +1,17 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+
+class ExtractServiceName(serviceDims: Set[String],
+                         domainServiceMap: Map[String, String] = Map.empty,
+                         serviceDimName: String = "serviceName") extends 
Transformer {
+  override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: 
String): Seq[DimVal] = {
+    if (!serviceDims(propertyKey)) Nil
+    else {
+      val serviceName = domainServiceMap.getOrElse(propertyValue, 
propertyValue)
+
+      Seq(DimVal(serviceDimName, serviceName))
+    }
+  }
+}
+
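
ExtractServiceName emits a single "serviceName" dimension, mapping the raw
property value through domainServiceMap and falling back to the value itself.
A sketch with illustrative data (the walLog fields and the mapping are made up):

    import org.apache.s2graph.s2jobs.wal.transformer.ExtractServiceName
    import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}

    val walLog = WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends",
      """{"host": "news.example.com"}""")
    val extractor = new ExtractServiceName(
      serviceDims = Set("host"),
      domainServiceMap = Map("news.example.com" -> "news"))

    val dimVals = extractor.toDimValLs(walLog, "host", "news.example.com")
    // dimVals == Seq(DimVal("serviceName", "news"))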

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
new file mode 100644
index 0000000..523f58d
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
@@ -0,0 +1,15 @@
+package org.apache.s2graph.s2jobs.wal.transformer
+
+import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+
+/**
+  * decides how to transform each of a walLog's property key/value pairs into Seq[DimVal]
+  */
+trait Transformer {
+  def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): 
Seq[DimVal] = {
+    val dim = s"${walLog.label}:${propertyKey}"
+    val value = propertyValue
+
+    Seq(DimVal(dim, value))
+  }
+}
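
The trait's default implementation emits a single "label:propertyKey" dimension;
concrete transformers such as ExtractDomain override toDimValLs. A sketch of a
hypothetical custom transformer (the "age" bucketing below is illustrative only):

    import org.apache.s2graph.s2jobs.wal.transformer.Transformer
    import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}

    // bucket a numeric "age" property into a coarse dimension value
    case class BucketizeAge(bucketSize: Int = 10) extends Transformer {
      override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = {
        if (propertyKey != "age") Nil
        else scala.util.Try(propertyValue.toInt).toOption.toSeq.map { age =>
          DimVal("age_bucket", ((age / bucketSize) * bucketSize).toString)
        }
      }
    }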

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
deleted file mode 100644
index dabfd99..0000000
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
+++ /dev/null
@@ -1,290 +0,0 @@
-package org.apache.s2graph.s2jobs.wal.udafs
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction}
-import org.apache.spark.sql.types._
-
-import scala.annotation.tailrec
-import scala.collection.mutable
-
-object S2EdgeDataAggregate {
-  type Element = (Long, String, String, String)
-
-  val emptyRow = new GenericRow(Array(-1L, "empty", "empty", "empty"))
-
-  val elementOrd = Ordering.by[Element, Long](_._1)
-
-  val rowOrdering = new Ordering[Row] {
-    override def compare(x: Row, y: Row): Int = {
-      x.getAs[Long](0).compareTo(y.getAs[Long](0))
-    }
-  }
-
-  val rowOrderingDesc = new Ordering[Row] {
-    override def compare(x: Row, y: Row): Int = {
-      -x.getAs[Long](0).compareTo(y.getAs[Long](0))
-    }
-  }
-
-  val fields = Seq(
-    StructField(name = "timestamp", LongType),
-    StructField(name = "to", StringType),
-    StructField(name = "label", StringType),
-    StructField(name = "props", StringType)
-  )
-
-  val arrayType = ArrayType(elementType = StructType(fields))
-
-  def apply(maxNumOfEdges: Int = 1000): GroupByAggOptimized = {
-    new GroupByAggOptimized(maxNumOfEdges)
-  }
-
-  def swap[T](array: mutable.Seq[T], i: Int, j: Int) = {
-    val tmp = array(i)
-    array(i) = array(j)
-    array(j) = tmp
-  }
-
-  @tailrec
-  def percolateDown[T](array: mutable.Seq[T], idx: Int)(implicit ordering: 
Ordering[T]): Unit = {
-    val left = 2 * idx + 1
-    val right = 2 * idx + 2
-    var smallest = idx
-
-    if (left < array.size && ordering.compare(array(left), array(smallest)) < 
0) {
-      smallest = left
-    }
-
-    if (right < array.size && ordering.compare(array(right), array(smallest)) 
< 0) {
-      smallest = right
-    }
-
-    if (smallest != idx) {
-      swap(array, idx, smallest)
-      percolateDown(array, smallest)
-    }
-  }
-
-  def percolateUp[T](array: mutable.Seq[T],
-                     idx: Int)(implicit ordering: Ordering[T]): Unit = {
-    var pos = idx
-    var parent = (pos - 1) / 2
-    while (parent >= 0 && ordering.compare(array(pos), array(parent)) < 0) {
-      // swap pos and parent, since a[parent] > array[pos]
-      swap(array, parent, pos)
-      pos = parent
-      parent = (pos - 1) / 2
-    }
-  }
-
-  def addToTopK[T](array: mutable.Seq[T],
-                   size: Int,
-                   newData: T)(implicit ordering: Ordering[T]): mutable.Seq[T] 
= {
-    // use array as minHeap to keep track of topK.
-    // parent = (i -1) / 2
-    // left child = 2 * i + 1
-    // right chiud = 2  * i + 2
-
-    // check if array is already full.
-    if (array.size >= size) {
-      // compare newData to min. newData < array(0)
-      val currentMin = array(0)
-      if (ordering.compare(newData, currentMin) < 0) {
-        // drop newData
-      } else {
-        // delete min
-        array(0) = newData
-        // percolate down
-        percolateDown(array, 0)
-      }
-      array
-    } else {
-      // append new element into seqeunce since there are room left.
-      val newArray = array :+ newData
-      val idx = newArray.size - 1
-      // percolate up last element
-      percolateUp(newArray, idx)
-      newArray
-    }
-  }
-
-  def mergeTwoSeq[T](prev: Seq[T], cur: Seq[T], size: Int)(implicit ordering: 
Ordering[T]): Seq[T] = {
-    import scala.collection.mutable
-    val (n, m) = (cur.size, prev.size)
-
-    var (i, j) = (0, 0)
-    var idx = 0
-    val arr = new mutable.ArrayBuffer[T](size)
-
-    while (idx < size && i < n && j < m) {
-      if (ordering.compare(cur(i), prev(j)) < 0) {
-        arr += cur(i)
-        i += 1
-      } else {
-        arr += prev(j)
-        j += 1
-      }
-      idx += 1
-    }
-    while (idx < size && i < n) {
-      arr += cur(i)
-      i += 1
-    }
-    while (idx < size && j < m) {
-      arr += prev(j)
-      j += 1
-    }
-
-    arr
-  }
-}
-
-class GroupByAggOptimized(maxNumOfEdges: Int = 1000) extends 
UserDefinedAggregateFunction {
-
-  import S2EdgeDataAggregate._
-
-  implicit val ord = rowOrdering
-
-  val arrayType = ArrayType(elementType = StructType(fields))
-
-  type ROWS = mutable.Seq[Row]
-
-  override def inputSchema: StructType = StructType(fields)
-
-  override def bufferSchema: StructType = StructType(Seq(
-    StructField(name = "edges", dataType = arrayType)
-  ))
-
-  override def dataType: DataType = arrayType
-
-  override def deterministic: Boolean = true
-
-  override def initialize(buffer: MutableAggregationBuffer): Unit = {
-    buffer.update(0, mutable.ArrayBuffer.empty[Row])
-  }
-
-  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
-    val prev = buffer.getAs[ROWS](0)
-
-    val updated = addToTopK(prev, maxNumOfEdges, input)
-
-    buffer.update(0, updated)
-  }
-
-  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
-    var prev = buffer1.getAs[ROWS](0)
-    val cur = buffer2.getAs[ROWS](0)
-
-    cur.filter(_ != null).foreach { row =>
-      prev = addToTopK(prev, maxNumOfEdges, row)
-    }
-
-    buffer1.update(0, prev)
-  }
-
-  override def evaluate(buffer: Row): Any = {
-    val ls = buffer.getAs[ROWS](0)
-    takeTopK(ls, maxNumOfEdges)
-  }
-
-  private def takeTopK(ls: Seq[Row], k: Int) = {
-    val sorted = ls.sorted
-    if (sorted.size <= k) sorted else sorted.take(k)
-  }
-}
-
-class GroupByAgg(maxNumOfEdges: Int = 1000) extends 
UserDefinedAggregateFunction {
-  import S2EdgeDataAggregate._
-
-  implicit val ord = rowOrderingDesc
-
-  val arrayType = ArrayType(elementType = StructType(fields))
-
-  override def inputSchema: StructType = StructType(fields)
-
-  override def bufferSchema: StructType = StructType(Seq(
-    StructField(name = "edges", dataType = arrayType),
-    StructField(name = "buffered", dataType = BooleanType)
-  ))
-
-  override def dataType: DataType = arrayType
-
-  override def deterministic: Boolean = true
-
-  override def initialize(buffer: MutableAggregationBuffer): Unit = {
-    buffer.update(0, scala.collection.mutable.ListBuffer.empty[Element])
-  }
-
-  /* not optimized */
-  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
-    val element = input
-
-    val prev = buffer.getAs[Seq[Row]](0)
-    val appended = prev :+ element
-
-    buffer.update(0, appended)
-    buffer.update(1, false)
-  }
-
-  private def takeTopK(ls: Seq[Row], k: Int) = {
-    val sorted = ls.sorted
-    if (sorted.size <= k) sorted else sorted.take(k)
-  }
-  /* not optimized */
-  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
-    val cur = buffer2.getAs[Seq[Row]](0)
-    val prev = buffer1.getAs[Seq[Row]](0)
-
-    buffer1.update(0, takeTopK(prev ++ cur, maxNumOfEdges))
-    buffer1.update(1, true)
-  }
-
-  override def evaluate(buffer: Row): Any = {
-    val ls = buffer.getAs[Seq[Row]](0)
-    val buffered = buffer.getAs[Boolean](1)
-    if (buffered) ls
-    else takeTopK(ls, maxNumOfEdges)
-  }
-}
-
-class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends 
UserDefinedAggregateFunction {
-  import S2EdgeDataAggregate._
-
-  implicit val ord = rowOrdering
-
-  import scala.collection.mutable
-
-  override def inputSchema: StructType = StructType(Seq(
-    StructField(name = "edges", dataType = arrayType)
-  ))
-
-  override def bufferSchema: StructType = StructType(Seq(
-    StructField(name = "edges", dataType = arrayType)
-  ))
-
-  override def dataType: DataType = arrayType
-
-  override def deterministic: Boolean = true
-
-  override def initialize(buffer: MutableAggregationBuffer): Unit =
-    buffer.update(0, mutable.ListBuffer.empty[Row])
-
-  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
-    val cur = input.getAs[Seq[Row]](0)
-    val prev = buffer.getAs[Seq[Row]](0)
-    val merged = mergeTwoSeq(cur, prev, maxNumOfEdges)
-
-    buffer.update(0, merged)
-  }
-
-  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
-    val cur = buffer2.getAs[Seq[Row]](0)
-    val prev = buffer1.getAs[Seq[Row]](0)
-
-    val merged = mergeTwoSeq(cur, prev, maxNumOfEdges)
-    buffer1.update(0, merged)
-  }
-
-  override def evaluate(buffer: Row): Any = buffer.getAs[Seq[Row]](0)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala
new file mode 100644
index 0000000..81a1356
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala
@@ -0,0 +1,290 @@
+package org.apache.s2graph.s2jobs.wal.udafs
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+
+object WalLogUDAF {
+  type Element = (Long, String, String, String)
+
+  val emptyRow = new GenericRow(Array(-1L, "empty", "empty", "empty"))
+
+  val elementOrd = Ordering.by[Element, Long](_._1)
+
+  val rowOrdering = new Ordering[Row] {
+    override def compare(x: Row, y: Row): Int = {
+      x.getAs[Long](0).compareTo(y.getAs[Long](0))
+    }
+  }
+
+  val rowOrderingDesc = new Ordering[Row] {
+    override def compare(x: Row, y: Row): Int = {
+      -x.getAs[Long](0).compareTo(y.getAs[Long](0))
+    }
+  }
+
+  val fields = Seq(
+    StructField(name = "timestamp", LongType),
+    StructField(name = "to", StringType),
+    StructField(name = "label", StringType),
+    StructField(name = "props", StringType)
+  )
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  def apply(maxNumOfEdges: Int = 1000): GroupByAggOptimized = {
+    new GroupByAggOptimized(maxNumOfEdges)
+  }
+
+  def swap[T](array: mutable.Seq[T], i: Int, j: Int) = {
+    val tmp = array(i)
+    array(i) = array(j)
+    array(j) = tmp
+  }
+
+  @tailrec
+  def percolateDown[T](array: mutable.Seq[T], idx: Int)(implicit ordering: 
Ordering[T]): Unit = {
+    val left = 2 * idx + 1
+    val right = 2 * idx + 2
+    var smallest = idx
+
+    if (left < array.size && ordering.compare(array(left), array(smallest)) < 
0) {
+      smallest = left
+    }
+
+    if (right < array.size && ordering.compare(array(right), array(smallest)) 
< 0) {
+      smallest = right
+    }
+
+    if (smallest != idx) {
+      swap(array, idx, smallest)
+      percolateDown(array, smallest)
+    }
+  }
+
+  def percolateUp[T](array: mutable.Seq[T],
+                     idx: Int)(implicit ordering: Ordering[T]): Unit = {
+    var pos = idx
+    var parent = (pos - 1) / 2
+    while (parent >= 0 && ordering.compare(array(pos), array(parent)) < 0) {
+      // swap pos and parent, since array(parent) > array(pos)
+      swap(array, parent, pos)
+      pos = parent
+      parent = (pos - 1) / 2
+    }
+  }
+
+  def addToTopK[T](array: mutable.Seq[T],
+                   size: Int,
+                   newData: T)(implicit ordering: Ordering[T]): mutable.Seq[T] 
= {
+    // use array as minHeap to keep track of topK.
+    // parent = (i - 1) / 2
+    // left child = 2 * i + 1
+    // right child = 2 * i + 2
+
+    // check if array is already full.
+    if (array.size >= size) {
+      // compare newData to min. newData < array(0)
+      val currentMin = array(0)
+      if (ordering.compare(newData, currentMin) < 0) {
+        // drop newData
+      } else {
+        // delete min
+        array(0) = newData
+        // percolate down
+        percolateDown(array, 0)
+      }
+      array
+    } else {
+      // append the new element to the sequence since there is room left.
+      val newArray = array :+ newData
+      val idx = newArray.size - 1
+      // percolate up last element
+      percolateUp(newArray, idx)
+      newArray
+    }
+  }
+
+  def mergeTwoSeq[T](prev: Seq[T], cur: Seq[T], size: Int)(implicit ordering: 
Ordering[T]): Seq[T] = {
+    import scala.collection.mutable
+    val (n, m) = (cur.size, prev.size)
+
+    var (i, j) = (0, 0)
+    var idx = 0
+    val arr = new mutable.ArrayBuffer[T](size)
+
+    while (idx < size && i < n && j < m) {
+      if (ordering.compare(cur(i), prev(j)) < 0) {
+        arr += cur(i)
+        i += 1
+      } else {
+        arr += prev(j)
+        j += 1
+      }
+      idx += 1
+    }
+    while (idx < size && i < n) {
+      arr += cur(i)
+      i += 1
+    }
+    while (idx < size && j < m) {
+      arr += prev(j)
+      j += 1
+    }
+
+    arr
+  }
+}
+
+class GroupByAggOptimized(maxNumOfEdges: Int = 1000) extends 
UserDefinedAggregateFunction {
+
+  import WalLogUDAF._
+
+  implicit val ord = rowOrdering
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  type ROWS = mutable.Seq[Row]
+
+  override def inputSchema: StructType = StructType(fields)
+
+  override def bufferSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType)
+  ))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, mutable.ArrayBuffer.empty[Row])
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val prev = buffer.getAs[ROWS](0)
+
+    val updated = addToTopK(prev, maxNumOfEdges, input)
+
+    buffer.update(0, updated)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    var prev = buffer1.getAs[ROWS](0)
+    val cur = buffer2.getAs[ROWS](0)
+
+    cur.filter(_ != null).foreach { row =>
+      prev = addToTopK(prev, maxNumOfEdges, row)
+    }
+
+    buffer1.update(0, prev)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    val ls = buffer.getAs[ROWS](0)
+    takeTopK(ls, maxNumOfEdges)
+  }
+
+  private def takeTopK(ls: Seq[Row], k: Int) = {
+    val sorted = ls.sorted
+    if (sorted.size <= k) sorted else sorted.take(k)
+  }
+}
+
+class GroupByAgg(maxNumOfEdges: Int = 1000) extends 
UserDefinedAggregateFunction {
+  import WalLogUDAF._
+
+  implicit val ord = rowOrderingDesc
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  override def inputSchema: StructType = StructType(fields)
+
+  override def bufferSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType),
+    StructField(name = "buffered", dataType = BooleanType)
+  ))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, scala.collection.mutable.ListBuffer.empty[Element])
+  }
+
+  /* not optimized */
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val element = input
+
+    val prev = buffer.getAs[Seq[Row]](0)
+    val appended = prev :+ element
+
+    buffer.update(0, appended)
+    buffer.update(1, false)
+  }
+
+  private def takeTopK(ls: Seq[Row], k: Int) = {
+    val sorted = ls.sorted
+    if (sorted.size <= k) sorted else sorted.take(k)
+  }
+  /* not optimized */
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val cur = buffer2.getAs[Seq[Row]](0)
+    val prev = buffer1.getAs[Seq[Row]](0)
+
+    buffer1.update(0, takeTopK(prev ++ cur, maxNumOfEdges))
+    buffer1.update(1, true)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    val ls = buffer.getAs[Seq[Row]](0)
+    val buffered = buffer.getAs[Boolean](1)
+    if (buffered) ls
+    else takeTopK(ls, maxNumOfEdges)
+  }
+}
+
+class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends 
UserDefinedAggregateFunction {
+  import WalLogUDAF._
+
+  implicit val ord = rowOrdering
+
+  import scala.collection.mutable
+
+  override def inputSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType)
+  ))
+
+  override def bufferSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType)
+  ))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit =
+    buffer.update(0, mutable.ListBuffer.empty[Row])
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val cur = input.getAs[Seq[Row]](0)
+    val prev = buffer.getAs[Seq[Row]](0)
+    val merged = mergeTwoSeq(cur, prev, maxNumOfEdges)
+
+    buffer.update(0, merged)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val cur = buffer2.getAs[Seq[Row]](0)
+    val prev = buffer1.getAs[Seq[Row]](0)
+
+    val merged = mergeTwoSeq(cur, prev, maxNumOfEdges)
+    buffer1.update(0, merged)
+  }
+
+  override def evaluate(buffer: Row): Any = buffer.getAs[Seq[Row]](0)
+}
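
addToTopK treats its buffer as a bounded min-heap, so aggregation keeps only the
K largest elements under the given ordering. A plain-Int sketch of the helper
(it mirrors the "addToTopK test." further below; the numbers are illustrative):

    import org.apache.s2graph.s2jobs.wal.udafs.WalLogUDAF.addToTopK
    import scala.collection.mutable

    // keep the 3 largest Ints; index 0 always holds the smallest retained element
    var topK: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int]
    Seq(5, 1, 9, 3, 7).foreach { n => topK = addToTopK(topK, 3, n) }

    assert(topK.toSet == Set(5, 7, 9))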

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
new file mode 100644
index 0000000..34f4a2b
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
@@ -0,0 +1,216 @@
+package org.apache.s2graph.s2jobs.wal.udfs
+
+import com.google.common.hash.Hashing
+import org.apache.s2graph.core.JSONParser
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{Dataset, Row}
+import play.api.libs.json._
+
+import scala.reflect.ClassTag
+
+object WalLogUDF {
+
+  import scala.collection.mutable
+
+  type MergedProps = Map[String, Seq[String]]
+  type MutableMergedProps = mutable.Map[String, mutable.Map[String, Int]]
+  type MutableMergedPropsInner = mutable.Map[String, Int]
+
+  def initMutableMergedPropsInner = mutable.Map.empty[String, Int]
+
+  def initMutableMergedProps = mutable.Map.empty[String, mutable.Map[String, 
Int]]
+
+//  //TODO:
+//  def toDimension(rawActivity: RawActivity, propertyKey: String): String = {
+//    //    val (ts, dst, label, _) = rawActivity
+//    //    label + "." + propertyKey
+//    propertyKey
+//  }
+//
+//  def updateMutableMergedProps(mutableMergedProps: 
MutableMergedProps)(dimension: String,
+//                                                                       
dimensionValue: String,
+//                                                                       
count: Int = 1): Unit = {
+//    val buffer = mutableMergedProps.getOrElseUpdate(dimension, 
initMutableMergedPropsInner)
+//    val newCount = buffer.getOrElse(dimensionValue, 0) + count
+//    buffer += (dimensionValue -> newCount)
+//  }
+//
+//  def groupByDimensionValues(rawActivity: RawActivity,
+//                             propsJson: JsObject,
+//                             mergedProps: MutableMergedProps,
+//                             toDimensionFunc: (RawActivity, String) => 
String,
+//                             excludePropKeys: Set[String] = Set.empty): Unit 
= {
+//    propsJson.fields.filter(t => !excludePropKeys(t._1)).foreach { case 
(propertyKey, jsValue) =>
+//      val values = jsValue match {
+//        case JsString(s) => Seq(s)
+//        case JsArray(arr) => arr.map(JSONParser.jsValueToString)
+//        case _ => Seq(jsValue.toString())
+//      }
+//      val dimension = toDimensionFunc(rawActivity, propertyKey)
+//
+//      values.foreach { value =>
+//        updateMutableMergedProps(mergedProps)(dimension, value)
+//      }
+//    }
+//  }
+//
+//  def buildMergedProps(rawActivities: Seq[RawActivity],
+//                       toDimensionFunc: (RawActivity, String) => String,
+//                       defaultTopKs: Int = 100,
+//                       dimTopKs: Map[String, Int] = Map.empty,
+//                       excludePropKeys: Set[String] = Set.empty,
+//                       dimValExtractors: Seq[Extractor] = Nil): MergedProps 
= {
+//    val mergedProps = initMutableMergedProps
+//
+//    rawActivities.foreach { case rawActivity@(_, _, _, rawProps) =>
+//      val propsJson = Json.parse(rawProps).as[JsObject]
+//      groupByDimensionValues(rawActivity, propsJson, mergedProps, 
toDimensionFunc, excludePropKeys)
+//    }
+//    // work on extra dimVals.
+//    dimValExtractors.foreach { extractor =>
+//      extractor.extract(rawActivities, mergedProps)
+//    }
+//
+//    mergedProps.map { case (key, values) =>
+//      val topK = dimTopKs.getOrElse(key, defaultTopKs)
+//
+//      key -> values.toSeq.sortBy(-_._2).take(topK).map(_._1)
+//    }.toMap
+//  }
+//
+//  def rowToRawActivity(row: Row): RawActivity = {
+//    (row.getAs[Long](0), row.getAs[String](1), row.getAs[String](2), 
row.getAs[String](3))
+//  }
+//
+//  def appendMergeProps(toDimensionFunc: (RawActivity, String) => String = 
toDimension,
+//                       defaultTopKs: Int = 100,
+//                       dimTopKs: Map[String, Int] = Map.empty,
+//                       excludePropKeys: Set[String] = Set.empty,
+//                       dimValExtractors: Seq[Extractor] = Nil,
+//                       minTs: Long = 0,
+//                       maxTs: Long = Long.MaxValue) = udf((acts: Seq[Row]) 
=> {
+//    val rows = acts.map(rowToRawActivity).filter(act => act._1 >= minTs && 
act._1 < maxTs)
+//
+//    buildMergedProps(rows, toDimensionFunc, defaultTopKs, dimTopKs, 
excludePropKeys, dimValExtractors)
+//  })
+
+  val extractDimensionValues = {
+    udf((dimensionValues: Map[String, Seq[String]]) => {
+      dimensionValues.toSeq.flatMap { case (dimension, values) =>
+        values.map { value => dimension -> value }
+      }
+    })
+  }
+
+  def toHash(dimension: String, dimensionValue: String): Long = {
+    val key = s"$dimension.$dimensionValue"
+    Hashing.murmur3_128().hashBytes(key.toString.getBytes("UTF-8")).asLong()
+  }
+
+  def filterDimensionValues(validDimValues: Broadcast[Set[Long]]) = {
+    udf((dimensionValues: Map[String, Seq[String]]) => {
+      dimensionValues.map { case (dimension, values) =>
+        val filtered = values.filter { value =>
+          val hash = toHash(dimension, value)
+
+          validDimValues.value(hash)
+        }
+
+        dimension -> filtered
+      }
+    })
+  }
+
+  def appendRank[K1: ClassTag, K2: ClassTag, V: ClassTag](ds: Dataset[((K1, 
K2), V)],
+                                                          numOfPartitions: 
Option[Int] = None,
+                                                          
samplePointsPerPartitionHint: Option[Int] = None)(implicit ordering: 
Ordering[(K1, K2)]) = {
+    import org.apache.spark.RangePartitioner
+    val rdd = ds.rdd
+
+    val partitioner = new 
RangePartitioner(numOfPartitions.getOrElse(rdd.partitions.size),
+      rdd,
+      true,
+      samplePointsPerPartitionHint = samplePointsPerPartitionHint.getOrElse(20)
+    )
+
+    val sorted = rdd.repartitionAndSortWithinPartitions(partitioner)
+
+    def rank(idx: Int, iter: Iterator[((K1, K2), V)]) = {
+      val initialK1: K1 = null.asInstanceOf[K1]
+      val initialK2: K2 = null.asInstanceOf[K2]
+      val initialV: V = null.asInstanceOf[V]
+      val zero = List((1L, initialK1, initialK2, initialV))
+
+      def merge(acc: List[(Long, K1, K2, V)], x: ((K1, K2), V)) =
+        (acc.head, x) match {
+          case ((offset, prevKey1, _, _), ((curKey1: K1, curKey2: K2), curVal: 
V)) => {
+            val newOffset = if (prevKey1 == curKey1) offset + 1L else 1L
+            (newOffset, curKey1, curKey2, curVal) :: acc
+          }
+        }
+
+      iter.foldLeft(zero)(merge).reverse.drop(1).map { case (offset, key1, 
key2, value) =>
+        (idx, offset, key1, key2, value)
+      }.toIterator
+    }
+
+    def getOffset(idx: Int, iter: Iterator[((K1, K2), V)]) = {
+      val buffer = mutable.Map.empty[K1, (Int, Long)]
+      if (!iter.hasNext) buffer.toIterator
+      else {
+        val ((k1, k2), v) = iter.next()
+        var prevKey1: K1 = k1
+        var size = 1L
+        iter.foreach { case ((k1, k2), v) =>
+          if (prevKey1 != k1) {
+            buffer += prevKey1 -> (idx, size)
+            prevKey1 = k1
+            size = 0L
+          }
+          size += 1L
+        }
+        if (size > 0) buffer += prevKey1 -> (idx, size)
+        buffer.iterator
+      }
+    }
+
+    val partRanks = sorted.mapPartitionsWithIndex(rank)
+    val _offsets = sorted.mapPartitionsWithIndex(getOffset)
+    val offsets = _offsets.groupBy(_._1).flatMap { case (k1, 
partitionWithSize) =>
+      val ls = partitionWithSize.toSeq.map(_._2).sortBy(_._1)
+      var sum = ls.head._2
+      val lss = ls.tail.map { case (partition, size) =>
+        val x = (partition, sum)
+        sum += size
+        x
+      }
+      lss.map { case (partition, offset) =>
+        (k1, partition) -> offset
+      }
+    }.collect()
+
+    println(offsets)
+
+    val offsetsBCast = ds.sparkSession.sparkContext.broadcast(offsets)
+
+    def adjust(iter: Iterator[(Int, Long, K1, K2, V)], startOffsets: Map[(K1, 
Int), Long]) = {
+      iter.map { case (partition, rankInPartition, key1, key2, value) =>
+        val startOffset = startOffsets.getOrElse((key1, partition), 0L)
+        val rank = startOffset + rankInPartition
+
+        (partition, rankInPartition, rank, (key1, key2), value)
+      }
+    }
+
+    val withRanks = partRanks
+      .mapPartitions { iter =>
+        val startOffsets = offsetsBCast.value.toMap
+        adjust(iter, startOffsets)
+      }.map { case (_, _, rank, (key1, key2), value) =>
+      (rank, (key1, key2), value)
+    }
+
+    withRanks
+  }
+}
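
appendRank range-partitions the keyed dataset, ranks rows within each partition,
then shifts each partition's ranks by the sizes of the preceding partitions so
ranks end up global per first key (the "dim" in buildDictionary). A hedged usage
sketch assuming a local SparkSession; output values are not asserted here:

    import org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("appendRankSketch").getOrCreate()
    import spark.implicits._

    // ((dim, count), value) triples, the same shape buildDictionary feeds in
    val ds = Seq(
      (("age", 10L), "20"),
      (("age", 7L), "30"),
      (("gender", 5L), "F")
    ).toDS()

    // order by dim ascending, count descending, as in buildDictionary
    implicit val ord = Ordering.Tuple2(Ordering.String, Ordering.Long.reverse)

    // rows come back as (rank, (dim, count), value), with rank restarting per dim
    val ranked = WalLogUDF.appendRank(ds, numOfPartitions = Some(2), samplePointsPerPartitionHint = Some(20))
    ranked.collect().foreach(println)
    spark.stop()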

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala
new file mode 100644
index 0000000..2941357
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala
@@ -0,0 +1,57 @@
+package org.apache.s2graph.s2jobs.wal.utils
+
+import java.net.{URI, URLDecoder}
+
+import scala.util.matching.Regex
+
+object UrlUtils {
+  val pattern = new Regex("""(\\x[0-9A-Fa-f]{2}){3}""")
+  val koreanPattern = new scala.util.matching.Regex("([가-힣]+[\\-_a-zA-Z 
0-9]*)+|([\\-_a-zA-Z 0-9]+[가-힣]+)")
+
+
+  // url extraction functions
+  def urlDecode(url: String): (Boolean, String) = {
+    try {
+      val decoded = URLDecoder.decode(url, "UTF-8")
+      (url != decoded, decoded)
+    } catch {
+      case e: Exception => (false, url)
+    }
+  }
+
+  def hex2String(url: String): String = {
+    pattern replaceAllIn(url, m => {
+      new String(m.toString.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 
2).toArray.map(Integer.parseInt(_, 16).toByte), "utf-8")
+    })
+  }
+
+  def toDomains(url: String, maxDepth: Int = 3): Seq[String] = {
+    val uri = new URI(url)
+    val domain = uri.getHost
+    if (domain == null) Nil
+    else {
+      val paths = uri.getPath.split("/")
+      if (paths.isEmpty) Seq(domain)
+      else {
+        val depth = Math.min(maxDepth, paths.size)
+        (1 to depth).map { ith =>
+          domain + paths.take(ith).mkString("/")
+        }
+      }
+    }
+  }
+
+  def extract(_url: String): (String, Seq[String], Option[String]) = {
+    try {
+      val url = hex2String(_url)
+      val (encoded, decodedUrl) = urlDecode(url)
+
+      val kwdOpt = 
koreanPattern.findAllMatchIn(decodedUrl).toList.map(_.group(0)).headOption.map(_.replaceAll("\\s",
 ""))
+      val domains = toDomains(url.replaceAll(" ", ""))
+      (decodedUrl, domains, kwdOpt)
+    } catch {
+      case e: Exception => (_url, Nil, None)
+    }
+  }
+}
+
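
UrlUtils.extract decodes hex- and percent-encoded URLs, returns the host plus
path prefixes up to maxDepth as "domains", and any Korean keyword found in the
decoded URL. A small sketch consistent with the ExtractDomain test below:

    import org.apache.s2graph.s2jobs.wal.utils.UrlUtils

    val (decoded, domains, keyword) = UrlUtils.extract("http://www.google.com/abc")
    // decoded == "http://www.google.com/abc"
    // domains == Seq("www.google.com", "www.google.com/abc")
    // keyword == None (no Korean keyword in this URL)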

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
new file mode 100644
index 0000000..5089390
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
@@ -0,0 +1,26 @@
+package org.apache.s2graph.s2jobs.wal
+
+import org.apache.s2graph.s2jobs.wal.transformer._
+import org.scalatest.{FunSuite, Matchers}
+
+class TransformerTest extends FunSuite with Matchers {
+  val walLog = WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", 
"""{"name": 1, "url": "www.google.com"}""")
+
+  test("test default transformer") {
+    val transformer = new DefaultTransformer
+    val dimVals = transformer.toDimValLs(walLog, "name", "1")
+
+    dimVals shouldBe Seq(DimVal("friends:name", "1"))
+  }
+
+  test("test ExtractDomain from URL") {
+    val transformer = new ExtractDomain(urlDimensions = Set("url"))
+    val dimVals = transformer.toDimValLs(walLog, "url", "http://www.google.com/abc")
+
+    dimVals shouldBe Seq(
+      DimVal("host", "www.google.com"),
+      DimVal("domain", "www.google.com"),
+      DimVal("domain", "www.google.com/abc")
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
index 5ae595b..5753411 100644
--- 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
+++ 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
@@ -31,9 +31,8 @@ class WalLogAggregateProcessTest extends FunSuite with 
Matchers with BeforeAndAf
     val edges = spark.createDataset(walLogsLs).toDF()
     val inputMap = Map("edges" -> edges)
     val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = 
Seq("edges"),
-      options = Map("maxNumOfEdges" -> "10",
-        "runOrderBy" -> "false",
-        "groupByAggClassName" -> "GroupByAggOptimized"))
+      options = Map("maxNumOfEdges" -> "10")
+    )
 
     val job = new WalLogAggregateProcess(taskConf = taskConf)
     val processed = job.execute(spark, inputMap)
@@ -48,7 +47,7 @@ class WalLogAggregateProcessTest extends FunSuite with 
Matchers with BeforeAndAf
     val prev: Array[Int] = Array(3, 2, 1)
     val cur: Array[Int] = Array(4, 2, 2)
 
-    val ls = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10)
+    val ls = WalLogUDAF.mergeTwoSeq(prev, cur, 10)
     println(ls.size)
 
     ls.foreach { x =>
@@ -57,7 +56,7 @@ class WalLogAggregateProcessTest extends FunSuite with 
Matchers with BeforeAndAf
   }
 
   test("addToTopK test.") {
-    import S2EdgeDataAggregate._
+    import WalLogUDAF._
     val numOfTest = 100
     val numOfNums = 100
     val maxNum = 10
