- add nowOpt to AggregateParam.
- add mergeWalLogs, which iterates over Iterator[WalLog] instead of
Iterator[WalLogAgg].


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/0f06c52f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/0f06c52f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/0f06c52f

Branch: refs/heads/master
Commit: 0f06c52fe05a074777c6331266ccf6ce5d3fb83a
Parents: aa8bd35
Author: DO YUNG YOON <steams...@apache.org>
Authored: Mon Oct 29 14:28:13 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Mon Oct 29 14:28:13 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/wal/WalLog.scala  | 62 ++++++++++++++++----
 .../wal/process/WalLogAggregateProcess.scala    |  2 +-
 .../wal/process/params/AggregateParam.scala     |  8 ++-
 3 files changed, 56 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0f06c52f/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index 528f170..ad696a9 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -32,44 +32,77 @@ object WalLogAgg {
     Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
   }
 
+  private def addToHeap(walLog: WalLog,
+                        heap: BoundedPriorityQueue[WalLog],
+                        now: Long,
+                        validTimestampDuration: Option[Long]): Unit = {
+    val ts = walLog.timestamp
+    val isValid = validTimestampDuration.map(d => now - ts < d).getOrElse(true)
+
+    if (isValid) {
+      heap += walLog
+    }
+  }
+
   private def addToHeap(iter: Seq[WalLog],
                         heap: BoundedPriorityQueue[WalLog],
+                        now: Long,
                         validTimestampDuration: Option[Long]): Unit = {
-    val now = System.currentTimeMillis()
+    iter.foreach(walLog => addToHeap(walLog, heap, now, 
validTimestampDuration))
+  }
 
-    iter.foreach { walLog =>
-      val ts = walLog.timestamp
-      val isValid = validTimestampDuration.map(d => now - ts < 
d).getOrElse(true)
+  private def toWalLogAgg(edgeHeap: BoundedPriorityQueue[WalLog],
+                          vertexHeap: BoundedPriorityQueue[WalLog],
+                          sortTopItems: Boolean): Option[WalLogAgg] = {
+    val topVertices = if (sortTopItems) 
vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray
+    val topEdges = if (sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) 
else edgeHeap.toArray
 
-      if (isValid) {
-        heap += walLog
-      }
+    topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, 
topEdges))
+  }
+
+  def mergeWalLogs(iter: Iterator[WalLog],
+                   heapSize: Int,
+                   now: Long,
+                   validTimestampDuration: Option[Long],
+                   sortTopItems: Boolean)(implicit ord: Ordering[WalLog]): 
Option[WalLogAgg] = {
+    val edgeHeap = new BoundedPriorityQueue[WalLog](heapSize)
+    val vertexHeap = new BoundedPriorityQueue[WalLog](heapSize)
+
+    iter.foreach { walLog =>
+      if (walLog.isVertex) addToHeap(walLog, vertexHeap, now, 
validTimestampDuration)
+      else addToHeap(walLog, edgeHeap, now, validTimestampDuration)
     }
+
+    toWalLogAgg(edgeHeap, vertexHeap, sortTopItems)
   }
 
   def merge(iter: Iterator[WalLogAgg],
             heapSize: Int,
+            now: Long,
             validTimestampDuration: Option[Long],
             sortTopItems: Boolean)(implicit ord: Ordering[WalLog]): 
Option[WalLogAgg] = {
     val edgeHeap = new BoundedPriorityQueue[WalLog](heapSize)
     val vertexHeap = new BoundedPriorityQueue[WalLog](heapSize)
 
     iter.foreach { walLogAgg =>
-      addToHeap(walLogAgg.vertices, vertexHeap, validTimestampDuration)
-      addToHeap(walLogAgg.edges, edgeHeap, validTimestampDuration)
+      addToHeap(walLogAgg.vertices, vertexHeap, now, validTimestampDuration)
+      addToHeap(walLogAgg.edges, edgeHeap, now, validTimestampDuration)
     }
 
-    val topVertices = if (sortTopItems) 
vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray
-    val topEdges = if (sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) 
else edgeHeap.toArray
+    toWalLogAgg(edgeHeap, vertexHeap, sortTopItems)
+  }
 
-    topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, 
topEdges))
+  def mergeWalLogs(iter: Iterator[WalLog],
+                   param: AggregateParam)(implicit ord: Ordering[WalLog]): 
Option[WalLogAgg] = {
+    mergeWalLogs(iter, param.heapSize, param.now, 
param.validTimestampDuration, param.sortTopItems)
   }
 
   def merge(iter: Iterator[WalLogAgg],
             param: AggregateParam)(implicit ord: Ordering[WalLog]): 
Option[WalLogAgg] = {
-    merge(iter, param.heapSize, param.validTimestampDuration, 
param.sortTopItems)
+    merge(iter, param.heapSize, param.now, param.validTimestampDuration, 
param.sortTopItems)
   }
 
+
   private def filterPropsInner(walLogs: Seq[WalLog],
                           transformers: Seq[Transformer],
                           validFeatureHashKeys: Set[Long]): Seq[WalLog] = {
@@ -133,6 +166,7 @@ case class WalLog(timestamp: Long,
                   service: String,
                   label: String,
                   props: String) {
+  val isVertex = elem == "v" || elem == "vertex"
   val id = from
   val columnName = label
   val serviceName = to
@@ -169,4 +203,6 @@ object WalLog {
 
     WalLog(timestamp, operation, elem, from, to, service, label, props)
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0f06c52f/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
index aebb1cc..e4aa4e1 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
@@ -21,7 +21,7 @@ object WalLogAggregateProcess {
     import ss.implicits._
 
     dataset.groupByKey(walLog => walLog.from).flatMapGroups { case (key, iter) 
=>
-      WalLogAgg.merge(iter.map(WalLogAgg(_)), aggregateParam)
+      WalLogAgg.mergeWalLogs(iter, aggregateParam)
     }.toDF(WalLogAgg.outputColumns: _*)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0f06c52f/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
index ecc12e3..523ec8e 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
@@ -15,13 +15,15 @@ object AggregateParam {
     val sortTopItems = 
taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(defaultShouldSortTopItems)
     val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
     val validTimestampDuration = 
taskConf.options.get("validTimestampDuration").map(_.toLong).getOrElse(Long.MaxValue)
+    val nowOpt = taskConf.options.get("now").map(_.toLong)
 
     new AggregateParam(groupByKeys = groupByKeys,
       topK = Option(maxNumOfEdges),
       isArrayType = Option(arrayType),
       shouldSortTopItems = Option(sortTopItems),
       numOfPartitions = numOfPartitions,
-      validTimestampDuration = Option(validTimestampDuration)
+      validTimestampDuration = Option(validTimestampDuration),
+      nowOpt = nowOpt
     )
   }
 }
@@ -31,7 +33,8 @@ case class AggregateParam(groupByKeys: Option[Seq[String]],
                           isArrayType: Option[Boolean],
                           shouldSortTopItems: Option[Boolean],
                           numOfPartitions: Option[Int],
-                          validTimestampDuration: Option[Long]) {
+                          validTimestampDuration: Option[Long],
+                          nowOpt: Option[Long]) {
 
   import AggregateParam._
 
@@ -39,4 +42,5 @@ case class AggregateParam(groupByKeys: Option[Seq[String]],
   val heapSize = topK.getOrElse(defaultTopK)
   val arrayType = isArrayType.getOrElse(defaultIsArrayType)
   val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems)
+  val now = nowOpt.getOrElse(System.currentTimeMillis())
 }

Reply via email to