add WalLogAggregateProcess.
 - clean up code.
 - add a few comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/a2e10fbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/a2e10fbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/a2e10fbb

Branch: refs/heads/master
Commit: a2e10fbbb6baefa87b80623fe97e3affb1144e4f
Parents: e94a029
Author: DO YUNG YOON <steams...@apache.org>
Authored: Mon Aug 13 10:29:42 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Mon Aug 13 10:29:42 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/task/Source.scala |   9 +-
 .../s2jobs/wal/BoundedPriorityQueue.scala       |  46 -------
 .../org/apache/s2graph/s2jobs/wal/WalLog.scala  |  75 +++++++++--
 .../process/S2EdgeDataAggregateProcess.scala    |  33 -----
 .../S2EdgeDataArrayAggregateProcess.scala       |  29 -----
 .../wal/process/WalLogAggregateProcess.scala    |  87 +++++++++++++
 .../s2jobs/wal/utils/BoundedPriorityQueue.scala |  52 ++++++++
 .../S2EdgeDataAggregateProcessTest.scala        | 123 -------------------
 .../process/WalLogAggregateProcessTest.scala    |  78 ++++++++++++
 9 files changed, 291 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 5bbf166..8e4e234 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -99,13 +99,20 @@ class FileSource(conf:TaskConf) extends Source(conf) {
     val paths = conf.options("paths").split(",")
     val format = conf.options.getOrElse("format", DEFAULT_FORMAT)
     val columnsOpt = conf.options.get("columns")
+    val readOptions = conf.options.get("read").map { s =>
+      Json.parse(s).as[JsObject].fields.map { case (k, jsValue) =>
+        k -> JSONParser.jsValueToString(jsValue)
+      }.toMap
+    }.getOrElse(Map.empty)
 
     format match {
       case "edgeLog" =>
         ss.read.format("com.databricks.spark.csv").option("delimiter", "\t")
           .schema(BulkLoadSchema).load(paths: _*)
       case _ =>
-        val df = ss.read.format(format).load(paths: _*)
+        val df =
+          if (readOptions.isEmpty) ss.read.format(format).load(paths: _*)
+          else ss.read.options(readOptions).format(format).load(paths: _*)
 
         if (columnsOpt.isDefined) df.toDF(columnsOpt.get.split(",").map(_.trim): _*) else df
     }
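
For reference, here is a minimal sketch of task options that would exercise the new "read" key in FileSource; the header/inferSchema keys are illustrative Spark CSV reader options, not something this commit defines:

  // hypothetical FileSource task options; "read" carries Spark reader options as a JSON object
  val options = Map(
    "paths"  -> "/data/wal/2018-08-13",                          // illustrative path
    "format" -> "csv",
    "read"   -> """{"header": "true", "inferSchema": "true"}"""
  )
  // FileSource parses "read" with Json.parse(...).as[JsObject], stringifies each value via
  // JSONParser.jsValueToString, and passes the resulting Map to ss.read.options(...)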

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala
deleted file mode 100644
index ce03169..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.apache.s2graph.s2jobs.wal
-
-import scala.collection.generic.Growable
-import scala.collection.JavaConverters._
-import java.util.{PriorityQueue => JPriorityQueue}
-
-
-class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
-  extends Iterable[A] with Growable[A] with Serializable {
-
-  private val underlying = new JPriorityQueue[A](maxSize, ord)
-
-  override def iterator: Iterator[A] = underlying.iterator.asScala
-
-  override def size: Int = underlying.size
-
-  override def ++=(xs: TraversableOnce[A]): this.type = {
-    xs.foreach { this += _ }
-    this
-  }
-
-  override def +=(elem: A): this.type = {
-    if (size < maxSize) {
-      underlying.offer(elem)
-    } else {
-      maybeReplaceLowest(elem)
-    }
-    this
-  }
-
-  override def +=(elem1: A, elem2: A, elems: A*): this.type = {
-    this += elem1 += elem2 ++= elems
-  }
-
-  override def clear() { underlying.clear() }
-
-  private def maybeReplaceLowest(a: A): Boolean = {
-    val head = underlying.peek()
-    if (head != null && ord.gt(a, head)) {
-      underlying.poll()
-      underlying.offer(a)
-    } else {
-      false
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index 1b70a8a..dc628ec 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -1,21 +1,66 @@
 package org.apache.s2graph.s2jobs.wal
 
+import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.s2jobs.wal.process.AggregateParam
+import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+import play.api.libs.json.{JsObject, Json}
 
-case class WalLog(timestamp:Long,
-                  operation:String,
-                  elem:String,
-                  from:String,
-                  to:String,
-                  service:String,
-                  label:String,
-                  props:String) {
+import scala.util.Try
+
+object WalLogAgg {
+  val outputColumns = Seq("from", "logs", "maxTs", "minTs")
+
+  def apply(walLog: WalLog): WalLogAgg = {
+    new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp)
+  }
+
+  def merge(iter: Iterator[WalLogAgg],
+            param: AggregateParam)(implicit ord: Ordering[WalLog]) = {
+    val heap = new BoundedPriorityQueue[WalLog](param.heapSize)
+    var minTs = Long.MaxValue
+    var maxTs = Long.MinValue
+
+    iter.foreach { walLogAgg =>
+      minTs = Math.min(walLogAgg.minTs, minTs)
+      maxTs = Math.max(walLogAgg.maxTs, maxTs)
+
+      walLogAgg.logs.foreach { walLog =>
+        heap += walLog
+      }
+    }
+    val topItems = if (param.sortTopItems) heap.toArray.sortBy(-_.timestamp) else heap.toArray
+
+    WalLogAgg(topItems.head.from, topItems, maxTs, minTs)
+  }
+}
+
+case class WalLogAgg(from: String,
+                     logs: Seq[WalLog],
+                     maxTs: Long,
+                     minTs: Long)
+
+case class WalLog(timestamp: Long,
+                  operation: String,
+                  elem: String,
+                  from: String,
+                  to: String,
+                  service: String,
+                  label: String,
+                  props: String) {
   val id = from
   val columnName = label
   val serviceName = to
+
+  lazy val propsKeyValues = Json.parse(props).as[JsObject].fields.map { case (key, jsValue) =>
+    key -> JSONParser.jsValueToString(jsValue)
+  }
 }
 
 object WalLog {
+  val orderByTsAsc = Ordering.by[WalLog, Long](walLog => walLog.timestamp)
+
   val WalLogSchema = StructType(Seq(
     StructField("timestamp", LongType, false),
     StructField("operation", StringType, false),
@@ -27,5 +72,17 @@ object WalLog {
     StructField("props", StringType, false)
     //    StructField("direction", StringType, true)
   ))
-}
 
+  def fromRow(row: Row): WalLog = {
+    val timestamp = row.getAs[Long]("timestamp")
+    val operation = Try(row.getAs[String]("operation")).toOption.getOrElse("insert")
+    val elem = Try(row.getAs[String]("elem")).toOption.getOrElse("edge")
+    val from = row.getAs[String]("from")
+    val to = row.getAs[String]("to")
+    val service = row.getAs[String]("service")
+    val label = row.getAs[String]("label")
+    val props = Try(row.getAs[String]("props")).toOption.getOrElse("{}")
+
+    WalLog(timestamp, operation, elem, from, to, service, label, props)
+  }
+}
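
As a quick illustration of the new aggregation path, a sketch (the sample WalLogs and parameter values are made up) of merging per-vertex logs into a WalLogAgg:

  // sketch: merge WalLogs for one "from" vertex, keeping at most topK entries
  implicit val ord = WalLog.orderByTsAsc
  val logs = Seq(
    WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
    WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}""")
  )
  val param = AggregateParam(groupByKeys = None, topK = Option(10),
    isArrayType = Option(false), shouldSortTopItems = Option(true))
  val agg = WalLogAgg.merge(logs.iterator.map(WalLogAgg(_)), param)
  // agg.logs holds the retained WalLogs (sorted by timestamp desc when sortTopItems is true);
  // agg.maxTs / agg.minTs carry the observed timestamp bounds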

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
deleted file mode 100644
index cdc6c8c..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.s2graph.s2jobs.wal.process
-
-import org.apache.s2graph.s2jobs.task.TaskConf
-import org.apache.s2graph.s2jobs.wal.udafs._
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{DataFrame, SparkSession}
-
-/**
-  * expect S2EdgeData dataframe as input.
-  * @param taskConf
-  */
-class S2EdgeDataAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
-  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
-    val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
-    val groupByColumns = taskConf.options.get("groupByColumns").getOrElse("from").split(",").map(col(_))
-    val aggregateColumns = taskConf.options.get("aggregateColumns").getOrElse("timestamp,to,label,props").split(",").map(col(_))
-    taskConf.options.get("parallelism").map(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _))
-
-    val aggregator = S2EdgeDataAggregate(maxNumOfEdges)
-
-    val edges = inputMap(taskConf.inputs.head)
-
-    edges
-      .groupBy(groupByColumns: _*)
-      .agg(
-        aggregator(aggregateColumns: _*).as("edges"),
-        max(col("timestamp")).as("max_ts"),
-        min(col("timestamp")).as("min_ts")
-      )
-  }
-
-  override def mandatoryOptions: Set[String] = Set.empty
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala
deleted file mode 100644
index 08df269..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.s2graph.s2jobs.wal.process
-
-import org.apache.s2graph.s2jobs.task.TaskConf
-import org.apache.s2graph.s2jobs.wal.udafs.GroupByArrayAgg
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions._
-
-class S2EdgeDataArrayAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
-  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
-    import ss.sqlContext.implicits._
-    val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
-    val groupByColumns = taskConf.options.get("groupByColumns").getOrElse("from").split(",").map(col(_))
-    val aggregateColumns = taskConf.options.get("aggregateColumns").getOrElse("edges").split(",").map(col(_))
-    taskConf.options.get("parallelism").map(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _))
-    val aggregator = new GroupByArrayAgg(maxNumOfEdges)
-
-    val edges = inputMap(taskConf.inputs.head)
-
-    edges
-      .groupBy(groupByColumns: _*)
-      .agg(
-        aggregator(aggregateColumns: _*).as("edges"),
-        max(col("max_ts")).as("max_ts"),
-        min(col("min_ts")).as("min_ts")
-      )
-  }
-
-  override def mandatoryOptions: Set[String] = Set.empty
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
new file mode 100644
index 0000000..0c9829c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
@@ -0,0 +1,87 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.{WalLog, WalLogAgg}
+import org.apache.spark.sql._
+
+object AggregateParam {
+  val defaultGroupByKeys = Seq("from")
+  val defaultTopK = 1000
+  val defaultIsArrayType = false
+  val defaultShouldSortTopItems = true
+}
+
+case class AggregateParam(groupByKeys: Option[Seq[String]],
+                          topK: Option[Int],
+                          isArrayType: Option[Boolean],
+                          shouldSortTopItems: Option[Boolean]) {
+
+  import AggregateParam._
+
+  val groupByColumns = groupByKeys.getOrElse(defaultGroupByKeys)
+  val heapSize = topK.getOrElse(defaultTopK)
+  val arrayType = isArrayType.getOrElse(defaultIsArrayType)
+  val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems)
+}
+
+object WalLogAggregateProcess {
+  def aggregate(ss: SparkSession,
+                dataset: Dataset[WalLogAgg],
+                aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]) = {
+    import ss.implicits._
+    dataset.groupByKey(_.from).mapGroups { case (_, iter) =>
+      WalLogAgg.merge(iter, aggregateParam)
+    }.toDF(WalLogAgg.outputColumns: _*)
+  }
+
+  def aggregateRaw(ss: SparkSession,
+                   dataset: Dataset[WalLog],
+                   aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]): DataFrame = {
+    import ss.implicits._
+
+    dataset.groupByKey(walLog => walLog.from).mapGroups { case (key, iter) =>
+      WalLogAgg.merge(iter.map(WalLogAgg(_)), aggregateParam)
+    }.toDF(WalLogAgg.outputColumns: _*)
+  }
+}
+
+
+/**
+  * expects a DataFrame of WalLog, then groups WalLogs by groupByKeys (default: from).
+  * produces a DataFrame of WalLogAgg, which abstracts a session as a sequence of WalLogs ordered by timestamp (desc).
+  *
+  * one way to visualize this is as transforming (row, column, value) matrix entries into (row, SparseVector(column: value)) rows.
+  * note that we only keep track of at most topK latest WalLogs per groupByKeys value.
+  */
+class WalLogAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+
+  import WalLogAggregateProcess._
+
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    import ss.implicits._
+
+    val groupByKeys = taskConf.options.get("groupByKeys").map(_.split(",").filter(_.nonEmpty).toSeq)
+    val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
+    val arrayType = taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(false)
+    val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(false)
+
+    taskConf.options.get("parallelism").foreach(d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d))
+
+    implicit val ord = WalLog.orderByTsAsc
+    val walLogs = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) =>
+      prev.union(inputMap(cur))
+    }
+
+    //TODO: the current implementation only expects taskConf.options as Map[String, String].
+    //Once we change taskConf.options to JsObject, we can simply parse the input parameter as follows.
+    //implicit val paramReads = Json.reads[AggregateParam]
+
+    val param = AggregateParam(groupByKeys, topK = Option(maxNumOfEdges),
+      isArrayType = Option(arrayType), shouldSortTopItems = Option(sortTopItems))
+
+    if (arrayType) aggregate(ss, walLogs.as[WalLogAgg], param)
+    else aggregateRaw(ss, walLogs.as[WalLog], param)
+  }
+
+  override def mandatoryOptions: Set[String] = Set.empty
+}
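
To make the wiring concrete, a minimal sketch of running the new process; the names ss and walLogDf are assumed to be an existing SparkSession and a DataFrame of WalLog rows, and the option values are illustrative:

  // sketch: aggregate a WalLog DataFrame registered under the input name "wal"
  val taskConf = new TaskConf(name = "agg_wal", `type` = "agg", inputs = Seq("wal"),
    options = Map(
      "maxNumOfEdges" -> "1000",  // topK logs kept per group
      "sortTopItems"  -> "true",  // sort retained logs by timestamp desc
      "arrayType"     -> "false"  // input is raw WalLog rows, not pre-aggregated WalLogAgg
    ))
  val aggregated = new WalLogAggregateProcess(taskConf).execute(ss, Map("wal" -> walLogDf))
  // resulting columns: from, logs, maxTs, minTs (WalLogAgg.outputColumns)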

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/BoundedPriorityQueue.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/BoundedPriorityQueue.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/BoundedPriorityQueue.scala
new file mode 100644
index 0000000..c146452
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/BoundedPriorityQueue.scala
@@ -0,0 +1,52 @@
+package org.apache.s2graph.s2jobs.wal.utils
+
+import java.util.{PriorityQueue => JPriorityQueue}
+
+import scala.collection.JavaConverters._
+import scala.collection.generic.Growable
+
+/**
+  * copied from org.apache.spark.util.BoundedPriorityQueue since it is package private.
+  * @param maxSize maximum number of elements kept in the queue
+  * @param ord ordering used to rank elements; the largest maxSize elements are retained
+  * @tparam A element type
+  */
+class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
+  extends Iterable[A] with Growable[A] with Serializable {
+
+  private val underlying = new JPriorityQueue[A](maxSize, ord)
+
+  override def iterator: Iterator[A] = underlying.iterator.asScala
+
+  override def size: Int = underlying.size
+
+  override def ++=(xs: TraversableOnce[A]): this.type = {
+    xs.foreach { this += _ }
+    this
+  }
+
+  override def +=(elem: A): this.type = {
+    if (size < maxSize) {
+      underlying.offer(elem)
+    } else {
+      maybeReplaceLowest(elem)
+    }
+    this
+  }
+
+  override def +=(elem1: A, elem2: A, elems: A*): this.type = {
+    this += elem1 += elem2 ++= elems
+  }
+
+  override def clear() { underlying.clear() }
+
+  private def maybeReplaceLowest(a: A): Boolean = {
+    val head = underlying.peek()
+    if (head != null && ord.gt(a, head)) {
+      underlying.poll()
+      underlying.offer(a)
+    } else {
+      false
+    }
+  }
+}
\ No newline at end of file
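
For clarity, a small usage sketch of the relocated queue, mirroring how the tests exercise it:

  // BoundedPriorityQueue keeps only the largest maxSize elements under the given ordering
  import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue

  val pq = new BoundedPriorityQueue[Int](3)
  Seq(5, 1, 9, 7, 3).foreach(pq += _)
  println(pq.toSeq.sorted)  // List(5, 7, 9): smaller elements were evicted or ignored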

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
deleted file mode 100644
index a397f7e..0000000
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-package org.apache.s2graph.s2jobs.wal.process
-
-import com.holdenkarau.spark.testing.DataFrameSuiteBase
-import org.apache.s2graph.s2jobs.task.TaskConf
-import org.apache.s2graph.s2jobs.wal.{BoundedPriorityQueue, WalLog}
-import org.apache.s2graph.s2jobs.wal.udafs._
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
-import scala.collection.mutable
-import scala.util.Random
-
-class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
-  val walLogsLs = Seq(
-    WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
-    WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
-    WalLog(3L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
-    WalLog(4L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
-  )
-  val walLogsLs2 = Seq(
-    WalLog(5L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
-    WalLog(6L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
-    WalLog(7L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
-    WalLog(8L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
-  )
-
-
-  test("test S2EdgeDataAggregateProcess") {
-    import spark.sqlContext.implicits._
-
-    val edges = spark.createDataset(walLogsLs).toDF()
-    val inputMap = Map("edges" -> edges)
-    val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
-      options = Map("maxNumOfEdges" -> "10",
-        "runOrderBy" -> "false",
-        "groupByAggClassName" -> "GroupByAggOptimized"))
-
-    val job = new S2EdgeDataAggregateProcess(taskConf = taskConf)
-    val processed = job.execute(spark, inputMap)
-
-    processed.printSchema()
-    processed.collect().foreach { row =>
-      println(row)
-    }
-  }
-
-  test("test S2EdgeDataArrayAggregateProcess") {
-    import spark.sqlContext.implicits._
-
-    val edges = spark.createDataset(walLogsLs).toDF()
-    val edges2 = spark.createDataset(walLogsLs2).toDF()
-
-    val firstConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
-      options = Map("maxNumOfEdges" -> "10"))
-
-    val firstJob = new S2EdgeDataAggregateProcess(firstConf)
-    val firstJob2 = new S2EdgeDataAggregateProcess(firstConf)
-
-    val first = firstJob.execute(spark, Map("edges" -> edges))
-    val first2 = firstJob2.execute(spark, Map("edges" -> edges2))
-
-    val secondInputMap = Map(
-      "aggregated" -> first.union(first2)
-    )
-
-    val secondConf = new TaskConf(name = "testarray", `type` = "agg",
-      inputs = Seq("aggregated"),
-      options = Map("maxNumOfEdges" -> "10"))
-
-    val secondJob = new S2EdgeDataArrayAggregateProcess(secondConf)
-
-
-    val processed = secondJob.execute(spark, secondInputMap)
-
-    processed.printSchema()
-    processed.collect().foreach { row =>
-      println(row)
-    }
-  }
-
-  test("mergeTwoSeq") {
-    val prev: Array[Int] = Array(3, 2, 1)
-    val cur: Array[Int] = Array(4, 2, 2)
-
-    val ls = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10)
-    println(ls.size)
-
-    ls.foreach { x =>
-      println(x)
-    }
-  }
-
-  test("addToTopK test.") {
-    import S2EdgeDataAggregate._
-    val numOfTest = 100
-    val numOfNums = 100
-    val maxNum = 10
-
-    (0 until numOfTest).foreach { testNum =>
-      val maxSize = 1 + Random.nextInt(numOfNums)
-      val pq = new BoundedPriorityQueue[Int](maxSize)
-      val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum))
-      var result: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int]
-
-      arr.foreach { i =>
-        pq += i
-        result = addToTopK(result, maxSize, i)
-      }
-      result.toSeq.sorted shouldBe pq.toSeq.sorted
-    }
-
-//    val maxSize = 1 + Random.nextInt(numOfNums)
-//    val pq = new BoundedPriorityQueue[Int](maxSize)
-//    val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum))
-//    val result = mutable.ArrayBuffer.empty[Int]
-//    var lastPos = 0
-//    arr.foreach { i =>
-//      pq += i
-//      addToTopK(result, lastPos, maxSize, i)
-//      lastPos = lastPos + 1
-//    }
-//    result.toSeq.sorted shouldBe pq.toSeq.sorted
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
new file mode 100644
index 0000000..5ae595b
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala
@@ -0,0 +1,78 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.WalLog
+import org.apache.s2graph.s2jobs.wal.udafs._
+import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+import scala.collection.mutable
+import scala.util.Random
+
+class WalLogAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
+  val walLogsLs = Seq(
+    WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
+    WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
+    WalLog(3L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
+    WalLog(4L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
+  )
+  val walLogsLs2 = Seq(
+    WalLog(5L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
+    WalLog(6L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
+    WalLog(7L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
+    WalLog(8L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
+  )
+
+
+  test("test WalLogAggregateProcess") {
+    import spark.sqlContext.implicits._
+
+    val edges = spark.createDataset(walLogsLs).toDF()
+    val inputMap = Map("edges" -> edges)
+    val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
+      options = Map("maxNumOfEdges" -> "10",
+        "runOrderBy" -> "false",
+        "groupByAggClassName" -> "GroupByAggOptimized"))
+
+    val job = new WalLogAggregateProcess(taskConf = taskConf)
+    val processed = job.execute(spark, inputMap)
+
+    processed.printSchema()
+    processed.collect().foreach { row =>
+      println(row)
+    }
+  }
+
+  test("mergeTwoSeq") {
+    val prev: Array[Int] = Array(3, 2, 1)
+    val cur: Array[Int] = Array(4, 2, 2)
+
+    val ls = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10)
+    println(ls.size)
+
+    ls.foreach { x =>
+      println(x)
+    }
+  }
+
+  test("addToTopK test.") {
+    import S2EdgeDataAggregate._
+    val numOfTest = 100
+    val numOfNums = 100
+    val maxNum = 10
+
+    (0 until numOfTest).foreach { testNum =>
+      val maxSize = 1 + Random.nextInt(numOfNums)
+      val pq = new BoundedPriorityQueue[Int](maxSize)
+      val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum))
+      var result: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int]
+
+      arr.foreach { i =>
+        pq += i
+        result = addToTopK(result, maxSize, i)
+      }
+      result.sorted shouldBe pq.toSeq.sorted
+    }
+  }
+}
\ No newline at end of file
