Implement and add test cases for the following:
- WalLogAggregateProcess
- BuildTopFeaturesProcess
- FilterTopFeaturesProcess
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/651ca2cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/651ca2cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/651ca2cf Branch: refs/heads/master Commit: 651ca2cf9fad57e4613e22dde39c65fe88755a5c Parents: d9af2b8 Author: DO YUNG YOON <steams...@apache.org> Authored: Wed Aug 22 19:15:48 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Thu Aug 23 00:52:24 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/s2jobs/task/Task.scala | 16 +++ .../org/apache/s2graph/s2jobs/wal/WalLog.scala | 60 ++++++--- .../wal/process/BuildTopFeaturesProcess.scala | 126 +++++++++++++++++++ .../wal/process/FeatureIndexProcess.scala | 99 --------------- .../wal/process/FilterTopFeaturesProcess.scala | 90 +++++++++++++ .../wal/process/FilterWalLogAggProcess.scala | 75 ----------- .../wal/process/WalLogAggregateProcess.scala | 20 +-- .../wal/process/params/AggregateParam.scala | 19 ++- .../process/params/BuildTopFeaturesParam.scala | 17 +++ .../wal/process/params/FeatureIndexParam.scala | 15 --- .../process/params/FilterTopFeaturesParam.scala | 7 ++ .../process/params/FilterWalLogAggParam.scala | 7 -- .../wal/transformer/DefaultTransformer.scala | 4 +- .../s2jobs/wal/transformer/ExtractDomain.scala | 11 +- .../wal/transformer/ExtractServiceName.scala | 13 +- .../s2jobs/wal/transformer/Transformer.scala | 3 +- .../s2graph/s2jobs/wal/udfs/WalLogUDF.scala | 26 ++-- .../apache/s2graph/s2jobs/wal/TestData.scala | 42 +++++++ .../s2graph/s2jobs/wal/TransformerTest.scala | 10 +- .../process/BuildTopFeaturesProcessTest.scala | 31 +++++ .../process/FilterTopFeaturesProcessTest.scala | 84 +++++++++++++ .../process/WalLogAggregateProcessTest.scala | 64 ++-------- .../s2jobs/wal/udafs/WalLogUDAFTest.scala | 42 +++++++ 23 files changed, 574 insertions(+), 307 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala index ab02900..62081df 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala @@ -21,9 +21,13 @@ package org.apache.s2graph.s2jobs.task import org.apache.s2graph.core.S2GraphConfigs import org.apache.s2graph.s2jobs.Logger +import org.apache.s2graph.s2jobs.wal.transformer.Transformer +import play.api.libs.json.Json //import org.apache.s2graph.s2jobs.loader.GraphFileOptions object TaskConf { + val Empty = new TaskConf(name = "empty", `type` = "empty", inputs = Nil, options = Map.empty) + // def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = { // val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys) // .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray @@ -42,6 +46,18 @@ object TaskConf { def parseLocalCacheConfigs(taskConf: TaskConf): Map[String, Any] = { taskConf.options.filterKeys(S2GraphConfigs.CacheConfigs.DEFAULTS.keySet).mapValues(_.toInt) } + + def parseTransformers(taskConf: TaskConf): Seq[Transformer] = { + val classes = Json.parse(taskConf.options.getOrElse("transformClasses", 
+ """["org.apache.s2graph.s2jobs.wal.transformer.DefaultTransformer"]""")).as[Seq[String]] + + classes.map { className => + Class.forName(className) + .getConstructor(classOf[TaskConf]) + .newInstance(taskConf) + .asInstanceOf[Transformer] + } + } } case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty, cache:Option[Boolean]=None) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala index e4c3467..9617ca1 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala @@ -3,6 +3,7 @@ package org.apache.s2graph.s2jobs.wal import com.google.common.hash.Hashing import org.apache.s2graph.core.JSONParser import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam +import org.apache.s2graph.s2jobs.wal.transformer.Transformer import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue import org.apache.spark.sql.Row import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} @@ -17,22 +18,12 @@ object WalLogAgg { new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp) } + def toFeatureHash(dimVal: DimVal): Long = toFeatureHash(dimVal.dim, dimVal.value) + def toFeatureHash(dim: String, value: String): Long = { Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong() } - def filter(walLogAgg: WalLogAgg, validFeatureHashKeys: Set[Long]) = { - val filtered = walLogAgg.logs.map { walLog => - val fields = Json.parse(walLog.props).as[JsObject].fields.filter { case (dim, jsValue) => - validFeatureHashKeys(toFeatureHash(dim, JSONParser.jsValueToString(jsValue))) - } - - walLog.copy(props = Json.toJson(fields).as[JsObject].toString) - } - - walLogAgg.copy(logs = filtered) - } - def merge(iter: Iterator[WalLogAgg], param: AggregateParam)(implicit ord: Ordering[WalLog]) = { val heap = new BoundedPriorityQueue[WalLog](param.heapSize) @@ -51,6 +42,47 @@ object WalLogAgg { WalLogAgg(topItems.head.from, topItems, maxTs, minTs) } + + def filterProps(walLogAgg: WalLogAgg, + transformers: Seq[Transformer], + validFeatureHashKeys: Set[Long]) = { + val filtered = walLogAgg.logs.map { walLog => + val fields = walLog.propsJson.fields.filter { case (propKey, propValue) => + val filtered = transformers.flatMap { transformer => + transformer.toDimValLs(walLog, propKey, JSONParser.jsValueToString(propValue)).filter(dimVal => validFeatureHashKeys(toFeatureHash(dimVal))) + } + filtered.nonEmpty + } + + walLog.copy(props = Json.toJson(fields.toMap).as[JsObject].toString) + } + + walLogAgg.copy(logs = filtered) + } +} + +object DimValCountRank { + def fromRow(row: Row): DimValCountRank = { + val dim = row.getAs[String]("dim") + val value = row.getAs[String]("value") + val count = row.getAs[Long]("count") + val rank = row.getAs[Long]("rank") + + new DimValCountRank(DimVal(dim, value), count, rank) + } +} + +case class DimValCountRank(dimVal: DimVal, count: Long, rank: Long) + +case class DimValCount(dimVal: DimVal, count: Long) + +object DimVal { + def fromRow(row: Row): DimVal = { + val dim = row.getAs[String]("dim") + val value = row.getAs[String]("value") + + new DimVal(dim, value) + } } case class DimVal(dim: String, 
value: String) @@ -71,8 +103,8 @@ case class WalLog(timestamp: Long, val id = from val columnName = label val serviceName = to - - lazy val propsKeyValues = Json.parse(props).as[JsObject].fields.map { case (key, jsValue) => + lazy val propsJson = Json.parse(props).as[JsObject] + lazy val propsKeyValues = propsJson.fields.map { case (key, jsValue) => key -> JSONParser.jsValueToString(jsValue) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcess.scala new file mode 100644 index 0000000..048b470 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcess.scala @@ -0,0 +1,126 @@ +package org.apache.s2graph.s2jobs.wal.process + +import org.apache.s2graph.s2jobs.task.TaskConf +import org.apache.s2graph.s2jobs.wal.WalLogAgg.toFeatureHash +import org.apache.s2graph.s2jobs.wal.process.params.BuildTopFeaturesParam +import org.apache.s2graph.s2jobs.wal.transformer._ +import org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF +import org.apache.s2graph.s2jobs.wal.{DimVal, DimValCount, WalLog, WalLogAgg} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import play.api.libs.json.{JsObject, Json} + +import scala.collection.mutable + +object BuildTopFeaturesProcess { + def extractDimValuesWithCount(transformers: Seq[Transformer]) = { + udf((rows: Seq[Row]) => { + val logs = rows.map(WalLog.fromRow) + val dimValCounts = mutable.Map.empty[DimVal, Int] + + logs.foreach { walLog => + walLog.propsKeyValues.foreach { case (propsKey, propsValue) => + transformers.foreach { transformer => + transformer.toDimValLs(walLog, propsKey, propsValue).foreach { dimVal => + val newCount = dimValCounts.getOrElse(dimVal, 0) + 1 + dimValCounts += (dimVal -> newCount) + } + } + } + } + + dimValCounts.toSeq.sortBy(-_._2)map { case (dimVal, count) => + DimValCount(dimVal, count) + } + }) + } + + def extractDimValues(transformers: Seq[Transformer]) = { + udf((rows: Seq[Row]) => { + val logs = rows.map(WalLog.fromRow) + // TODO: this can be changed into Map to count how many times each dimVal exist in sequence of walLog + // then change this to mutable.Map.empty[DimVal, Int], then aggregate. 
+ val distinctDimValues = mutable.Set.empty[DimVal] + + logs.foreach { walLog => + walLog.propsKeyValues.foreach { case (propsKey, propsValue) => + transformers.foreach { transformer => + transformer.toDimValLs(walLog, propsKey, propsValue).foreach { dimVal => + distinctDimValues += dimVal + } + } + } + } + + distinctDimValues.toSeq + }) + } + + def buildDictionary(ss: SparkSession, + allDimVals: DataFrame, + param: BuildTopFeaturesParam, + dimValColumnName: String = "dimVal"): DataFrame = { + import ss.implicits._ + + val rawFeatures = allDimVals + .select(col(param._countColumnName), col(s"$dimValColumnName.dim").as("dim"), col(s"$dimValColumnName.value").as("value")) + .groupBy("dim", "value") + .agg(countDistinct(param._countColumnName).as("count")) + .filter(s"count > ${param._minUserCount}") + + val ds: Dataset[((String, Long), String)] = + rawFeatures.select("dim", "value", "count").as[(String, String, Long)] + .map { case (dim, value, uv) => + (dim, uv) -> value + } + + + implicit val ord = Ordering.Tuple2(Ordering.String, Ordering.Long.reverse) + + val rdd: RDD[(Long, (String, Long), String)] = WalLogUDF.appendRank(ds, param.numOfPartitions, param.samplePointsPerPartitionHint) + + rdd.toDF("rank", "dim_count", "value") + .withColumn("dim", col("dim_count._1")) + .withColumn("count", col("dim_count._2")) + .select("dim", "value", "count", "rank") + } +} + +case class BuildTopFeaturesProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) { + + import BuildTopFeaturesProcess._ + + override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = { + val countColumnName = taskConf.options.getOrElse("countColumnName", "from") + val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt) + val samplePointsPerPartitionHint = taskConf.options.get("samplePointsPerPartitionHint").map(_.toInt) + val minUserCount = taskConf.options.get("minUserCount").map(_.toLong) + + numOfPartitions.map { d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString) } + + val param = BuildTopFeaturesParam(minUserCount = minUserCount, countColumnName = Option(countColumnName), + numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = samplePointsPerPartitionHint + ) + + val edges = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) => + prev.union(inputMap(cur)) + } + + //TODO: user expect to inject transformers that transform (WalLog, propertyKey, propertyValue) to Seq[DimVal]. 
+ val transformers = TaskConf.parseTransformers(taskConf) + val dimValExtractUDF = extractDimValues(transformers) + val dimValColumnName = "dimVal" + + val rawFeatures = edges + .withColumn(dimValColumnName, explode(dimValExtractUDF(col("logs")))) + + val dict = buildDictionary(ss, rawFeatures, param, dimValColumnName) + + dict + } + + + override def mandatoryOptions: Set[String] = Set.empty +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala deleted file mode 100644 index 073b0cf..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala +++ /dev/null @@ -1,99 +0,0 @@ -package org.apache.s2graph.s2jobs.wal.process - -import org.apache.s2graph.s2jobs.task.TaskConf -import org.apache.s2graph.s2jobs.wal.process.params.FeatureIndexParam -import org.apache.s2graph.s2jobs.wal.transformer._ -import org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF -import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql._ -import org.apache.spark.sql.functions._ - -import scala.collection.mutable - -object FeatureIndexProcess { - def extractDimValues(transformers: Seq[Transformer]) = { - udf((rows: Seq[Row]) => { - val logs = rows.map(WalLog.fromRow) - // TODO: this can be changed into Map to count how many times each dimVal exist in sequence of walLog - // then change this to mutable.Map.empty[DimVal, Int], then aggregate. 
- val distinctDimValues = mutable.Set.empty[DimVal] - - logs.foreach { walLog => - walLog.propsKeyValues.foreach { case (propsKey, propsValue) => - transformers.foreach { transformer => - transformer.toDimValLs(walLog, propsKey, propsValue).foreach(distinctDimValues += _) - } - } - } - - distinctDimValues.toSeq - }) - } - - def buildDictionary(ss: SparkSession, - allDimVals: DataFrame, - param: FeatureIndexParam, - dimValColumnName: String = "dimVal"): DataFrame = { - import ss.implicits._ - - val rawFeatures = allDimVals - .select(col(param._countColumnName), col(s"$dimValColumnName.dim").as("dim"), col(s"$dimValColumnName.value").as("value")) - .groupBy("dim", "value") - .agg(countDistinct(param._countColumnName).as("count")) - .filter(s"count > ${param._minUserCount}") - - val ds: Dataset[((String, Long), String)] = - rawFeatures.select("dim", "value", "count").as[(String, String, Long)] - .map { case (dim, value, uv) => - (dim, uv) -> value - } - - - implicit val ord = Ordering.Tuple2(Ordering.String, Ordering.Long.reverse) - - val rdd: RDD[(Long, (String, Long), String)] = WalLogUDF.appendRank(ds, param.numOfPartitions, param.samplePointsPerPartitionHint) - - rdd.toDF("rank", "dim_count", "value") - .withColumn("dim", col("dim_count._1")) - .withColumn("count", col("dim_count._2")) - .select("dim", "value", "count", "rank") - } -} - -case class FeatureIndexProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) { - - import FeatureIndexProcess._ - - override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = { - val countColumnName = taskConf.options.getOrElse("countColumnName", "from") - val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt) - val samplePointsPerPartitionHint = taskConf.options.get("samplePointsPerPartitionHint").map(_.toInt) - val minUserCount = taskConf.options.get("minUserCount").map(_.toLong) - - numOfPartitions.map { d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString) } - - val param = FeatureIndexParam(minUserCount = minUserCount, countColumnName = Option(countColumnName), - numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = samplePointsPerPartitionHint - ) - - val edges = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) => - prev.union(inputMap(cur)) - } - - //TODO: user expect to inject transformers that transform (WalLog, propertyKey, propertyValue) to Seq[DimVal]. 
- val transformers = Seq(DefaultTransformer()) - val dimValExtractUDF = extractDimValues(transformers) - val dimValColumnName = "dimVal" - - val rawFeatures = edges - .withColumn(dimValColumnName, explode(dimValExtractUDF(col("logs")))) - - val dict = buildDictionary(ss, rawFeatures, param, dimValColumnName) - - dict - } - - - override def mandatoryOptions: Set[String] = Set.empty -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcess.scala new file mode 100644 index 0000000..ec7e645 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcess.scala @@ -0,0 +1,90 @@ +package org.apache.s2graph.s2jobs.wal.process + +import org.apache.s2graph.s2jobs.task.TaskConf +import org.apache.s2graph.s2jobs.wal.WalLogAgg +import org.apache.s2graph.s2jobs.wal.transformer.{DefaultTransformer, Transformer} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import play.api.libs.json.{JsObject, Json} + +object FilterTopFeaturesProcess { + private var validFeatureHashKeys: Set[Long] = null + def getValidFeatureHashKeys(validFeatureHashKeysBCast: Broadcast[Array[Long]]): Set[Long] = { + if (validFeatureHashKeys == null) { + validFeatureHashKeys = validFeatureHashKeysBCast.value.toSet + } + + validFeatureHashKeys + } + + def collectDistinctFeatureHashes(ss: SparkSession, + filteredDict: DataFrame): Array[Long] = { + import ss.implicits._ + + val featureHashUDF = udf((dim: String, value: String) => WalLogAgg.toFeatureHash(dim, value)) + + filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), col("value"))) + .select("featureHash") + .distinct().as[Long].collect() + } + + def filterTopKsPerDim(dict: DataFrame, + maxRankPerDim: Broadcast[Map[String, Int]], + defaultMaxRank: Int): DataFrame = { + val filterUDF = udf((dim: String, rank: Long) => { + rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank) + }) + + dict.filter(filterUDF(col("dim"), col("rank"))) + } + + def filterWalLogAgg(ss: SparkSession, + walLogAgg: Dataset[WalLogAgg], + transformers: Seq[Transformer], + validFeatureHashKeysBCast: Broadcast[Array[Long]]) = { + import ss.implicits._ + walLogAgg.mapPartitions { iter => + val validFeatureHashKeys = getValidFeatureHashKeys(validFeatureHashKeysBCast) + + iter.map { walLogAgg => + WalLogAgg.filterProps(walLogAgg, transformers, validFeatureHashKeys) + } + } + } +} + +class FilterTopFeaturesProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) { + + import FilterTopFeaturesProcess._ + + /* + filter topKs per dim, then build valid dimValLs. + then broadcast valid dimValLs to original dataframe, and filter out not valid dimVal. 
+ */ + override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = { + import ss.implicits._ + + val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s => + Json.parse(s).as[JsObject].fields.map { case (k, jsValue) => + k -> jsValue.as[Int] + }.toMap + } + val maxRankPerDimBCast = ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty)) + + val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt) + + val featureDict = inputMap(taskConf.options("featureDict")) + val walLogAgg = inputMap(taskConf.options("walLogAgg")).as[WalLogAgg] + + val transformers = TaskConf.parseTransformers(taskConf) + + val filteredDict = filterTopKsPerDim(featureDict, maxRankPerDimBCast, defaultMaxRank.getOrElse(Int.MaxValue)) + val validFeatureHashKeys = collectDistinctFeatureHashes(ss, filteredDict) + val validFeatureHashKeysBCast = ss.sparkContext.broadcast(validFeatureHashKeys) + + filterWalLogAgg(ss, walLogAgg, transformers, validFeatureHashKeysBCast).toDF() + } + + override def mandatoryOptions: Set[String] = Set("featureDict", "walLogAgg") +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala deleted file mode 100644 index 3c546a7..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala +++ /dev/null @@ -1,75 +0,0 @@ -package org.apache.s2graph.s2jobs.wal.process - -import org.apache.s2graph.s2jobs.task.TaskConf -import org.apache.s2graph.s2jobs.wal.WalLogAgg -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} -import play.api.libs.json.{JsObject, Json} - -object FilterWalLogAggProcess { - - def collectDistinctFeatureHashes(ss: SparkSession, - filteredDict: DataFrame): Array[Long] = { - import ss.implicits._ - - val featureHashUDF = udf((dim: String, value: String) => WalLogAgg.toFeatureHash(dim, value)) - - filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), col("value"))) - .select("featureHash") - .distinct().as[Long].collect() - } - - def filterTopKsPerDim(dict: DataFrame, - maxRankPerDim: Broadcast[Map[String, Int]], - defaultMaxRank: Int): DataFrame = { - val filterUDF = udf((dim: String, rank: Long) => { - rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank) - }) - - dict.filter(filterUDF(col("dim"), col("rank"))) - } - - def filterWalLogAgg(walLogAgg: Dataset[WalLogAgg], - validFeatureHashKeysBCast: Broadcast[Array[Long]]) = { - walLogAgg.mapPartitions { iter => - val validFeatureHashKeys = validFeatureHashKeysBCast.value.toSet - - iter.map { walLogAgg => - WalLogAgg.filter(walLogAgg, validFeatureHashKeys) - } - } - } -} - -class FilterWalLogAggProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) { - - import FilterWalLogAggProcess._ - - /* - filter topKs per dim, then build valid dimValLs. - then broadcast valid dimValLs to original dataframe, and filter out not valid dimVal. 
- */ - override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = { - val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s => - Json.parse(s).as[JsObject].fields.map { case (k, jsValue) => - k -> jsValue.as[Int] - }.toMap - } - val maxRankPerDimBCast = ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty)) - - val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt) - - val featureDict = inputMap(taskConf.options("featureDict")) - val walLogAgg = inputMap(taskConf.options("walLogAgg")).as[WalLogAgg] - - - val filteredDict = filterTopKsPerDim(featureDict, maxRankPerDimBCast, defaultMaxRank.getOrElse(Int.MaxValue)) - val validFeatureHashKeys = collectDistinctFeatureHashes(ss, filteredDict) - val validFeatureHashKeysBCast = ss.sparkContext.broadcast(validFeatureHashKeys) - - filterWalLogAgg(walLogAgg, validFeatureHashKeysBCast).toDF() - } - - override def mandatoryOptions: Set[String] = Set("featureDict", "walLogAgg") -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala index 2b42e20..ee4debd 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala @@ -41,26 +41,18 @@ class WalLogAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jo override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = { import ss.implicits._ - val groupByKeys = taskConf.options.get("groupByKeys").map(_.split(",").filter(_.nonEmpty).toSeq) - val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000) - val arrayType = taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(false) - val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(false) - val numOfPartitions = taskConf.options.get("numOfPartitions") - numOfPartitions.foreach(d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d)) + //TODO: Current implementation only expect taskConf.options as Map[String, String]. + //Once we change taskConf.options as JsObject, then we can simply parse input paramter as following. + //implicit val paramReads = Json.reads[AggregateParam] + val param = AggregateParam.fromTaskConf(taskConf) + param.numOfPartitions.foreach(d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString)) implicit val ord = WalLog.orderByTsAsc val walLogs = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) => prev.union(inputMap(cur)) } - //TODO: Current implementation only expect taskConf.options as Map[String, String]. - //Once we change taskConf.options as JsObject, then we can simply parse input paramter as following. 
- //implicit val paramReads = Json.reads[AggregateParam] - - val param = AggregateParam(groupByKeys, topK = Option(maxNumOfEdges), - isArrayType = Option(arrayType), shouldSortTopItems = Option(sortTopItems)) - - if (arrayType) aggregate(ss, walLogs.as[WalLogAgg], param) + if (param.arrayType) aggregate(ss, walLogs.as[WalLogAgg], param) else aggregateRaw(ss, walLogs.as[WalLog], param) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala index 89c36f4..4cb1377 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala @@ -1,17 +1,34 @@ package org.apache.s2graph.s2jobs.wal.process.params +import org.apache.s2graph.s2jobs.task.TaskConf object AggregateParam { val defaultGroupByKeys = Seq("from") val defaultTopK = 1000 val defaultIsArrayType = false val defaultShouldSortTopItems = true + + def fromTaskConf(taskConf: TaskConf): AggregateParam = { + val groupByKeys = taskConf.options.get("groupByKeys").map(_.split(",").filter(_.nonEmpty).toSeq) + val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(defaultTopK) + val arrayType = taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(defaultIsArrayType) + val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(defaultShouldSortTopItems) + val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt) + + new AggregateParam(groupByKeys = groupByKeys, + topK = Option(maxNumOfEdges), + isArrayType = Option(arrayType), + shouldSortTopItems = Option(sortTopItems), + numOfPartitions = numOfPartitions + ) + } } case class AggregateParam(groupByKeys: Option[Seq[String]], topK: Option[Int], isArrayType: Option[Boolean], - shouldSortTopItems: Option[Boolean]) { + shouldSortTopItems: Option[Boolean], + numOfPartitions: Option[Int]) { import AggregateParam._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/BuildTopFeaturesParam.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/BuildTopFeaturesParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/BuildTopFeaturesParam.scala new file mode 100644 index 0000000..3e8bae5 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/BuildTopFeaturesParam.scala @@ -0,0 +1,17 @@ +package org.apache.s2graph.s2jobs.wal.process.params + +object BuildTopFeaturesParam { + val defaultMinUserCount = 0L + val defaultCountColumnName = "from" +} + +case class BuildTopFeaturesParam(minUserCount: Option[Long], + countColumnName: Option[String], + samplePointsPerPartitionHint: Option[Int], + numOfPartitions: Option[Int]) { + + import BuildTopFeaturesParam._ + + val _countColumnName = countColumnName.getOrElse(defaultCountColumnName) + val _minUserCount = minUserCount.getOrElse(defaultMinUserCount) +} 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala deleted file mode 100644 index 4ede70b..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala +++ /dev/null @@ -1,15 +0,0 @@ -package org.apache.s2graph.s2jobs.wal.process.params - -object FeatureIndexParam { - val defaultMinUserCount = 0L - val defaultCountColumnName = "from" -} - -case class FeatureIndexParam(minUserCount: Option[Long], - countColumnName: Option[String], - samplePointsPerPartitionHint: Option[Int], - numOfPartitions: Option[Int]) { - import FeatureIndexParam._ - val _countColumnName = countColumnName.getOrElse(defaultCountColumnName) - val _minUserCount = minUserCount.getOrElse(defaultMinUserCount) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterTopFeaturesParam.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterTopFeaturesParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterTopFeaturesParam.scala new file mode 100644 index 0000000..3b9d868 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterTopFeaturesParam.scala @@ -0,0 +1,7 @@ +package org.apache.s2graph.s2jobs.wal.process.params + +class FilterTopFeaturesParam(maxRankPerDim: Option[Map[String, Int]], + defaultMaxRank: Option[Int]) { + val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty) + val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala deleted file mode 100644 index 4b2209f..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.s2graph.s2jobs.wal.process.params - -class FilterWalLogAggParam(maxRankPerDim: Option[Map[String, Int]], - defaultMaxRank: Option[Int]) { - val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty) - val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala index 2839d6a..de328e5 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala +++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala @@ -1,3 +1,5 @@ package org.apache.s2graph.s2jobs.wal.transformer -case class DefaultTransformer() extends Transformer +import org.apache.s2graph.s2jobs.task.TaskConf + +case class DefaultTransformer(taskConf: TaskConf) extends Transformer(taskConf) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala index 9b09d71..45bbe66 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala @@ -1,12 +1,15 @@ package org.apache.s2graph.s2jobs.wal.transformer +import org.apache.s2graph.s2jobs.task.TaskConf import org.apache.s2graph.s2jobs.wal.utils.UrlUtils import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog} +import play.api.libs.json.Json -case class ExtractDomain(urlDimensions: Set[String], - hostDimName: String = "host", - domainDimName: String = "domain", - keywordDimName: String = "uri_keywords") extends Transformer { +case class ExtractDomain(taskConf: TaskConf) extends Transformer(taskConf) { + val urlDimensions = Json.parse(taskConf.options.getOrElse("urlDimensions", "[]")).as[Set[String]] + val hostDimName = taskConf.options.getOrElse("hostDimName", "host") + val domainDimName= taskConf.options.getOrElse("domainDimName", "domain") + val keywordDimName = taskConf.options.getOrElse("keywordDimName", "uri_keywords") override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = { if (!urlDimensions(propertyKey)) Nil else { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala index 7ad7dfd..15efe9f 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala @@ -1,10 +1,17 @@ package org.apache.s2graph.s2jobs.wal.transformer +import org.apache.s2graph.core.JSONParser +import org.apache.s2graph.s2jobs.task.TaskConf import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog} +import play.api.libs.json.{JsObject, Json} + +class ExtractServiceName(taskConf: TaskConf) extends Transformer(taskConf) { + val serviceDims = Json.parse(taskConf.options.getOrElse("serviceDims", "[]")).as[Set[String]] + val domainServiceMap = Json.parse(taskConf.options.getOrElse("domainServiceMap", "{}")).as[JsObject].fields.map { case (k, v) => + k -> JSONParser.jsValueToString(v) + }.toMap + val serviceDimName = taskConf.options.getOrElse("serviceDimName", "serviceDimName") -class ExtractServiceName(serviceDims: Set[String], - domainServiceMap: Map[String, String] = Map.empty, - serviceDimName: String = "serviceName") extends Transformer { override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): 
Seq[DimVal] = { if (!serviceDims(propertyKey)) Nil else { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala index 523f58d..68be2ad 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala @@ -1,11 +1,12 @@ package org.apache.s2graph.s2jobs.wal.transformer +import org.apache.s2graph.s2jobs.task.TaskConf import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog} /** * decide how to transform walLog's each property key value to Seq[DimVal] */ -trait Transformer { +abstract class Transformer(taskConf: TaskConf) extends Serializable { def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = { val dim = s"${walLog.label}:${propertyKey}" val value = propertyValue http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala index 34f4a2b..d213d6c 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala @@ -137,22 +137,16 @@ object WalLogUDF { val sorted = rdd.repartitionAndSortWithinPartitions(partitioner) def rank(idx: Int, iter: Iterator[((K1, K2), V)]) = { - val initialK1: K1 = null.asInstanceOf[K1] - val initialK2: K2 = null.asInstanceOf[K2] - val initialV: V = null.asInstanceOf[V] - val zero = List((1L, initialK1, initialK2, initialV)) - - def merge(acc: List[(Long, K1, K2, V)], x: ((K1, K2), V)) = - (acc.head, x) match { - case ((offset, prevKey1, _, _), ((curKey1: K1, curKey2: K2), curVal: V)) => { - val newOffset = if (prevKey1 == curKey1) offset + 1L else 1L - (newOffset, curKey1, curKey2, curVal) :: acc - } - } - - iter.foldLeft(zero)(merge).reverse.drop(1).map { case (offset, key1, key2, value) => - (idx, offset, key1, key2, value) - }.toIterator + var curOffset = 1L + var curK1 = null.asInstanceOf[K1] + + iter.map{ case ((key1, key2), value) => + // println(s">>>[$idx] curK1: $curK1, curOffset: $curOffset") + val newOffset = if (curK1 == key1) curOffset + 1L else 1L + curOffset = newOffset + curK1 = key1 + (idx, newOffset, key1, key2, value) + } } def getOffset(idx: Int, iter: Iterator[((K1, K2), V)]) = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TestData.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TestData.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TestData.scala new file mode 100644 index 0000000..fae9265 --- /dev/null +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TestData.scala @@ -0,0 +1,42 @@ +package org.apache.s2graph.s2jobs.wal + +object TestData { + val testServiceName = "s2graph" + val walLogsLs = Seq( + WalLog(1L, "insert", "edge", "u1", "i1", 
s"$testServiceName", "click", """{"item_name":"awesome item"}"""), + WalLog(2L, "insert", "edge", "u1", "i1", s"$testServiceName", "purchase", """{"price":2}"""), + WalLog(3L, "insert", "edge", "u1", "q1", s"$testServiceName", "search", """{"referrer":"www.google.com"}"""), + WalLog(4L, "insert", "edge", "u2", "i1", s"$testServiceName", "click", """{"item_name":"awesome item"}"""), + WalLog(5L, "insert", "edge", "u2", "q2", s"$testServiceName", "search", """{"referrer":"www.bing.com"}"""), + WalLog(6L, "insert", "edge", "u3", "i2", s"$testServiceName", "click", """{"item_name":"bad item"}"""), + WalLog(7L, "insert", "edge", "u4", "q1", s"$testServiceName", "search", """{"referrer":"www.google.com"}""") + ) + + // order by from + val aggExpected = Array( + WalLogAgg("u1", Seq( + WalLog(3L, "insert", "edge", "u1", "q1", s"$testServiceName", "search", """{"referrer":"www.google.com"}"""), + WalLog(2L, "insert", "edge", "u1", "i1", s"$testServiceName", "purchase", """{"price":2}"""), + WalLog(1L, "insert", "edge", "u1", "i1", s"$testServiceName", "click", """{"item_name":"awesome item"}""") + ), 3L, 1L), + WalLogAgg("u2", Seq( + WalLog(5L, "insert", "edge", "u2", "q2", s"$testServiceName", "search", """{"referrer":"www.bing.com"}"""), + WalLog(4L, "insert", "edge", "u2", "i1", s"$testServiceName", "click", """{"item_name":"awesome item"}""") + ), 5L, 4L), + WalLogAgg("u3", Seq( + WalLog(6L, "insert", "edge", "u3", "i2", s"$testServiceName", "click", """{"item_name":"bad item"}""") + ), 6L, 6L), + WalLogAgg("u4", Seq( + WalLog(7L, "insert", "edge", "u4", "q1", s"$testServiceName", "search", """{"referrer":"www.google.com"}""") + ), 7L, 7L) + ) + + // order by dim, rank + val featureDictExpected = Array( + DimValCountRank(DimVal("click:item_name", "awesome item"), 2, 1), + DimValCountRank(DimVal("click:item_name", "bad item"), 1, 2), + DimValCountRank(DimVal("purchase:price", "2"), 1, 1), + DimValCountRank(DimVal("search:referrer", "www.google.com"), 2, 1), + DimValCountRank(DimVal("search:referrer", "www.bing.com"), 1, 2) + ) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala index 5089390..974002b 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala @@ -1,20 +1,26 @@ package org.apache.s2graph.s2jobs.wal +import org.apache.s2graph.s2jobs.task.TaskConf import org.apache.s2graph.s2jobs.wal.transformer._ import org.scalatest.{FunSuite, Matchers} +import play.api.libs.json.Json class TransformerTest extends FunSuite with Matchers { val walLog = WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1, "url": "www.google.com"}""") test("test default transformer") { - val transformer = new DefaultTransformer + val taskConf = TaskConf.Empty + val transformer = new DefaultTransformer(taskConf) val dimVals = transformer.toDimValLs(walLog, "name", "1") dimVals shouldBe Seq(DimVal("friends:name", "1")) } test("test ExtractDomain from URL") { - val transformer = new ExtractDomain(urlDimensions = Set("url")) + val taskConf = TaskConf.Empty.copy(options = + Map("urlDimensions" -> Json.toJson(Seq("url")).toString()) + ) + val transformer = new 
ExtractDomain(taskConf) val dimVals = transformer.toDimValLs(walLog, "url", "http://www.google.com/abc") dimVals shouldBe Seq( http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcessTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcessTest.scala new file mode 100644 index 0000000..4d2f079 --- /dev/null +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/BuildTopFeaturesProcessTest.scala @@ -0,0 +1,31 @@ +package org.apache.s2graph.s2jobs.wal.process + +import com.holdenkarau.spark.testing.DataFrameSuiteBase +import org.apache.s2graph.s2jobs.task.TaskConf +import org.apache.s2graph.s2jobs.wal.DimValCountRank +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} + +class BuildTopFeaturesProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase { + + import org.apache.s2graph.s2jobs.wal.TestData._ + + test("test entire process.") { + import spark.implicits._ + val df = spark.createDataset(aggExpected).toDF() + + val taskConf = new TaskConf(name = "test", `type` = "test", inputs = Seq("input"), + options = Map("minUserCount" -> "0") + ) + val job = new BuildTopFeaturesProcess(taskConf) + + + val inputMap = Map("input" -> df) + val featureDicts = job.execute(spark, inputMap) + .orderBy("dim", "rank") + .map(DimValCountRank.fromRow) + .collect() + + featureDicts shouldBe featureDictExpected + + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcessTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcessTest.scala new file mode 100644 index 0000000..cd8295a --- /dev/null +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/FilterTopFeaturesProcessTest.scala @@ -0,0 +1,84 @@ +package org.apache.s2graph.s2jobs.wal.process + +import com.holdenkarau.spark.testing.DataFrameSuiteBase +import org.apache.s2graph.s2jobs.task.TaskConf +import org.apache.s2graph.s2jobs.wal.transformer.DefaultTransformer +import org.apache.s2graph.s2jobs.wal.{DimValCountRank, WalLogAgg} +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} + +class FilterTopFeaturesProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase { + import org.apache.s2graph.s2jobs.wal.TestData._ + + test("test filterTopKsPerDim.") { + import spark.implicits._ + val featureDf = spark.createDataset(featureDictExpected).map { x => + (x.dimVal.dim, x.dimVal.value, x.count, x.rank) + }.toDF("dim", "value", "count", "rank") + + val maxRankPerDim = spark.sparkContext.broadcast(Map.empty[String, Int]) + + // filter nothing because all feature has rank < 10 + val filtered = FilterTopFeaturesProcess.filterTopKsPerDim(featureDf, maxRankPerDim, 10) + + val real = filtered.orderBy("dim", "rank").map(DimValCountRank.fromRow).collect() + real.zip(featureDictExpected).foreach { case (real, expected) => + real shouldBe expected + } + // filter rank >= 2 + val filtered2 = FilterTopFeaturesProcess.filterTopKsPerDim(featureDf, 
maxRankPerDim, 2) + val real2 = filtered2.orderBy("dim", "rank").map(DimValCountRank.fromRow).collect() + real2 shouldBe featureDictExpected.filter(_.rank < 2) + } + + + test("test filterWalLogAgg.") { + import spark.implicits._ + val walLogAgg = spark.createDataset(aggExpected) + val featureDf = spark.createDataset(featureDictExpected).map { x => + (x.dimVal.dim, x.dimVal.value, x.count, x.rank) + }.toDF("dim", "value", "count", "rank") + val maxRankPerDim = spark.sparkContext.broadcast(Map.empty[String, Int]) + + val transformers = Seq(DefaultTransformer(TaskConf.Empty)) + // filter nothing. so input, output should be same. + val featureFiltered = FilterTopFeaturesProcess.filterTopKsPerDim(featureDf, maxRankPerDim, 10) + val validFeatureHashKeys = FilterTopFeaturesProcess.collectDistinctFeatureHashes(spark, featureFiltered) + val validFeatureHashKeysBCast = spark.sparkContext.broadcast(validFeatureHashKeys) + val real = FilterTopFeaturesProcess.filterWalLogAgg(spark, walLogAgg, transformers, validFeatureHashKeysBCast) + .collect().sortBy(_.from) + + real.zip(aggExpected).foreach { case (real, expected) => + real shouldBe expected + } + } + + test("test entire process. filter nothing.") { + import spark.implicits._ + val df = spark.createDataset(aggExpected).toDF() + val featureDf = spark.createDataset(featureDictExpected).map { x => + (x.dimVal.dim, x.dimVal.value, x.count, x.rank) + }.toDF("dim", "value", "count", "rank") + + val inputKey = "input" + val featureDictKey = "feature" + // filter nothing since we did not specified maxRankPerDim and defaultMaxRank. + val taskConf = new TaskConf(name = "test", `type` = "test", + inputs = Seq(inputKey, featureDictKey), + options = Map( + "featureDict" -> featureDictKey, + "walLogAgg" -> inputKey + ) + ) + val inputMap = Map(inputKey -> df, featureDictKey -> featureDf) + val job = new FilterTopFeaturesProcess(taskConf) + val filtered = job.execute(spark, inputMap) + .orderBy("from") + .as[WalLogAgg] + .collect() + + filtered.zip(aggExpected).foreach { case (real, expected) => + real shouldBe expected + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala index 5753411..1bb7426 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala @@ -2,35 +2,20 @@ package org.apache.s2graph.s2jobs.wal.process import com.holdenkarau.spark.testing.DataFrameSuiteBase import org.apache.s2graph.s2jobs.task.TaskConf -import org.apache.s2graph.s2jobs.wal.WalLog -import org.apache.s2graph.s2jobs.wal.udafs._ -import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue +import org.apache.s2graph.s2jobs.wal._ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} -import scala.collection.mutable -import scala.util.Random - class WalLogAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase { - val walLogsLs = Seq( - WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""), - WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""), - 
WalLog(3L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""), - WalLog(4L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""") - ) - val walLogsLs2 = Seq( - WalLog(5L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""), - WalLog(6L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""), - WalLog(7L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""), - WalLog(8L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""") - ) - + import org.apache.s2graph.s2jobs.wal.TestData._ - test("test S2EdgeDataAggregateProcess") { + test("test entire process") { import spark.sqlContext.implicits._ val edges = spark.createDataset(walLogsLs).toDF() - val inputMap = Map("edges" -> edges) - val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"), + val processKey = "agg" + val inputMap = Map(processKey -> edges) + + val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq(processKey), options = Map("maxNumOfEdges" -> "10") ) @@ -38,40 +23,9 @@ class WalLogAggregateProcessTest extends FunSuite with Matchers with BeforeAndAf val processed = job.execute(spark, inputMap) processed.printSchema() - processed.collect().foreach { row => - println(row) - } - } - - test("mergeTwoSeq") { - val prev: Array[Int] = Array(3, 2, 1) - val cur: Array[Int] = Array(4, 2, 2) - - val ls = WalLogUDAF.mergeTwoSeq(prev, cur, 10) - println(ls.size) - - ls.foreach { x => - println(x) + processed.orderBy("from").as[WalLogAgg].collect().zip(aggExpected).foreach { case (real, expected) => + real shouldBe expected } } - test("addToTopK test.") { - import WalLogUDAF._ - val numOfTest = 100 - val numOfNums = 100 - val maxNum = 10 - - (0 until numOfTest).foreach { testNum => - val maxSize = 1 + Random.nextInt(numOfNums) - val pq = new BoundedPriorityQueue[Int](maxSize) - val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum)) - var result: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int] - - arr.foreach { i => - pq += i - result = addToTopK(result, maxSize, i) - } - result.sorted shouldBe pq.toSeq.sorted - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/651ca2cf/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAFTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAFTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAFTest.scala new file mode 100644 index 0000000..aded56d --- /dev/null +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAFTest.scala @@ -0,0 +1,42 @@ +package org.apache.s2graph.s2jobs.wal.udafs + +import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue +import org.scalatest._ + +import scala.collection.mutable +import scala.util.Random + +class WalLogUDAFTest extends FunSuite with Matchers { + + test("mergeTwoSeq") { + val prev: Array[Int] = Array(3, 2, 1) + val cur: Array[Int] = Array(4, 2, 2) + + val ls = WalLogUDAF.mergeTwoSeq(prev, cur, 10) + println(ls.size) + + ls.foreach { x => + println(x) + } + } + + test("addToTopK test.") { + import WalLogUDAF._ + val numOfTest = 100 + val numOfNums = 100 + val maxNum = 10 + + (0 until numOfTest).foreach { testNum => + val maxSize = 1 + Random.nextInt(numOfNums) + val pq = new BoundedPriorityQueue[Int](maxSize) + val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum)) + var result: mutable.Seq[Int] = 
mutable.ArrayBuffer.empty[Int] + + arr.foreach { i => + pq += i + result = addToTopK(result, maxSize, i) + } + result.sorted shouldBe pq.toSeq.sorted + } + } +}
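
For context (not part of the commit): below is a minimal sketch of how the three processes introduced here might be chained through TaskConf. The class names, constructors, and option keys follow the diff above; the task names, input keys, and concrete option values are illustrative assumptions only.

import org.apache.s2graph.s2jobs.task.TaskConf
import org.apache.s2graph.s2jobs.wal.process.{BuildTopFeaturesProcess, FilterTopFeaturesProcess, WalLogAggregateProcess}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumes `spark: SparkSession` and `walLogDf: DataFrame` (rows of WalLog) are already available.
def runWalFeaturePipeline(spark: SparkSession, walLogDf: DataFrame): DataFrame = {
  // 1. Aggregate raw wal logs per user (groupByKeys defaults to "from").
  val aggConf = TaskConf(name = "agg", `type` = "agg", inputs = Seq("walLogs"),
    options = Map("maxNumOfEdges" -> "1000"))
  val aggregated = new WalLogAggregateProcess(aggConf).execute(spark, Map("walLogs" -> walLogDf))

  // 2. Build the top-feature dictionary (dim, value, count, rank) from the aggregated logs.
  val featConf = TaskConf(name = "features", `type` = "features", inputs = Seq("agg"),
    options = Map("minUserCount" -> "0"))
  val featureDict = BuildTopFeaturesProcess(featConf).execute(spark, Map("agg" -> aggregated))

  // 3. Keep only props whose (dim, value) survives the per-dim rank cut.
  val filterConf = TaskConf(name = "filter", `type` = "filter", inputs = Seq("agg", "features"),
    options = Map(
      "walLogAgg" -> "agg",
      "featureDict" -> "features",
      "maxRankPerDim" -> """{"click:item_name": 10}""", // illustrative per-dim cut-off
      "defaultMaxRank" -> "1000", // dims not listed above fall back to this rank
      // Optional: custom transformers; defaults to DefaultTransformer when omitted.
      "transformClasses" -> """["org.apache.s2graph.s2jobs.wal.transformer.DefaultTransformer"]"""))
  new FilterTopFeaturesProcess(filterConf).execute(spark, Map("agg" -> aggregated, "features" -> featureDict))
}

Custom transformers such as ExtractDomain or ExtractServiceName can be injected the same way via transformClasses, since they now take a TaskConf and read their settings (urlDimensions, serviceDims, domainServiceMap, ...) from its options.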