Implement the following processes and add test cases for them:
- WalLogAggregateProcess
- BuildTopFeaturesProcess
- FilterTopFeaturesProcess


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/651ca2cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/651ca2cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/651ca2cf

Branch: refs/heads/master
Commit: 651ca2cf9fad57e4613e22dde39c65fe88755a5c
Parents: d9af2b8
Author: DO YUNG YOON <steams...@apache.org>
Authored: Wed Aug 22 19:15:48 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Thu Aug 23 00:52:24 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/task/Task.scala   |  16 +++
 .../org/apache/s2graph/s2jobs/wal/WalLog.scala  |  60 ++++++---
 .../wal/process/BuildTopFeaturesProcess.scala   | 126 +++++++++++++++++++
 .../wal/process/FeatureIndexProcess.scala       |  99 ---------------
 .../wal/process/FilterTopFeaturesProcess.scala  |  90 +++++++++++++
 .../wal/process/FilterWalLogAggProcess.scala    |  75 -----------
 .../wal/process/WalLogAggregateProcess.scala    |  20 +--
 .../wal/process/params/AggregateParam.scala     |  19 ++-
 .../process/params/BuildTopFeaturesParam.scala  |  17 +++
 .../wal/process/params/FeatureIndexParam.scala  |  15 ---
 .../process/params/FilterTopFeaturesParam.scala |   7 ++
 .../process/params/FilterWalLogAggParam.scala   |   7 --
 .../wal/transformer/DefaultTransformer.scala    |   4 +-
 .../s2jobs/wal/transformer/ExtractDomain.scala  |  11 +-
 .../wal/transformer/ExtractServiceName.scala    |  13 +-
 .../s2jobs/wal/transformer/Transformer.scala    |   3 +-
 .../s2graph/s2jobs/wal/udfs/WalLogUDF.scala     |  26 ++--
 .../apache/s2graph/s2jobs/wal/TestData.scala    |  42 +++++++
 .../s2graph/s2jobs/wal/TransformerTest.scala    |  10 +-
 .../process/BuildTopFeaturesProcessTest.scala   |  31 +++++
 .../process/FilterTopFeaturesProcessTest.scala  |  84 +++++++++++++
 .../process/WalLogAggregateProcessTest.scala    |  64 ++--------
 .../s2jobs/wal/udafs/WalLogUDAFTest.scala       |  42 +++++++
 23 files changed, 574 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
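
The commit wires three tasks into a WAL-log feature pipeline: WalLogAggregateProcess groups raw wal logs per user, BuildTopFeaturesProcess counts and ranks (dim, value) features per dimension, and FilterTopFeaturesProcess keeps only properties whose features rank high enough. A minimal sketch of chaining them is below; the task names and option values are illustrative, and only the option keys that appear in the diff are real.

  // Illustrative wiring of the three processes added in this commit.
  import org.apache.s2graph.s2jobs.task.TaskConf
  import org.apache.s2graph.s2jobs.wal.process.{BuildTopFeaturesProcess, FilterTopFeaturesProcess, WalLogAggregateProcess}
  import org.apache.spark.sql.{DataFrame, SparkSession}

  def runPipeline(ss: SparkSession, rawWalLogs: DataFrame): DataFrame = {
    // 1. group wal logs by "from", keeping at most maxNumOfEdges per group
    val aggConf = TaskConf("agg", "process", inputs = Seq("wal"),
      options = Map("maxNumOfEdges" -> "1000"))
    val aggregated = new WalLogAggregateProcess(aggConf).execute(ss, Map("wal" -> rawWalLogs))

    // 2. build the ranked feature dictionary (dim, value, count, rank)
    val dictConf = TaskConf("dict", "process", inputs = Seq("agg"),
      options = Map("minUserCount" -> "0"))
    val featureDict = BuildTopFeaturesProcess(dictConf).execute(ss, Map("agg" -> aggregated))

    // 3. drop properties whose features are not in the top ranks
    val filterConf = TaskConf("filter", "process", inputs = Seq("agg", "dict"),
      options = Map("walLogAgg" -> "agg", "featureDict" -> "dict", "defaultMaxRank" -> "100"))
    new FilterTopFeaturesProcess(filterConf).execute(ss, Map("agg" -> aggregated, "dict" -> featureDict))
  }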


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ab02900..62081df 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -21,9 +21,13 @@ package org.apache.s2graph.s2jobs.task
 
 import org.apache.s2graph.core.S2GraphConfigs
 import org.apache.s2graph.s2jobs.Logger
+import org.apache.s2graph.s2jobs.wal.transformer.Transformer
+import play.api.libs.json.Json
 //import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 
 object TaskConf {
+  val Empty = new TaskConf(name = "empty", `type` = "empty", inputs = Nil, 
options = Map.empty)
+
 //  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
 //    val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys)
 //      .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
@@ -42,6 +46,18 @@ object TaskConf {
   def parseLocalCacheConfigs(taskConf: TaskConf): Map[String, Any] = {
     
taskConf.options.filterKeys(S2GraphConfigs.CacheConfigs.DEFAULTS.keySet).mapValues(_.toInt)
   }
+
+  def parseTransformers(taskConf: TaskConf): Seq[Transformer] = {
+    val classes = Json.parse(taskConf.options.getOrElse("transformClasses",
+      
"""["org.apache.s2graph.s2jobs.wal.transformer.DefaultTransformer"]""")).as[Seq[String]]
+
+    classes.map { className =>
+      Class.forName(className)
+        .getConstructor(classOf[TaskConf])
+        .newInstance(taskConf)
+        .asInstanceOf[Transformer]
+    }
+  }
 }
 case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, 
options:Map[String, String] = Map.empty, cache:Option[Boolean]=None)
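
parseTransformers reads the new "transformClasses" option as a JSON array of class names and instantiates each one reflectively, so every listed class must expose a public constructor taking a single TaskConf (which is why the transformers below now receive taskConf). A sketch with illustrative option values:

  // Illustrative: build transformers from task options.
  import org.apache.s2graph.s2jobs.task.TaskConf

  val conf = TaskConf.Empty.copy(options = Map(
    "transformClasses" ->
      """["org.apache.s2graph.s2jobs.wal.transformer.DefaultTransformer",
          "org.apache.s2graph.s2jobs.wal.transformer.ExtractDomain"]""",
    "urlDimensions" -> """["url"]"""  // consumed by ExtractDomain's constructor
  ))
  val transformers = TaskConf.parseTransformers(conf)
  // => Seq(DefaultTransformer(conf), ExtractDomain(conf))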
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index e4c3467..9617ca1 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -3,6 +3,7 @@ package org.apache.s2graph.s2jobs.wal
 import com.google.common.hash.Hashing
 import org.apache.s2graph.core.JSONParser
 import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
+import org.apache.s2graph.s2jobs.wal.transformer.Transformer
 import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{LongType, StringType, StructField, 
StructType}
@@ -17,22 +18,12 @@ object WalLogAgg {
     new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp)
   }
 
+  def toFeatureHash(dimVal: DimVal): Long = toFeatureHash(dimVal.dim, 
dimVal.value)
+
   def toFeatureHash(dim: String, value: String): Long = {
     Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
   }
 
-  def filter(walLogAgg: WalLogAgg, validFeatureHashKeys: Set[Long]) = {
-    val filtered = walLogAgg.logs.map { walLog =>
-      val fields = Json.parse(walLog.props).as[JsObject].fields.filter { case 
(dim, jsValue) =>
-        validFeatureHashKeys(toFeatureHash(dim, 
JSONParser.jsValueToString(jsValue)))
-      }
-
-      walLog.copy(props = Json.toJson(fields).as[JsObject].toString)
-    }
-
-    walLogAgg.copy(logs = filtered)
-  }
-
   def merge(iter: Iterator[WalLogAgg],
             param: AggregateParam)(implicit ord: Ordering[WalLog]) = {
     val heap = new BoundedPriorityQueue[WalLog](param.heapSize)
@@ -51,6 +42,47 @@ object WalLogAgg {
 
     WalLogAgg(topItems.head.from, topItems, maxTs, minTs)
   }
+
+  def filterProps(walLogAgg: WalLogAgg,
+                  transformers: Seq[Transformer],
+                  validFeatureHashKeys: Set[Long]) = {
+    val filtered = walLogAgg.logs.map { walLog =>
+      val fields = walLog.propsJson.fields.filter { case (propKey, propValue) 
=>
+        val filtered = transformers.flatMap { transformer =>
+          transformer.toDimValLs(walLog, propKey, 
JSONParser.jsValueToString(propValue)).filter(dimVal => 
validFeatureHashKeys(toFeatureHash(dimVal)))
+        }
+        filtered.nonEmpty
+      }
+
+      walLog.copy(props = Json.toJson(fields.toMap).as[JsObject].toString)
+    }
+
+    walLogAgg.copy(logs = filtered)
+  }
+}
+
+object DimValCountRank {
+  def fromRow(row: Row): DimValCountRank = {
+    val dim = row.getAs[String]("dim")
+    val value = row.getAs[String]("value")
+    val count = row.getAs[Long]("count")
+    val rank = row.getAs[Long]("rank")
+
+    new DimValCountRank(DimVal(dim, value), count, rank)
+  }
+}
+
+case class DimValCountRank(dimVal: DimVal, count: Long, rank: Long)
+
+case class DimValCount(dimVal: DimVal, count: Long)
+
+object DimVal {
+  def fromRow(row: Row): DimVal = {
+    val dim = row.getAs[String]("dim")
+    val value = row.getAs[String]("value")
+
+    new DimVal(dim, value)
+  }
 }
 
 case class DimVal(dim: String, value: String)
@@ -71,8 +103,8 @@ case class WalLog(timestamp: Long,
   val id = from
   val columnName = label
   val serviceName = to
-
-  lazy val propsKeyValues = Json.parse(props).as[JsObject].fields.map { case 
(key, jsValue) =>
+  lazy val propsJson = Json.parse(props).as[JsObject]
+  lazy val propsKeyValues = propsJson.fields.map { case (key, jsValue) =>
     key -> JSONParser.jsValueToString(jsValue)
   }
 }
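
toFeatureHash is the contract shared by BuildTopFeaturesProcess and FilterTopFeaturesProcess: both sides hash "dim:value" with murmur3_128 and compare the resulting Long, so a property survives filtering only when its hash is in the broadcast set. A small sketch:

  // Illustrative: the 64-bit feature hash used by filterProps above.
  import org.apache.s2graph.s2jobs.wal.{DimVal, WalLogAgg}

  val dimVal = DimVal("click:item_name", "awesome item")
  val h1: Long = WalLogAgg.toFeatureHash(dimVal)
  val h2: Long = WalLogAgg.toFeatureHash(dimVal.dim, dimVal.value)
  assert(h1 == h2)  // same bytes "click:item_name:awesome item" hashed either way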

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcess.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcess.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcess.scala
new file mode 100644
index 0000000..048b470
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcess.scala
@@ -0,0 +1,126 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.WalLogAgg.toFeatureHash
+import org.apache.s2graph.s2jobs.wal.process.params.BuildTopFeaturesParam
+import org.apache.s2graph.s2jobs.wal.transformer._
+import org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF
+import org.apache.s2graph.s2jobs.wal.{DimVal, DimValCount, WalLog, WalLogAgg}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import play.api.libs.json.{JsObject, Json}
+
+import scala.collection.mutable
+
+object BuildTopFeaturesProcess {
+  def extractDimValuesWithCount(transformers: Seq[Transformer]) = {
+    udf((rows: Seq[Row]) => {
+      val logs = rows.map(WalLog.fromRow)
+      val dimValCounts = mutable.Map.empty[DimVal, Int]
+
+      logs.foreach { walLog =>
+        walLog.propsKeyValues.foreach { case (propsKey, propsValue) =>
+          transformers.foreach { transformer =>
+            transformer.toDimValLs(walLog, propsKey, propsValue).foreach { 
dimVal =>
+              val newCount = dimValCounts.getOrElse(dimVal, 0) + 1
+              dimValCounts += (dimVal -> newCount)
+            }
+          }
+        }
+      }
+
+      dimValCounts.toSeq.sortBy(-_._2)map { case (dimVal, count) =>
+        DimValCount(dimVal, count)
+      }
+    })
+  }
+
+  def extractDimValues(transformers: Seq[Transformer]) = {
+    udf((rows: Seq[Row]) => {
+      val logs = rows.map(WalLog.fromRow)
+      // TODO: this can be changed into Map to count how many times each 
dimVal exist in sequence of walLog
+      // then change this to mutable.Map.empty[DimVal, Int], then aggregate.
+      val distinctDimValues = mutable.Set.empty[DimVal]
+
+      logs.foreach { walLog =>
+        walLog.propsKeyValues.foreach { case (propsKey, propsValue) =>
+          transformers.foreach { transformer =>
+            transformer.toDimValLs(walLog, propsKey, propsValue).foreach { 
dimVal =>
+              distinctDimValues += dimVal
+            }
+          }
+        }
+      }
+
+      distinctDimValues.toSeq
+    })
+  }
+
+  def buildDictionary(ss: SparkSession,
+                      allDimVals: DataFrame,
+                      param: BuildTopFeaturesParam,
+                      dimValColumnName: String = "dimVal"): DataFrame = {
+    import ss.implicits._
+
+    val rawFeatures = allDimVals
+      .select(col(param._countColumnName), 
col(s"$dimValColumnName.dim").as("dim"), 
col(s"$dimValColumnName.value").as("value"))
+      .groupBy("dim", "value")
+      .agg(countDistinct(param._countColumnName).as("count"))
+      .filter(s"count > ${param._minUserCount}")
+
+    val ds: Dataset[((String, Long), String)] =
+      rawFeatures.select("dim", "value", "count").as[(String, String, Long)]
+        .map { case (dim, value, uv) =>
+          (dim, uv) -> value
+        }
+
+
+    implicit val ord = Ordering.Tuple2(Ordering.String, Ordering.Long.reverse)
+
+    val rdd: RDD[(Long, (String, Long), String)] = WalLogUDF.appendRank(ds, 
param.numOfPartitions, param.samplePointsPerPartitionHint)
+
+    rdd.toDF("rank", "dim_count", "value")
+      .withColumn("dim", col("dim_count._1"))
+      .withColumn("count", col("dim_count._2"))
+      .select("dim", "value", "count", "rank")
+  }
+}
+
+case class BuildTopFeaturesProcess(taskConf: TaskConf) extends 
org.apache.s2graph.s2jobs.task.Process(taskConf) {
+
+  import BuildTopFeaturesProcess._
+
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): 
DataFrame = {
+    val countColumnName = taskConf.options.getOrElse("countColumnName", "from")
+    val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
+    val samplePointsPerPartitionHint = 
taskConf.options.get("samplePointsPerPartitionHint").map(_.toInt)
+    val minUserCount = taskConf.options.get("minUserCount").map(_.toLong)
+
+    numOfPartitions.map { d => 
ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString) }
+
+    val param = BuildTopFeaturesParam(minUserCount = minUserCount, 
countColumnName = Option(countColumnName),
+      numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = 
samplePointsPerPartitionHint
+    )
+
+    val edges = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) 
{ case (prev, cur) =>
+      prev.union(inputMap(cur))
+    }
+
+    //TODO: user expect to inject transformers that transform (WalLog, 
propertyKey, propertyValue) to Seq[DimVal].
+    val transformers = TaskConf.parseTransformers(taskConf)
+    val dimValExtractUDF = extractDimValues(transformers)
+    val dimValColumnName = "dimVal"
+
+    val rawFeatures = edges
+      .withColumn(dimValColumnName, explode(dimValExtractUDF(col("logs"))))
+
+    val dict = buildDictionary(ss, rawFeatures, param, dimValColumnName)
+
+    dict
+  }
+
+
+  override def mandatoryOptions: Set[String] = Set.empty
+}
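
BuildTopFeaturesProcess explodes each WalLogAgg into distinct DimVals via the configured transformers, counts distinct users per (dim, value) using countColumnName (default "from"), drops pairs whose count is at or below minUserCount, and appends a 1-based rank within each dim. A sketch of running it, assuming ss: SparkSession and an aggregated DataFrame are already in scope; option values are illustrative:

  import org.apache.s2graph.s2jobs.task.TaskConf
  import org.apache.s2graph.s2jobs.wal.process.BuildTopFeaturesProcess

  val dictConf = TaskConf("dict", "process", inputs = Seq("agg"),
    options = Map("countColumnName" -> "from", "minUserCount" -> "0"))
  val dict = BuildTopFeaturesProcess(dictConf).execute(ss, Map("agg" -> aggregated))
  // resulting columns: dim, value, count, rank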

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
deleted file mode 100644
index 073b0cf..0000000
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-package org.apache.s2graph.s2jobs.wal.process
-
-import org.apache.s2graph.s2jobs.task.TaskConf
-import org.apache.s2graph.s2jobs.wal.process.params.FeatureIndexParam
-import org.apache.s2graph.s2jobs.wal.transformer._
-import org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF
-import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.functions._
-
-import scala.collection.mutable
-
-object FeatureIndexProcess {
-  def extractDimValues(transformers: Seq[Transformer]) = {
-    udf((rows: Seq[Row]) => {
-      val logs = rows.map(WalLog.fromRow)
-      // TODO: this can be changed into Map to count how many times each 
dimVal exist in sequence of walLog
-      // then change this to mutable.Map.empty[DimVal, Int], then aggregate.
-      val distinctDimValues = mutable.Set.empty[DimVal]
-
-      logs.foreach { walLog =>
-        walLog.propsKeyValues.foreach { case (propsKey, propsValue) =>
-          transformers.foreach { transformer =>
-            transformer.toDimValLs(walLog, propsKey, 
propsValue).foreach(distinctDimValues += _)
-          }
-        }
-      }
-
-      distinctDimValues.toSeq
-    })
-  }
-
-  def buildDictionary(ss: SparkSession,
-                      allDimVals: DataFrame,
-                      param: FeatureIndexParam,
-                      dimValColumnName: String = "dimVal"): DataFrame = {
-    import ss.implicits._
-
-    val rawFeatures = allDimVals
-      .select(col(param._countColumnName), 
col(s"$dimValColumnName.dim").as("dim"), 
col(s"$dimValColumnName.value").as("value"))
-      .groupBy("dim", "value")
-      .agg(countDistinct(param._countColumnName).as("count"))
-      .filter(s"count > ${param._minUserCount}")
-
-    val ds: Dataset[((String, Long), String)] =
-      rawFeatures.select("dim", "value", "count").as[(String, String, Long)]
-        .map { case (dim, value, uv) =>
-          (dim, uv) -> value
-        }
-
-
-    implicit val ord = Ordering.Tuple2(Ordering.String, Ordering.Long.reverse)
-
-    val rdd: RDD[(Long, (String, Long), String)] = WalLogUDF.appendRank(ds, 
param.numOfPartitions, param.samplePointsPerPartitionHint)
-
-    rdd.toDF("rank", "dim_count", "value")
-      .withColumn("dim", col("dim_count._1"))
-      .withColumn("count", col("dim_count._2"))
-      .select("dim", "value", "count", "rank")
-  }
-}
-
-case class FeatureIndexProcess(taskConf: TaskConf) extends 
org.apache.s2graph.s2jobs.task.Process(taskConf) {
-
-  import FeatureIndexProcess._
-
-  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): 
DataFrame = {
-    val countColumnName = taskConf.options.getOrElse("countColumnName", "from")
-    val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
-    val samplePointsPerPartitionHint = 
taskConf.options.get("samplePointsPerPartitionHint").map(_.toInt)
-    val minUserCount = taskConf.options.get("minUserCount").map(_.toLong)
-
-    numOfPartitions.map { d => 
ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString) }
-
-    val param = FeatureIndexParam(minUserCount = minUserCount, countColumnName 
= Option(countColumnName),
-      numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = 
samplePointsPerPartitionHint
-    )
-
-    val edges = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) 
{ case (prev, cur) =>
-      prev.union(inputMap(cur))
-    }
-
-    //TODO: user expect to inject transformers that transform (WalLog, 
propertyKey, propertyValue) to Seq[DimVal].
-    val transformers = Seq(DefaultTransformer())
-    val dimValExtractUDF = extractDimValues(transformers)
-    val dimValColumnName = "dimVal"
-
-    val rawFeatures = edges
-      .withColumn(dimValColumnName, explode(dimValExtractUDF(col("logs"))))
-
-    val dict = buildDictionary(ss, rawFeatures, param, dimValColumnName)
-
-    dict
-  }
-
-
-  override def mandatoryOptions: Set[String] = Set.empty
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcess.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcess.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcess.scala
new file mode 100644
index 0000000..ec7e645
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcess.scala
@@ -0,0 +1,90 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.WalLogAgg
+import org.apache.s2graph.s2jobs.wal.transformer.{DefaultTransformer, 
Transformer}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import play.api.libs.json.{JsObject, Json}
+
+object FilterTopFeaturesProcess {
+  private var validFeatureHashKeys: Set[Long] = null
+  def getValidFeatureHashKeys(validFeatureHashKeysBCast: 
Broadcast[Array[Long]]): Set[Long] = {
+    if (validFeatureHashKeys == null) {
+      validFeatureHashKeys = validFeatureHashKeysBCast.value.toSet
+    }
+
+    validFeatureHashKeys
+  }
+
+  def collectDistinctFeatureHashes(ss: SparkSession,
+                                   filteredDict: DataFrame): Array[Long] = {
+    import ss.implicits._
+
+    val featureHashUDF = udf((dim: String, value: String) => 
WalLogAgg.toFeatureHash(dim, value))
+
+    filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), 
col("value")))
+      .select("featureHash")
+      .distinct().as[Long].collect()
+  }
+
+  def filterTopKsPerDim(dict: DataFrame,
+                        maxRankPerDim: Broadcast[Map[String, Int]],
+                        defaultMaxRank: Int): DataFrame = {
+    val filterUDF = udf((dim: String, rank: Long) => {
+      rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank)
+    })
+
+    dict.filter(filterUDF(col("dim"), col("rank")))
+  }
+
+  def filterWalLogAgg(ss: SparkSession,
+                      walLogAgg: Dataset[WalLogAgg],
+                      transformers: Seq[Transformer],
+                      validFeatureHashKeysBCast: Broadcast[Array[Long]]) = {
+    import ss.implicits._
+    walLogAgg.mapPartitions { iter =>
+      val validFeatureHashKeys = 
getValidFeatureHashKeys(validFeatureHashKeysBCast)
+
+      iter.map { walLogAgg =>
+        WalLogAgg.filterProps(walLogAgg, transformers, validFeatureHashKeys)
+      }
+    }
+  }
+}
+
+class FilterTopFeaturesProcess(taskConf: TaskConf) extends 
org.apache.s2graph.s2jobs.task.Process(taskConf) {
+
+  import FilterTopFeaturesProcess._
+
+  /*
+    filter topKs per dim, then build valid dimValLs.
+    then broadcast valid dimValLs to original dataframe, and filter out not 
valid dimVal.
+   */
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): 
DataFrame = {
+    import ss.implicits._
+
+    val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s =>
+      Json.parse(s).as[JsObject].fields.map { case (k, jsValue) =>
+        k -> jsValue.as[Int]
+      }.toMap
+    }
+    val maxRankPerDimBCast = 
ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty))
+
+    val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt)
+
+    val featureDict = inputMap(taskConf.options("featureDict"))
+    val walLogAgg = inputMap(taskConf.options("walLogAgg")).as[WalLogAgg]
+
+    val transformers = TaskConf.parseTransformers(taskConf)
+
+    val filteredDict = filterTopKsPerDim(featureDict, maxRankPerDimBCast, 
defaultMaxRank.getOrElse(Int.MaxValue))
+    val validFeatureHashKeys = collectDistinctFeatureHashes(ss, filteredDict)
+    val validFeatureHashKeysBCast = 
ss.sparkContext.broadcast(validFeatureHashKeys)
+
+    filterWalLogAgg(ss, walLogAgg, transformers, 
validFeatureHashKeysBCast).toDF()
+  }
+
+  override def mandatoryOptions: Set[String] = Set("featureDict", "walLogAgg")
+}
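
FilterTopFeaturesProcess keeps a dictionary row only while rank < maxRank (maxRankPerDim per dimension, defaultMaxRank otherwise), broadcasts the surviving feature hashes, and rewrites each wal log's props down to the properties that map onto a surviving feature. A sketch with illustrative option values:

  import org.apache.s2graph.s2jobs.task.TaskConf
  import org.apache.s2graph.s2jobs.wal.process.FilterTopFeaturesProcess

  val filterConf = TaskConf("filter", "process", inputs = Seq("agg", "dict"),
    options = Map(
      "walLogAgg"      -> "agg",                        // inputMap key holding Dataset[WalLogAgg]
      "featureDict"    -> "dict",                       // inputMap key holding the feature dictionary
      "maxRankPerDim"  -> """{"search:referrer": 2}""", // keep only rank 1 for this dim
      "defaultMaxRank" -> "10"))                        // keep rank < 10 for all other dims
  val filtered = new FilterTopFeaturesProcess(filterConf)
    .execute(ss, Map("agg" -> aggregated, "dict" -> featureDict))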

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala
deleted file mode 100644
index 3c546a7..0000000
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package org.apache.s2graph.s2jobs.wal.process
-
-import org.apache.s2graph.s2jobs.task.TaskConf
-import org.apache.s2graph.s2jobs.wal.WalLogAgg
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.functions.{col, udf}
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import play.api.libs.json.{JsObject, Json}
-
-object FilterWalLogAggProcess {
-
-  def collectDistinctFeatureHashes(ss: SparkSession,
-                                   filteredDict: DataFrame): Array[Long] = {
-    import ss.implicits._
-
-    val featureHashUDF = udf((dim: String, value: String) => 
WalLogAgg.toFeatureHash(dim, value))
-
-    filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), 
col("value")))
-      .select("featureHash")
-      .distinct().as[Long].collect()
-  }
-
-  def filterTopKsPerDim(dict: DataFrame,
-                        maxRankPerDim: Broadcast[Map[String, Int]],
-                        defaultMaxRank: Int): DataFrame = {
-    val filterUDF = udf((dim: String, rank: Long) => {
-      rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank)
-    })
-
-    dict.filter(filterUDF(col("dim"), col("rank")))
-  }
-
-  def filterWalLogAgg(walLogAgg: Dataset[WalLogAgg],
-                      validFeatureHashKeysBCast: Broadcast[Array[Long]]) = {
-    walLogAgg.mapPartitions { iter =>
-      val validFeatureHashKeys = validFeatureHashKeysBCast.value.toSet
-
-      iter.map { walLogAgg =>
-        WalLogAgg.filter(walLogAgg, validFeatureHashKeys)
-      }
-    }
-  }
-}
-
-class FilterWalLogAggProcess(taskConf: TaskConf) extends 
org.apache.s2graph.s2jobs.task.Process(taskConf) {
-
-  import FilterWalLogAggProcess._
-
-  /*
-    filter topKs per dim, then build valid dimValLs.
-    then broadcast valid dimValLs to original dataframe, and filter out not 
valid dimVal.
-   */
-  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): 
DataFrame = {
-    val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s =>
-      Json.parse(s).as[JsObject].fields.map { case (k, jsValue) =>
-        k -> jsValue.as[Int]
-      }.toMap
-    }
-    val maxRankPerDimBCast = 
ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty))
-
-    val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt)
-
-    val featureDict = inputMap(taskConf.options("featureDict"))
-    val walLogAgg = inputMap(taskConf.options("walLogAgg")).as[WalLogAgg]
-
-
-    val filteredDict = filterTopKsPerDim(featureDict, maxRankPerDimBCast, 
defaultMaxRank.getOrElse(Int.MaxValue))
-    val validFeatureHashKeys = collectDistinctFeatureHashes(ss, filteredDict)
-    val validFeatureHashKeysBCast = 
ss.sparkContext.broadcast(validFeatureHashKeys)
-
-    filterWalLogAgg(walLogAgg, validFeatureHashKeysBCast).toDF()
-  }
-
-  override def mandatoryOptions: Set[String] = Set("featureDict", "walLogAgg")
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
index 2b42e20..ee4debd 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
@@ -41,26 +41,18 @@ class WalLogAggregateProcess(taskConf: TaskConf) extends 
org.apache.s2graph.s2jo
   override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): 
DataFrame = {
     import ss.implicits._
 
-    val groupByKeys = 
taskConf.options.get("groupByKeys").map(_.split(",").filter(_.nonEmpty).toSeq)
-    val maxNumOfEdges = 
taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
-    val arrayType = 
taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(false)
-    val sortTopItems = 
taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(false)
-    val numOfPartitions = taskConf.options.get("numOfPartitions")
-    numOfPartitions.foreach(d => 
ss.sqlContext.setConf("spark.sql.shuffle.partitions", d))
+    //TODO: Current implementation only expect taskConf.options as Map[String, 
String].
+    //Once we change taskConf.options as JsObject, then we can simply parse 
input paramter as following.
+    //implicit val paramReads = Json.reads[AggregateParam]
+    val param = AggregateParam.fromTaskConf(taskConf)
+    param.numOfPartitions.foreach(d => 
ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString))
 
     implicit val ord = WalLog.orderByTsAsc
     val walLogs = 
taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, 
cur) =>
       prev.union(inputMap(cur))
     }
 
-    //TODO: Current implementation only expect taskConf.options as Map[String, 
String].
-    //Once we change taskConf.options as JsObject, then we can simply parse 
input paramter as following.
-    //implicit val paramReads = Json.reads[AggregateParam]
-
-    val param = AggregateParam(groupByKeys, topK = Option(maxNumOfEdges),
-      isArrayType = Option(arrayType), shouldSortTopItems = 
Option(sortTopItems))
-
-    if (arrayType) aggregate(ss, walLogs.as[WalLogAgg], param)
+    if (param.arrayType) aggregate(ss, walLogs.as[WalLogAgg], param)
     else aggregateRaw(ss, walLogs.as[WalLog], param)
   }
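
Parameter parsing moved into AggregateParam.fromTaskConf, so the process itself no longer reads individual options. The recognized keys, with illustrative values:

  import org.apache.s2graph.s2jobs.task.TaskConf
  import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam

  val aggConf = TaskConf("agg", "process", inputs = Seq("wal"), options = Map(
    "groupByKeys"     -> "from",   // comma-separated grouping keys
    "maxNumOfEdges"   -> "1000",   // top-K wal logs kept per group
    "arrayType"       -> "false",  // true when input rows are already WalLogAgg
    "sortTopItems"    -> "true",
    "numOfPartitions" -> "200"))   // applied to spark.sql.shuffle.partitions
  val param = AggregateParam.fromTaskConf(aggConf)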
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
index 89c36f4..4cb1377 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
@@ -1,17 +1,34 @@
 package org.apache.s2graph.s2jobs.wal.process.params
 
+import org.apache.s2graph.s2jobs.task.TaskConf
 
 object AggregateParam {
   val defaultGroupByKeys = Seq("from")
   val defaultTopK = 1000
   val defaultIsArrayType = false
   val defaultShouldSortTopItems = true
+
+  def fromTaskConf(taskConf: TaskConf): AggregateParam = {
+    val groupByKeys = 
taskConf.options.get("groupByKeys").map(_.split(",").filter(_.nonEmpty).toSeq)
+    val maxNumOfEdges = 
taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(defaultTopK)
+    val arrayType = 
taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(defaultIsArrayType)
+    val sortTopItems = 
taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(defaultShouldSortTopItems)
+    val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
+
+    new AggregateParam(groupByKeys = groupByKeys,
+      topK = Option(maxNumOfEdges),
+      isArrayType = Option(arrayType),
+      shouldSortTopItems = Option(sortTopItems),
+      numOfPartitions = numOfPartitions
+    )
+  }
 }
 
 case class AggregateParam(groupByKeys: Option[Seq[String]],
                           topK: Option[Int],
                           isArrayType: Option[Boolean],
-                          shouldSortTopItems: Option[Boolean]) {
+                          shouldSortTopItems: Option[Boolean],
+                          numOfPartitions: Option[Int]) {
 
   import AggregateParam._
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/BuildTopFeaturesParam.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/BuildTopFeaturesParam.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/BuildTopFeaturesParam.scala
new file mode 100644
index 0000000..3e8bae5
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/BuildTopFeaturesParam.scala
@@ -0,0 +1,17 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+object BuildTopFeaturesParam {
+  val defaultMinUserCount = 0L
+  val defaultCountColumnName = "from"
+}
+
+case class BuildTopFeaturesParam(minUserCount: Option[Long],
+                                 countColumnName: Option[String],
+                                 samplePointsPerPartitionHint: Option[Int],
+                                 numOfPartitions: Option[Int]) {
+
+  import BuildTopFeaturesParam._
+
+  val _countColumnName = countColumnName.getOrElse(defaultCountColumnName)
+  val _minUserCount = minUserCount.getOrElse(defaultMinUserCount)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
deleted file mode 100644
index 4ede70b..0000000
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.s2graph.s2jobs.wal.process.params
-
-object FeatureIndexParam {
-  val defaultMinUserCount = 0L
-  val defaultCountColumnName = "from"
-}
-
-case class FeatureIndexParam(minUserCount: Option[Long],
-                             countColumnName: Option[String],
-                             samplePointsPerPartitionHint: Option[Int],
-                             numOfPartitions: Option[Int]) {
-  import FeatureIndexParam._
-  val _countColumnName = countColumnName.getOrElse(defaultCountColumnName)
-  val _minUserCount = minUserCount.getOrElse(defaultMinUserCount)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterTopFeaturesParam.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterTopFeaturesParam.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterTopFeaturesParam.scala
new file mode 100644
index 0000000..3b9d868
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterTopFeaturesParam.scala
@@ -0,0 +1,7 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+class FilterTopFeaturesParam(maxRankPerDim: Option[Map[String, Int]],
+                             defaultMaxRank: Option[Int]) {
+  val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty)
+  val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala
deleted file mode 100644
index 4b2209f..0000000
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.s2graph.s2jobs.wal.process.params
-
-class FilterWalLogAggParam(maxRankPerDim: Option[Map[String, Int]],
-                           defaultMaxRank: Option[Int]) {
-  val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty)
-  val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
index 2839d6a..de328e5 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala
@@ -1,3 +1,5 @@
 package org.apache.s2graph.s2jobs.wal.transformer
 
-case class DefaultTransformer() extends Transformer
+import org.apache.s2graph.s2jobs.task.TaskConf
+
+case class DefaultTransformer(taskConf: TaskConf) extends Transformer(taskConf)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
index 9b09d71..45bbe66 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala
@@ -1,12 +1,15 @@
 package org.apache.s2graph.s2jobs.wal.transformer
 
+import org.apache.s2graph.s2jobs.task.TaskConf
 import org.apache.s2graph.s2jobs.wal.utils.UrlUtils
 import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+import play.api.libs.json.Json
 
-case class ExtractDomain(urlDimensions: Set[String],
-                         hostDimName: String = "host",
-                         domainDimName: String = "domain",
-                         keywordDimName: String = "uri_keywords") extends 
Transformer {
+case class ExtractDomain(taskConf: TaskConf) extends Transformer(taskConf) {
+  val urlDimensions = Json.parse(taskConf.options.getOrElse("urlDimensions", 
"[]")).as[Set[String]]
+  val hostDimName = taskConf.options.getOrElse("hostDimName", "host")
+  val domainDimName= taskConf.options.getOrElse("domainDimName", "domain")
+  val keywordDimName = taskConf.options.getOrElse("keywordDimName", 
"uri_keywords")
   override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: 
String): Seq[DimVal] = {
     if (!urlDimensions(propertyKey)) Nil
     else {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
index 7ad7dfd..15efe9f 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala
@@ -1,10 +1,17 @@
 package org.apache.s2graph.s2jobs.wal.transformer
 
+import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.s2jobs.task.TaskConf
 import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
+import play.api.libs.json.{JsObject, Json}
+
+class ExtractServiceName(taskConf: TaskConf) extends Transformer(taskConf) {
+  val serviceDims = Json.parse(taskConf.options.getOrElse("serviceDims", 
"[]")).as[Set[String]]
+  val domainServiceMap = 
Json.parse(taskConf.options.getOrElse("domainServiceMap", 
"{}")).as[JsObject].fields.map { case (k, v) =>
+      k -> JSONParser.jsValueToString(v)
+  }.toMap
+  val serviceDimName = taskConf.options.getOrElse("serviceDimName", 
"serviceDimName")
 
-class ExtractServiceName(serviceDims: Set[String],
-                         domainServiceMap: Map[String, String] = Map.empty,
-                         serviceDimName: String = "serviceName") extends 
Transformer {
   override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: 
String): Seq[DimVal] = {
     if (!serviceDims(propertyKey)) Nil
     else {
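
ExtractServiceName and ExtractDomain now read their settings from taskConf.options instead of constructor arguments, which keeps them loadable through TaskConf.parseTransformers. A sketch of the options ExtractServiceName parses, with illustrative values:

  import org.apache.s2graph.s2jobs.task.TaskConf
  import org.apache.s2graph.s2jobs.wal.transformer.ExtractServiceName

  val conf = TaskConf.Empty.copy(options = Map(
    "serviceDims"      -> """["referrer"]""",
    "domainServiceMap" -> """{"www.google.com": "google", "www.bing.com": "bing"}""",
    "serviceDimName"   -> "serviceName"))
  val extractServiceName = new ExtractServiceName(conf)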

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
index 523f58d..68be2ad 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala
@@ -1,11 +1,12 @@
 package org.apache.s2graph.s2jobs.wal.transformer
 
+import org.apache.s2graph.s2jobs.task.TaskConf
 import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
 
 /**
   * decide how to transform walLog's each property key value to Seq[DimVal]
   */
-trait Transformer {
+abstract class Transformer(taskConf: TaskConf) extends Serializable {
   def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): 
Seq[DimVal] = {
     val dim = s"${walLog.label}:${propertyKey}"
     val value = propertyValue
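
Transformer is now an abstract, Serializable class that receives the TaskConf, so custom implementations can be shipped to executors and instantiated by class name through TaskConf.parseTransformers. A hypothetical subclass (not part of this commit):

  import org.apache.s2graph.s2jobs.task.TaskConf
  import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
  import org.apache.s2graph.s2jobs.wal.transformer.Transformer

  // Hypothetical example; LowercaseValue does not exist in the repository.
  class LowercaseValue(taskConf: TaskConf) extends Transformer(taskConf) {
    override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] =
      Seq(DimVal(s"${walLog.label}:$propertyKey", propertyValue.toLowerCase))
  }
  // loadable via "transformClasses" as long as it is on the classpath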

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
index 34f4a2b..d213d6c 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala
@@ -137,22 +137,16 @@ object WalLogUDF {
     val sorted = rdd.repartitionAndSortWithinPartitions(partitioner)
 
     def rank(idx: Int, iter: Iterator[((K1, K2), V)]) = {
-      val initialK1: K1 = null.asInstanceOf[K1]
-      val initialK2: K2 = null.asInstanceOf[K2]
-      val initialV: V = null.asInstanceOf[V]
-      val zero = List((1L, initialK1, initialK2, initialV))
-
-      def merge(acc: List[(Long, K1, K2, V)], x: ((K1, K2), V)) =
-        (acc.head, x) match {
-          case ((offset, prevKey1, _, _), ((curKey1: K1, curKey2: K2), curVal: 
V)) => {
-            val newOffset = if (prevKey1 == curKey1) offset + 1L else 1L
-            (newOffset, curKey1, curKey2, curVal) :: acc
-          }
-        }
-
-      iter.foldLeft(zero)(merge).reverse.drop(1).map { case (offset, key1, 
key2, value) =>
-        (idx, offset, key1, key2, value)
-      }.toIterator
+      var curOffset = 1L
+      var curK1 = null.asInstanceOf[K1]
+
+      iter.map{ case ((key1, key2), value) =>
+        //        println(s">>>[$idx] curK1: $curK1, curOffset: $curOffset")
+        val newOffset = if (curK1 == key1) curOffset + 1L  else 1L
+        curOffset = newOffset
+        curK1 = key1
+        (idx, newOffset, key1, key2, value)
+      }
     }
 
     def getOffset(idx: Int, iter: Iterator[((K1, K2), V)]) = {
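
The rewritten rank closure replaces the foldLeft accumulator with two local vars: the iterator arrives sorted by (K1, K2) within a partition, so the offset resets to 1 whenever K1 changes and otherwise increments, producing a 1-based rank per K1 group. A standalone sketch of the same logic on a plain pre-sorted sequence:

  // Mirrors the rank(...) closure above on an in-memory, pre-sorted sequence.
  val sorted = Seq(("dimA", 9L) -> "v1", ("dimA", 5L) -> "v2", ("dimB", 7L) -> "v3")

  var curOffset = 1L
  var curK1: String = null
  val ranked = sorted.map { case ((key1, key2), value) =>
    val newOffset = if (curK1 == key1) curOffset + 1L else 1L
    curOffset = newOffset
    curK1 = key1
    (newOffset, key1, key2, value)
  }
  // ranked == Seq((1L,"dimA",9L,"v1"), (2L,"dimA",5L,"v2"), (1L,"dimB",7L,"v3"))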

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TestData.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TestData.scala 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TestData.scala
new file mode 100644
index 0000000..fae9265
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TestData.scala
@@ -0,0 +1,42 @@
+package org.apache.s2graph.s2jobs.wal
+
+object TestData {
+  val testServiceName = "s2graph"
+  val walLogsLs = Seq(
+    WalLog(1L, "insert", "edge", "u1", "i1", s"$testServiceName", "click", 
"""{"item_name":"awesome item"}"""),
+    WalLog(2L, "insert", "edge", "u1", "i1", s"$testServiceName", "purchase", 
"""{"price":2}"""),
+    WalLog(3L, "insert", "edge", "u1", "q1", s"$testServiceName", "search", 
"""{"referrer":"www.google.com"}"""),
+    WalLog(4L, "insert", "edge", "u2", "i1", s"$testServiceName", "click", 
"""{"item_name":"awesome item"}"""),
+    WalLog(5L, "insert", "edge", "u2", "q2", s"$testServiceName", "search", 
"""{"referrer":"www.bing.com"}"""),
+    WalLog(6L, "insert", "edge", "u3", "i2", s"$testServiceName", "click", 
"""{"item_name":"bad item"}"""),
+    WalLog(7L, "insert", "edge", "u4", "q1", s"$testServiceName", "search", 
"""{"referrer":"www.google.com"}""")
+  )
+
+  // order by from
+  val aggExpected = Array(
+    WalLogAgg("u1", Seq(
+      WalLog(3L, "insert", "edge", "u1", "q1", s"$testServiceName", "search", 
"""{"referrer":"www.google.com"}"""),
+      WalLog(2L, "insert", "edge", "u1", "i1", s"$testServiceName", 
"purchase", """{"price":2}"""),
+      WalLog(1L, "insert", "edge", "u1", "i1", s"$testServiceName", "click", 
"""{"item_name":"awesome item"}""")
+    ), 3L, 1L),
+    WalLogAgg("u2", Seq(
+      WalLog(5L, "insert", "edge", "u2", "q2", s"$testServiceName", "search", 
"""{"referrer":"www.bing.com"}"""),
+      WalLog(4L, "insert", "edge", "u2", "i1", s"$testServiceName", "click", 
"""{"item_name":"awesome item"}""")
+    ), 5L, 4L),
+    WalLogAgg("u3", Seq(
+      WalLog(6L, "insert", "edge", "u3", "i2", s"$testServiceName", "click", 
"""{"item_name":"bad item"}""")
+    ), 6L, 6L),
+    WalLogAgg("u4", Seq(
+      WalLog(7L, "insert", "edge", "u4", "q1", s"$testServiceName", "search", 
"""{"referrer":"www.google.com"}""")
+    ), 7L, 7L)
+  )
+
+  // order by dim, rank
+  val featureDictExpected = Array(
+    DimValCountRank(DimVal("click:item_name", "awesome item"), 2, 1),
+    DimValCountRank(DimVal("click:item_name", "bad item"), 1, 2),
+    DimValCountRank(DimVal("purchase:price", "2"), 1, 1),
+    DimValCountRank(DimVal("search:referrer", "www.google.com"), 2, 1),
+    DimValCountRank(DimVal("search:referrer", "www.bing.com"), 1, 2)
+  )
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
index 5089390..974002b 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala
@@ -1,20 +1,26 @@
 package org.apache.s2graph.s2jobs.wal
 
+import org.apache.s2graph.s2jobs.task.TaskConf
 import org.apache.s2graph.s2jobs.wal.transformer._
 import org.scalatest.{FunSuite, Matchers}
+import play.api.libs.json.Json
 
 class TransformerTest extends FunSuite with Matchers {
   val walLog = WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", 
"""{"name": 1, "url": "www.google.com"}""")
 
   test("test default transformer") {
-    val transformer = new DefaultTransformer
+    val taskConf = TaskConf.Empty
+    val transformer = new DefaultTransformer(taskConf)
     val dimVals = transformer.toDimValLs(walLog, "name", "1")
 
     dimVals shouldBe Seq(DimVal("friends:name", "1"))
   }
 
   test("test ExtractDomain from URL") {
-    val transformer = new ExtractDomain(urlDimensions = Set("url"))
+    val taskConf = TaskConf.Empty.copy(options =
+      Map("urlDimensions" -> Json.toJson(Seq("url")).toString())
+    )
+    val transformer = new ExtractDomain(taskConf)
     val dimVals = transformer.toDimValLs(walLog, "url", 
"http://www.google.com/abc";)
 
     dimVals shouldBe Seq(

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcessTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcessTest.scala
 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcessTest.scala
new file mode 100644
index 0000000..4d2f079
--- /dev/null
+++ 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcessTest.scala
@@ -0,0 +1,31 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.DimValCountRank
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class BuildTopFeaturesProcessTest extends FunSuite with Matchers with 
BeforeAndAfterAll with DataFrameSuiteBase {
+
+  import org.apache.s2graph.s2jobs.wal.TestData._
+
+  test("test entire process.") {
+    import spark.implicits._
+    val df = spark.createDataset(aggExpected).toDF()
+
+    val taskConf = new TaskConf(name = "test", `type` = "test", inputs = 
Seq("input"),
+      options = Map("minUserCount" -> "0")
+    )
+    val job = new BuildTopFeaturesProcess(taskConf)
+
+
+    val inputMap = Map("input" -> df)
+    val featureDicts = job.execute(spark, inputMap)
+      .orderBy("dim", "rank")
+      .map(DimValCountRank.fromRow)
+      .collect()
+
+    featureDicts shouldBe featureDictExpected
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcessTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcessTest.scala
 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcessTest.scala
new file mode 100644
index 0000000..cd8295a
--- /dev/null
+++ 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcessTest.scala
@@ -0,0 +1,84 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.transformer.DefaultTransformer
+import org.apache.s2graph.s2jobs.wal.{DimValCountRank, WalLogAgg}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class FilterTopFeaturesProcessTest extends FunSuite with Matchers with 
BeforeAndAfterAll with DataFrameSuiteBase {
+  import org.apache.s2graph.s2jobs.wal.TestData._
+
+  test("test filterTopKsPerDim.") {
+    import spark.implicits._
+    val featureDf = spark.createDataset(featureDictExpected).map { x =>
+      (x.dimVal.dim, x.dimVal.value, x.count, x.rank)
+    }.toDF("dim", "value", "count", "rank")
+
+    val maxRankPerDim = spark.sparkContext.broadcast(Map.empty[String, Int])
+
+    // filter nothing because all feature has rank < 10
+    val filtered = FilterTopFeaturesProcess.filterTopKsPerDim(featureDf, 
maxRankPerDim, 10)
+
+    val real = filtered.orderBy("dim", 
"rank").map(DimValCountRank.fromRow).collect()
+    real.zip(featureDictExpected).foreach { case (real, expected) =>
+        real shouldBe expected
+    }
+    // filter rank >= 2
+    val filtered2 = FilterTopFeaturesProcess.filterTopKsPerDim(featureDf, 
maxRankPerDim, 2)
+    val real2 = filtered2.orderBy("dim", 
"rank").map(DimValCountRank.fromRow).collect()
+    real2 shouldBe featureDictExpected.filter(_.rank < 2)
+  }
+
+
+  test("test filterWalLogAgg.") {
+    import spark.implicits._
+    val walLogAgg = spark.createDataset(aggExpected)
+    val featureDf = spark.createDataset(featureDictExpected).map { x =>
+      (x.dimVal.dim, x.dimVal.value, x.count, x.rank)
+    }.toDF("dim", "value", "count", "rank")
+    val maxRankPerDim = spark.sparkContext.broadcast(Map.empty[String, Int])
+
+    val transformers = Seq(DefaultTransformer(TaskConf.Empty))
+    // filter nothing. so input, output should be same.
+    val featureFiltered = 
FilterTopFeaturesProcess.filterTopKsPerDim(featureDf, maxRankPerDim, 10)
+    val validFeatureHashKeys = 
FilterTopFeaturesProcess.collectDistinctFeatureHashes(spark, featureFiltered)
+    val validFeatureHashKeysBCast = 
spark.sparkContext.broadcast(validFeatureHashKeys)
+    val real = FilterTopFeaturesProcess.filterWalLogAgg(spark, walLogAgg, 
transformers, validFeatureHashKeysBCast)
+      .collect().sortBy(_.from)
+
+    real.zip(aggExpected).foreach { case (real, expected) =>
+      real shouldBe expected
+    }
+  }
+
+  test("test entire process. filter nothing.") {
+    import spark.implicits._
+    val df = spark.createDataset(aggExpected).toDF()
+    val featureDf = spark.createDataset(featureDictExpected).map { x =>
+      (x.dimVal.dim, x.dimVal.value, x.count, x.rank)
+    }.toDF("dim", "value", "count", "rank")
+
+    val inputKey = "input"
+    val featureDictKey = "feature"
+    // filter nothing since we did not specified maxRankPerDim and 
defaultMaxRank.
+    val taskConf = new TaskConf(name = "test", `type` = "test",
+      inputs = Seq(inputKey, featureDictKey),
+      options = Map(
+        "featureDict" -> featureDictKey,
+        "walLogAgg" -> inputKey
+      )
+    )
+    val inputMap = Map(inputKey -> df, featureDictKey -> featureDf)
+    val job = new FilterTopFeaturesProcess(taskConf)
+    val filtered = job.execute(spark, inputMap)
+      .orderBy("from")
+      .as[WalLogAgg]
+      .collect()
+
+    filtered.zip(aggExpected).foreach { case (real, expected) =>
+      real shouldBe expected
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
index 5753411..1bb7426 100644
--- 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
+++ 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
@@ -2,35 +2,20 @@ package org.apache.s2graph.s2jobs.wal.process
 
 import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.s2graph.s2jobs.task.TaskConf
-import org.apache.s2graph.s2jobs.wal.WalLog
-import org.apache.s2graph.s2jobs.wal.udafs._
-import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
+import org.apache.s2graph.s2jobs.wal._
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
-import scala.collection.mutable
-import scala.util.Random
-
 class WalLogAggregateProcessTest extends FunSuite with Matchers with 
BeforeAndAfterAll with DataFrameSuiteBase {
-  val walLogsLs = Seq(
-    WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 
1}"""),
-    WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 
2}"""),
-    WalLog(3L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 
3}"""),
-    WalLog(4L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 
4}""")
-  )
-  val walLogsLs2 = Seq(
-    WalLog(5L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 
1}"""),
-    WalLog(6L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 
2}"""),
-    WalLog(7L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 
3}"""),
-    WalLog(8L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 
4}""")
-  )
-
+  import org.apache.s2graph.s2jobs.wal.TestData._
 
-  test("test S2EdgeDataAggregateProcess") {
+  test("test entire process") {
     import spark.sqlContext.implicits._
 
     val edges = spark.createDataset(walLogsLs).toDF()
-    val inputMap = Map("edges" -> edges)
-    val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = 
Seq("edges"),
+    val processKey = "agg"
+    val inputMap = Map(processKey -> edges)
+
+    val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = 
Seq(processKey),
       options = Map("maxNumOfEdges" -> "10")
     )
 
@@ -38,40 +23,9 @@ class WalLogAggregateProcessTest extends FunSuite with 
Matchers with BeforeAndAf
     val processed = job.execute(spark, inputMap)
 
     processed.printSchema()
-    processed.collect().foreach { row =>
-      println(row)
-    }
-  }
-
-  test("mergeTwoSeq") {
-    val prev: Array[Int] = Array(3, 2, 1)
-    val cur: Array[Int] = Array(4, 2, 2)
-
-    val ls = WalLogUDAF.mergeTwoSeq(prev, cur, 10)
-    println(ls.size)
-
-    ls.foreach { x =>
-      println(x)
+    processed.orderBy("from").as[WalLogAgg].collect().zip(aggExpected).foreach 
{ case (real, expected) =>
+      real shouldBe expected
     }
   }
 
-  test("addToTopK test.") {
-    import WalLogUDAF._
-    val numOfTest = 100
-    val numOfNums = 100
-    val maxNum = 10
-
-    (0 until numOfTest).foreach { testNum =>
-      val maxSize = 1 + Random.nextInt(numOfNums)
-      val pq = new BoundedPriorityQueue[Int](maxSize)
-      val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum))
-      var result: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int]
-
-      arr.foreach { i =>
-        pq += i
-        result = addToTopK(result, maxSize, i)
-      }
-      result.sorted shouldBe pq.toSeq.sorted
-    }
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAFTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAFTest.scala
 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAFTest.scala
new file mode 100644
index 0000000..aded56d
--- /dev/null
+++ 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAFTest.scala
@@ -0,0 +1,42 @@
+package org.apache.s2graph.s2jobs.wal.udafs
+
+import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
+import org.scalatest._
+
+import scala.collection.mutable
+import scala.util.Random
+
+class WalLogUDAFTest extends FunSuite with Matchers {
+
+  test("mergeTwoSeq") {
+    val prev: Array[Int] = Array(3, 2, 1)
+    val cur: Array[Int] = Array(4, 2, 2)
+
+    val ls = WalLogUDAF.mergeTwoSeq(prev, cur, 10)
+    println(ls.size)
+
+    ls.foreach { x =>
+      println(x)
+    }
+  }
+
+  test("addToTopK test.") {
+    import WalLogUDAF._
+    val numOfTest = 100
+    val numOfNums = 100
+    val maxNum = 10
+
+    (0 until numOfTest).foreach { testNum =>
+      val maxSize = 1 + Random.nextInt(numOfNums)
+      val pq = new BoundedPriorityQueue[Int](maxSize)
+      val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum))
+      var result: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int]
+
+      arr.foreach { i =>
+        pq += i
+        result = addToTopK(result, maxSize, i)
+      }
+      result.sorted shouldBe pq.toSeq.sorted
+    }
+  }
+}
