- Add nowOpt to AggregateParam. - Add mergeWalLogs, which iterates over Iterator[WalLog] instead of Iterator[WalLogAgg].
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/0f06c52f Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/0f06c52f Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/0f06c52f Branch: refs/heads/master Commit: 0f06c52fe05a074777c6331266ccf6ce5d3fb83a Parents: aa8bd35 Author: DO YUNG YOON <steams...@apache.org> Authored: Mon Oct 29 14:28:13 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Mon Oct 29 14:28:13 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/s2jobs/wal/WalLog.scala | 62 ++++++++++++++++---- .../wal/process/WalLogAggregateProcess.scala | 2 +- .../wal/process/params/AggregateParam.scala | 8 ++- 3 files changed, 56 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0f06c52f/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala index 528f170..ad696a9 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala @@ -32,44 +32,77 @@ object WalLogAgg { Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong() } + private def addToHeap(walLog: WalLog, + heap: BoundedPriorityQueue[WalLog], + now: Long, + validTimestampDuration: Option[Long]): Unit = { + val ts = walLog.timestamp + val isValid = validTimestampDuration.map(d => now - ts < d).getOrElse(true) + + if (isValid) { + heap += walLog + } + } + private def addToHeap(iter: Seq[WalLog], heap: BoundedPriorityQueue[WalLog], + now: Long, validTimestampDuration: 
Option[Long]): Unit = { - val now = System.currentTimeMillis() + iter.foreach(walLog => addToHeap(walLog, heap, now, validTimestampDuration)) + } - iter.foreach { walLog => - val ts = walLog.timestamp - val isValid = validTimestampDuration.map(d => now - ts < d).getOrElse(true) + private def toWalLogAgg(edgeHeap: BoundedPriorityQueue[WalLog], + vertexHeap: BoundedPriorityQueue[WalLog], + sortTopItems: Boolean): Option[WalLogAgg] = { + val topVertices = if (sortTopItems) vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray + val topEdges = if (sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) else edgeHeap.toArray - if (isValid) { - heap += walLog - } + topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, topEdges)) + } + + def mergeWalLogs(iter: Iterator[WalLog], + heapSize: Int, + now: Long, + validTimestampDuration: Option[Long], + sortTopItems: Boolean)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = { + val edgeHeap = new BoundedPriorityQueue[WalLog](heapSize) + val vertexHeap = new BoundedPriorityQueue[WalLog](heapSize) + + iter.foreach { walLog => + if (walLog.isVertex) addToHeap(walLog, vertexHeap, now, validTimestampDuration) + else addToHeap(walLog, edgeHeap, now, validTimestampDuration) } + + toWalLogAgg(edgeHeap, vertexHeap, sortTopItems) } def merge(iter: Iterator[WalLogAgg], heapSize: Int, + now: Long, validTimestampDuration: Option[Long], sortTopItems: Boolean)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = { val edgeHeap = new BoundedPriorityQueue[WalLog](heapSize) val vertexHeap = new BoundedPriorityQueue[WalLog](heapSize) iter.foreach { walLogAgg => - addToHeap(walLogAgg.vertices, vertexHeap, validTimestampDuration) - addToHeap(walLogAgg.edges, edgeHeap, validTimestampDuration) + addToHeap(walLogAgg.vertices, vertexHeap, now, validTimestampDuration) + addToHeap(walLogAgg.edges, edgeHeap, now, validTimestampDuration) } - val topVertices = if (sortTopItems) vertexHeap.toArray.sortBy(-_.timestamp) else 
vertexHeap.toArray - val topEdges = if (sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) else edgeHeap.toArray + toWalLogAgg(edgeHeap, vertexHeap, sortTopItems) + } - topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, topEdges)) + def mergeWalLogs(iter: Iterator[WalLog], + param: AggregateParam)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = { + mergeWalLogs(iter, param.heapSize, param.now, param.validTimestampDuration, param.sortTopItems) } def merge(iter: Iterator[WalLogAgg], param: AggregateParam)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = { - merge(iter, param.heapSize, param.validTimestampDuration, param.sortTopItems) + merge(iter, param.heapSize, param.now, param.validTimestampDuration, param.sortTopItems) } + private def filterPropsInner(walLogs: Seq[WalLog], transformers: Seq[Transformer], validFeatureHashKeys: Set[Long]): Seq[WalLog] = { @@ -133,6 +166,7 @@ case class WalLog(timestamp: Long, service: String, label: String, props: String) { + val isVertex = elem == "v" || elem == "vertex" val id = from val columnName = label val serviceName = to @@ -169,4 +203,6 @@ object WalLog { WalLog(timestamp, operation, elem, from, to, service, label, props) } + + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0f06c52f/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala index aebb1cc..e4aa4e1 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala @@ -21,7 +21,7 @@ object WalLogAggregateProcess { import ss.implicits._ dataset.groupByKey(walLog => walLog.from).flatMapGroups { case 
(key, iter) => - WalLogAgg.merge(iter.map(WalLogAgg(_)), aggregateParam) + WalLogAgg.mergeWalLogs(iter, aggregateParam) }.toDF(WalLogAgg.outputColumns: _*) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0f06c52f/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala index ecc12e3..523ec8e 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala @@ -15,13 +15,15 @@ object AggregateParam { val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(defaultShouldSortTopItems) val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt) val validTimestampDuration = taskConf.options.get("validTimestampDuration").map(_.toLong).getOrElse(Long.MaxValue) + val nowOpt = taskConf.options.get("now").map(_.toLong) new AggregateParam(groupByKeys = groupByKeys, topK = Option(maxNumOfEdges), isArrayType = Option(arrayType), shouldSortTopItems = Option(sortTopItems), numOfPartitions = numOfPartitions, - validTimestampDuration = Option(validTimestampDuration) + validTimestampDuration = Option(validTimestampDuration), + nowOpt = nowOpt ) } } @@ -31,7 +33,8 @@ case class AggregateParam(groupByKeys: Option[Seq[String]], isArrayType: Option[Boolean], shouldSortTopItems: Option[Boolean], numOfPartitions: Option[Int], - validTimestampDuration: Option[Long]) { + validTimestampDuration: Option[Long], + nowOpt: Option[Long]) { import AggregateParam._ @@ -39,4 +42,5 @@ case class AggregateParam(groupByKeys: Option[Seq[String]], val heapSize = topK.getOrElse(defaultTopK) val arrayType = 
isArrayType.getOrElse(defaultIsArrayType) val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems) + val now = nowOpt.getOrElse(System.currentTimeMillis()) }