add WalLogAggregateProcess. - clean up code. - add a few comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/a2e10fbb Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/a2e10fbb Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/a2e10fbb Branch: refs/heads/master Commit: a2e10fbbb6baefa87b80623fe97e3affb1144e4f Parents: e94a029 Author: DO YUNG YOON <steams...@apache.org> Authored: Mon Aug 13 10:29:42 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Mon Aug 13 10:29:42 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/s2jobs/task/Source.scala | 9 +- .../s2jobs/wal/BoundedPriorityQueue.scala | 46 ------- .../org/apache/s2graph/s2jobs/wal/WalLog.scala | 75 +++++++++-- .../process/S2EdgeDataAggregateProcess.scala | 33 ----- .../S2EdgeDataArrayAggregateProcess.scala | 29 ----- .../wal/process/WalLogAggregateProcess.scala | 87 +++++++++++++ .../s2jobs/wal/utils/BoundedPriorityQueue.scala | 52 ++++++++ .../S2EdgeDataAggregateProcessTest.scala | 123 ------------------- .../process/WalLogAggregateProcessTest.scala | 78 ++++++++++++ 9 files changed, 291 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala index 5bbf166..8e4e234 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala @@ -99,13 +99,20 @@ class FileSource(conf:TaskConf) extends Source(conf) { val paths = conf.options("paths").split(",") val format = conf.options.getOrElse("format", DEFAULT_FORMAT) val columnsOpt = conf.options.get("columns") + val readOptions = conf.options.get("read").map { s => + Json.parse(s).as[JsObject].fields.map { case (k, jsValue) => + k -> JSONParser.jsValueToString(jsValue) + }.toMap + }.getOrElse(Map.empty) format match { case "edgeLog" => ss.read.format("com.databricks.spark.csv").option("delimiter", "\t") .schema(BulkLoadSchema).load(paths: _*) case _ => - val df = ss.read.format(format).load(paths: _*) + val df = + if (readOptions.isEmpty) ss.read.format(format).load(paths: _*) + else ss.read.options(readOptions).format(format).load(paths: _*) if (columnsOpt.isDefined) df.toDF(columnsOpt.get.split(",").map(_.trim): _*) else df } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala deleted file mode 100644 index ce03169..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala +++ /dev/null @@ -1,46 +0,0 @@ -package org.apache.s2graph.s2jobs.wal - -import scala.collection.generic.Growable -import scala.collection.JavaConverters._ -import java.util.{PriorityQueue => JPriorityQueue} - - -class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A]) - extends Iterable[A] with Growable[A] with 
Serializable { - - private val underlying = new JPriorityQueue[A](maxSize, ord) - - override def iterator: Iterator[A] = underlying.iterator.asScala - - override def size: Int = underlying.size - - override def ++=(xs: TraversableOnce[A]): this.type = { - xs.foreach { this += _ } - this - } - - override def +=(elem: A): this.type = { - if (size < maxSize) { - underlying.offer(elem) - } else { - maybeReplaceLowest(elem) - } - this - } - - override def +=(elem1: A, elem2: A, elems: A*): this.type = { - this += elem1 += elem2 ++= elems - } - - override def clear() { underlying.clear() } - - private def maybeReplaceLowest(a: A): Boolean = { - val head = underlying.peek() - if (head != null && ord.gt(a, head)) { - underlying.poll() - underlying.offer(a) - } else { - false - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala index 1b70a8a..dc628ec 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala @@ -1,21 +1,66 @@ package org.apache.s2graph.s2jobs.wal +import org.apache.s2graph.core.JSONParser +import org.apache.s2graph.s2jobs.wal.process.AggregateParam +import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue +import org.apache.spark.sql.Row import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} +import play.api.libs.json.{JsObject, Json} -case class WalLog(timestamp:Long, - operation:String, - elem:String, - from:String, - to:String, - service:String, - label:String, - props:String) { +import scala.util.Try + +object WalLogAgg { + val outputColumns = Seq("from", "logs", "maxTs", "minTs") + + def apply(walLog: WalLog): WalLogAgg = { + new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp) + } + + def merge(iter: Iterator[WalLogAgg], + param: AggregateParam)(implicit ord: Ordering[WalLog]) = { + val heap = new BoundedPriorityQueue[WalLog](param.heapSize) + var minTs = Long.MaxValue + var maxTs = Long.MinValue + + iter.foreach { walLogAgg => + minTs = Math.min(walLogAgg.minTs, minTs) + maxTs = Math.max(walLogAgg.maxTs, maxTs) + + walLogAgg.logs.foreach { walLog => + heap += walLog + } + } + val topItems = if (param.sortTopItems) heap.toArray.sortBy(-_.timestamp) else heap.toArray + + WalLogAgg(topItems.head.from, topItems, maxTs, minTs) + } +} + +case class WalLogAgg(from: String, + logs: Seq[WalLog], + maxTs: Long, + minTs: Long) + +case class WalLog(timestamp: Long, + operation: String, + elem: String, + from: String, + to: String, + service: String, + label: String, + props: String) { val id = from val columnName = label val serviceName = to + + lazy val propsKeyValues = Json.parse(props).as[JsObject].fields.map { case (key, jsValue) => + key -> JSONParser.jsValueToString(jsValue) + } } object WalLog { + val orderByTsAsc = Ordering.by[WalLog, Long](walLog => walLog.timestamp) + val WalLogSchema = StructType(Seq( StructField("timestamp", LongType, false), StructField("operation", StringType, false), @@ -27,5 +72,17 @@ object WalLog { StructField("props", StringType, false) // StructField("direction", StringType, true) )) -} + def fromRow(row: Row): WalLog = { + val timestamp = row.getAs[Long]("timestamp") + val 
operation = Try(row.getAs[String]("operation")).toOption.getOrElse("insert") + val elem = Try(row.getAs[String]("elem")).toOption.getOrElse("edge") + val from = row.getAs[String]("from") + val to = row.getAs[String]("to") + val service = row.getAs[String]("service") + val label = row.getAs[String]("label") + val props = Try(row.getAs[String]("props")).toOption.getOrElse("{}") + + WalLog(timestamp, operation, elem, from, to, service, label, props) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala deleted file mode 100644 index cdc6c8c..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.s2graph.s2jobs.wal.process - -import org.apache.s2graph.s2jobs.task.TaskConf -import org.apache.s2graph.s2jobs.wal.udafs._ -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.{DataFrame, SparkSession} - -/** - * expect S2EdgeData dataframe as input. - * @param taskConf - */ -class S2EdgeDataAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) { - override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = { - val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000) - val groupByColumns = taskConf.options.get("groupByColumns").getOrElse("from").split(",").map(col(_)) - val aggregateColumns = taskConf.options.get("aggregateColumns").getOrElse("timestamp,to,label,props").split(",").map(col(_)) - taskConf.options.get("parallelism").map(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _)) - - val aggregator = S2EdgeDataAggregate(maxNumOfEdges) - - val edges = inputMap(taskConf.inputs.head) - - edges - .groupBy(groupByColumns: _*) - .agg( - aggregator(aggregateColumns: _*).as("edges"), - max(col("timestamp")).as("max_ts"), - min(col("timestamp")).as("min_ts") - ) - } - - override def mandatoryOptions: Set[String] = Set.empty -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala deleted file mode 100644 index 08df269..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala +++ /dev/null @@ -1,29 +0,0 @@ -package org.apache.s2graph.s2jobs.wal.process - -import org.apache.s2graph.s2jobs.task.TaskConf -import org.apache.s2graph.s2jobs.wal.udafs.GroupByArrayAgg -import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.functions._ - -class S2EdgeDataArrayAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) { - override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = { - import ss.sqlContext.implicits._ - val maxNumOfEdges = 
taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000) - val groupByColumns = taskConf.options.get("groupByColumns").getOrElse("from").split(",").map(col(_)) - val aggregateColumns = taskConf.options.get("aggregateColumns").getOrElse("edges").split(",").map(col(_)) - taskConf.options.get("parallelism").map(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _)) - val aggregator = new GroupByArrayAgg(maxNumOfEdges) - - val edges = inputMap(taskConf.inputs.head) - - edges - .groupBy(groupByColumns: _*) - .agg( - aggregator(aggregateColumns: _*).as("edges"), - max(col("max_ts")).as("max_ts"), - min(col("min_ts")).as("min_ts") - ) - } - - override def mandatoryOptions: Set[String] = Set.empty -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala new file mode 100644 index 0000000..0c9829c --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala @@ -0,0 +1,87 @@ +package org.apache.s2graph.s2jobs.wal.process + +import org.apache.s2graph.s2jobs.task.TaskConf +import org.apache.s2graph.s2jobs.wal.{WalLog, WalLogAgg} +import org.apache.spark.sql._ + +object AggregateParam { + val defaultGroupByKeys = Seq("from") + val defaultTopK = 1000 + val defaultIsArrayType = false + val defaultShouldSortTopItems = true +} + +case class AggregateParam(groupByKeys: Option[Seq[String]], + topK: Option[Int], + isArrayType: Option[Boolean], + shouldSortTopItems: Option[Boolean]) { + + import AggregateParam._ + + val groupByColumns = groupByKeys.getOrElse(defaultGroupByKeys) + val heapSize = topK.getOrElse(defaultTopK) + val arrayType = isArrayType.getOrElse(defaultIsArrayType) + val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems) +} + +object WalLogAggregateProcess { + def aggregate(ss: SparkSession, + dataset: Dataset[WalLogAgg], + aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]) = { + import ss.implicits._ + dataset.groupByKey(_.from).mapGroups { case (_, iter) => + WalLogAgg.merge(iter, aggregateParam) + }.toDF(WalLogAgg.outputColumns: _*) + } + + def aggregateRaw(ss: SparkSession, + dataset: Dataset[WalLog], + aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]): DataFrame = { + import ss.implicits._ + + dataset.groupByKey(walLog => walLog.from).mapGroups { case (key, iter) => + WalLogAgg.merge(iter.map(WalLogAgg(_)), aggregateParam) + }.toDF(WalLogAgg.outputColumns: _*) + } +} + + +/** + * expect DataFrame of WalLog, then group WalLog by groupByKeys(default from). + * produce DataFrame of WalLogAgg which abstract the session consists of sequence of WalLog ordered by timestamp(desc). + * + * one way to visualize this is that transforming (row, column, value) matrix entries into (row, Sparse Vector(column:value). 
+ * note that we only keep at most the topK latest WalLog entries per groupByKeys value. + */ +class WalLogAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) { + + import WalLogAggregateProcess._ + + override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = { + import ss.implicits._ + + val groupByKeys = taskConf.options.get("groupByKeys").map(_.split(",").filter(_.nonEmpty).toSeq) + val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000) + val arrayType = taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(false) + val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(false) + + taskConf.options.get("parallelism").foreach(d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d)) + + implicit val ord = WalLog.orderByTsAsc + val walLogs = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) => + prev.union(inputMap(cur)) + } + + //TODO: the current implementation only expects taskConf.options as Map[String, String]. +//Once taskConf.options becomes a JsObject, we can simply parse the input parameters as follows. + //implicit val paramReads = Json.reads[AggregateParam] + + val param = AggregateParam(groupByKeys, topK = Option(maxNumOfEdges), + isArrayType = Option(arrayType), shouldSortTopItems = Option(sortTopItems)) + + if (arrayType) aggregate(ss, walLogs.as[WalLogAgg], param) + else aggregateRaw(ss, walLogs.as[WalLog], param) + } + + override def mandatoryOptions: Set[String] = Set.empty +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/BoundedPriorityQueue.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/BoundedPriorityQueue.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/BoundedPriorityQueue.scala new file mode 100644 index 0000000..c146452 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/BoundedPriorityQueue.scala @@ -0,0 +1,52 @@ +package org.apache.s2graph.s2jobs.wal.utils + +import java.util.{PriorityQueue => JPriorityQueue} + +import scala.collection.JavaConverters._ +import scala.collection.generic.Growable + +/** + * Copied from org.apache.spark.util.BoundedPriorityQueue since it is package private.
+ * @param maxSize + * @param ord + * @tparam A + */ +class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A]) + extends Iterable[A] with Growable[A] with Serializable { + + private val underlying = new JPriorityQueue[A](maxSize, ord) + + override def iterator: Iterator[A] = underlying.iterator.asScala + + override def size: Int = underlying.size + + override def ++=(xs: TraversableOnce[A]): this.type = { + xs.foreach { this += _ } + this + } + + override def +=(elem: A): this.type = { + if (size < maxSize) { + underlying.offer(elem) + } else { + maybeReplaceLowest(elem) + } + this + } + + override def +=(elem1: A, elem2: A, elems: A*): this.type = { + this += elem1 += elem2 ++= elems + } + + override def clear() { underlying.clear() } + + private def maybeReplaceLowest(a: A): Boolean = { + val head = underlying.peek() + if (head != null && ord.gt(a, head)) { + underlying.poll() + underlying.offer(a) + } else { + false + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala deleted file mode 100644 index a397f7e..0000000 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala +++ /dev/null @@ -1,123 +0,0 @@ -package org.apache.s2graph.s2jobs.wal.process - -import com.holdenkarau.spark.testing.DataFrameSuiteBase -import org.apache.s2graph.s2jobs.task.TaskConf -import org.apache.s2graph.s2jobs.wal.{BoundedPriorityQueue, WalLog} -import org.apache.s2graph.s2jobs.wal.udafs._ -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} - -import scala.collection.mutable -import scala.util.Random - -class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase { - val walLogsLs = Seq( - WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""), - WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""), - WalLog(3L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""), - WalLog(4L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""") - ) - val walLogsLs2 = Seq( - WalLog(5L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""), - WalLog(6L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""), - WalLog(7L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""), - WalLog(8L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""") - ) - - - test("test S2EdgeDataAggregateProcess") { - import spark.sqlContext.implicits._ - - val edges = spark.createDataset(walLogsLs).toDF() - val inputMap = Map("edges" -> edges) - val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"), - options = Map("maxNumOfEdges" -> "10", - "runOrderBy" -> "false", - "groupByAggClassName" -> "GroupByAggOptimized")) - - val job = new S2EdgeDataAggregateProcess(taskConf = taskConf) - val processed = job.execute(spark, inputMap) - - processed.printSchema() - processed.collect().foreach { row => - println(row) - } - } - - test("test S2EdgeDataArrayAggregateProcess") { - import spark.sqlContext.implicits._ - - val edges = 
spark.createDataset(walLogsLs).toDF() - val edges2 = spark.createDataset(walLogsLs2).toDF() - - val firstConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"), - options = Map("maxNumOfEdges" -> "10")) - - val firstJob = new S2EdgeDataAggregateProcess(firstConf) - val firstJob2 = new S2EdgeDataAggregateProcess(firstConf) - - val first = firstJob.execute(spark, Map("edges" -> edges)) - val first2 = firstJob2.execute(spark, Map("edges" -> edges2)) - - val secondInputMap = Map( - "aggregated" -> first.union(first2) - ) - - val secondConf = new TaskConf(name = "testarray", `type` = "agg", - inputs = Seq("aggregated"), - options = Map("maxNumOfEdges" -> "10")) - - val secondJob = new S2EdgeDataArrayAggregateProcess(secondConf) - - - val processed = secondJob.execute(spark, secondInputMap) - - processed.printSchema() - processed.collect().foreach { row => - println(row) - } - } - - test("mergeTwoSeq") { - val prev: Array[Int] = Array(3, 2, 1) - val cur: Array[Int] = Array(4, 2, 2) - - val ls = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10) - println(ls.size) - - ls.foreach { x => - println(x) - } - } - - test("addToTopK test.") { - import S2EdgeDataAggregate._ - val numOfTest = 100 - val numOfNums = 100 - val maxNum = 10 - - (0 until numOfTest).foreach { testNum => - val maxSize = 1 + Random.nextInt(numOfNums) - val pq = new BoundedPriorityQueue[Int](maxSize) - val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum)) - var result: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int] - - arr.foreach { i => - pq += i - result = addToTopK(result, maxSize, i) - } - result.toSeq.sorted shouldBe pq.toSeq.sorted - } - -// val maxSize = 1 + Random.nextInt(numOfNums) -// val pq = new BoundedPriorityQueue[Int](maxSize) -// val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum)) -// val result = mutable.ArrayBuffer.empty[Int] -// var lastPos = 0 -// arr.foreach { i => -// pq += i -// addToTopK(result, lastPos, maxSize, i) -// lastPos = lastPos + 1 -// } -// result.toSeq.sorted shouldBe pq.toSeq.sorted - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a2e10fbb/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala new file mode 100644 index 0000000..5ae595b --- /dev/null +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala @@ -0,0 +1,78 @@ +package org.apache.s2graph.s2jobs.wal.process + +import com.holdenkarau.spark.testing.DataFrameSuiteBase +import org.apache.s2graph.s2jobs.task.TaskConf +import org.apache.s2graph.s2jobs.wal.WalLog +import org.apache.s2graph.s2jobs.wal.udafs._ +import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} + +import scala.collection.mutable +import scala.util.Random + +class WalLogAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase { + val walLogsLs = Seq( + WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""), + WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""), + WalLog(3L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""), + WalLog(4L, "insert", "edge", "a", 
"b", "s2graph", "friends", """{"name": 4}""") + ) + val walLogsLs2 = Seq( + WalLog(5L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""), + WalLog(6L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""), + WalLog(7L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""), + WalLog(8L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""") + ) + + + test("test S2EdgeDataAggregateProcess") { + import spark.sqlContext.implicits._ + + val edges = spark.createDataset(walLogsLs).toDF() + val inputMap = Map("edges" -> edges) + val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"), + options = Map("maxNumOfEdges" -> "10", + "runOrderBy" -> "false", + "groupByAggClassName" -> "GroupByAggOptimized")) + + val job = new WalLogAggregateProcess(taskConf = taskConf) + val processed = job.execute(spark, inputMap) + + processed.printSchema() + processed.collect().foreach { row => + println(row) + } + } + + test("mergeTwoSeq") { + val prev: Array[Int] = Array(3, 2, 1) + val cur: Array[Int] = Array(4, 2, 2) + + val ls = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10) + println(ls.size) + + ls.foreach { x => + println(x) + } + } + + test("addToTopK test.") { + import S2EdgeDataAggregate._ + val numOfTest = 100 + val numOfNums = 100 + val maxNum = 10 + + (0 until numOfTest).foreach { testNum => + val maxSize = 1 + Random.nextInt(numOfNums) + val pq = new BoundedPriorityQueue[Int](maxSize) + val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum)) + var result: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int] + + arr.foreach { i => + pq += i + result = addToTopK(result, maxSize, i) + } + result.sorted shouldBe pq.toSeq.sorted + } + } +} \ No newline at end of file