clean up WalLogAggregateProcess.
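
This moves AggregateParam out of WalLogAggregateProcess.scala into the new wal.process.params package (next to the new FeatureIndexParam), renames the udafs object S2EdgeDataAggregate to WalLogUDAF, renames the "parallelism" option of WalLogAggregateProcess to "numOfPartitions", and adds FeatureIndexProcess together with the wal.transformer package (Transformer, DefaultTransformer, ExtractDomain, ExtractServiceName), WalLogUDF, UrlUtils and a DimVal case class.

As a rough sketch of how the cleaned-up process is configured (mirroring WalLogAggregateProcessTest below; the option values and the "edges" input name are illustrative, not prescribed by the commit):

    import org.apache.s2graph.s2jobs.task.TaskConf
    import org.apache.s2graph.s2jobs.wal.process.WalLogAggregateProcess

    val taskConf = new TaskConf(
      name = "agg", `type` = "agg", inputs = Seq("edges"),
      options = Map(
        "maxNumOfEdges"   -> "10",    // top-K edges kept per group (defaults to 1000)
        "arrayType"       -> "false", // defaults to false
        "sortTopItems"    -> "false", // defaults to false
        "numOfPartitions" -> "16"     // was "parallelism" before this commit; sets spark.sql.shuffle.partitions
      )
    )
    val job = new WalLogAggregateProcess(taskConf)
    // val aggregated = job.execute(spark, Map("edges" -> edgesDF))  // spark: SparkSession, edgesDF: DataFrame of WalLog rows
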
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/5122e98a Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/5122e98a Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/5122e98a Branch: refs/heads/master Commit: 5122e98adfe89d60d50e8c6468ebd868c2951b88 Parents: a2e10fb Author: DO YUNG YOON <steams...@apache.org> Authored: Mon Aug 13 12:43:32 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Mon Aug 13 17:59:40 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/s2jobs/wal/WalLog.scala | 4 +- .../wal/process/FeatureIndexProcess.scala | 146 ++++++++++ .../wal/process/WalLogAggregateProcess.scala | 25 +- .../wal/process/params/AggregateParam.scala | 22 ++ .../wal/process/params/FeatureIndexParam.scala | 20 ++ .../wal/transformer/DefaultTransformer.scala | 3 + .../s2jobs/wal/transformer/ExtractDomain.scala | 20 ++ .../wal/transformer/ExtractServiceName.scala | 17 ++ .../s2jobs/wal/transformer/Transformer.scala | 15 + .../s2jobs/wal/udafs/S2EdgeDataAggregate.scala | 290 ------------------- .../s2graph/s2jobs/wal/udafs/WalLogUDAF.scala | 290 +++++++++++++++++++ .../s2graph/s2jobs/wal/udfs/WalLogUDF.scala | 216 ++++++++++++++ .../s2graph/s2jobs/wal/utils/UrlUtils.scala | 57 ++++ .../s2graph/s2jobs/wal/TransformerTest.scala | 26 ++ .../process/WalLogAggregateProcessTest.scala | 9 +- 15 files changed, 842 insertions(+), 318 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala index dc628ec..f4a670b 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala @@ -1,7 +1,7 @@ package org.apache.s2graph.s2jobs.wal import org.apache.s2graph.core.JSONParser -import org.apache.s2graph.s2jobs.wal.process.AggregateParam +import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue import org.apache.spark.sql.Row import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} @@ -36,6 +36,8 @@ object WalLogAgg { } } +case class DimVal(dim: String, value: String) + case class WalLogAgg(from: String, logs: Seq[WalLog], maxTs: Long, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala new file mode 100644 index 0000000..7d04334 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala @@ -0,0 +1,146 @@ +package org.apache.s2graph.s2jobs.wal.process + +import com.google.common.hash.Hashing +import org.apache.s2graph.s2jobs.task.TaskConf +import org.apache.s2graph.s2jobs.wal.process.params.FeatureIndexParam +import org.apache.s2graph.s2jobs.wal.transformer._ +import 
org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF +import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import play.api.libs.json.{JsObject, Json} + +import scala.collection.mutable + +object FeatureIndexProcess { + def extractDimValues(transformers: Seq[Transformer]) = { + udf((rows: Seq[Row]) => { + val logs = rows.map(WalLog.fromRow) + // TODO: this can be changed into Map to count how many times each dimVal exist in sequence of walLog + // then change this to mutable.Map.empty[DimVal, Int], then aggregate. + val distinctDimValues = mutable.Set.empty[DimVal] + + logs.foreach { walLog => + walLog.propsKeyValues.foreach { case (propsKey, propsValue) => + transformers.foreach { transformer => + transformer.toDimValLs(walLog, propsKey, propsValue).foreach(distinctDimValues += _) + } + } + } + + distinctDimValues.toSeq + }) + } + + def buildDictionary(ss: SparkSession, + allDimVals: DataFrame, + param: FeatureIndexParam, + dimValColumnName: String = "dimVal"): DataFrame = { + import ss.implicits._ + + val rawFeatures = allDimVals + .select(col(param._countColumnName), col(s"$dimValColumnName.dim").as("dim"), col(s"$dimValColumnName.value").as("value")) + .groupBy("dim", "value") + .agg(countDistinct(param._countColumnName).as("count")) + .filter(s"count > ${param._minUserCount}") + + val ds: Dataset[((String, Long), String)] = + rawFeatures.select("dim", "value", "count").as[(String, String, Long)] + .map { case (dim, value, uv) => + (dim, uv) -> value + } + + + implicit val ord = Ordering.Tuple2(Ordering.String, Ordering.Long.reverse) + + val rdd: RDD[(Long, (String, Long), String)] = WalLogUDF.appendRank(ds, param.numOfPartitions, param.samplePointsPerPartitionHint) + + rdd.toDF("rank", "dim_count", "value") + .withColumn("dim", col("dim_count._1")) + .withColumn("count", col("dim_count._2")) + .select("dim", "value", "count", "rank") + } + + def toFeatureHash(dim: String, value: String): Long = { + Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong() + } + + def collectDistinctFeatureHashes(ss: SparkSession, + filteredDict: DataFrame): Array[Long] = { + import ss.implicits._ + + val featureHashUDF = udf((dim: String, value: String) => toFeatureHash(dim, value)) + + filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), col("value"))) + .select("featureHash") + .distinct().as[Long].collect() + } + + def filterTopKsPerDim(dict: DataFrame, + maxRankPerDim: Broadcast[Map[String, Int]], + defaultMaxRank: Int): DataFrame = { + val filterUDF = udf((dim: String, rank: Long) => { + rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank) + }) + + dict.filter(filterUDF(col("dim"), col("rank"))) + } +} + +case class FeatureIndexProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) { + + import FeatureIndexProcess._ + + override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = { + val countColumnName = taskConf.options.getOrElse("countColumnName", "from") + val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt) + val samplePointsPerPartitionHint = taskConf.options.get("samplePointsPerPartitionHint").map(_.toInt) + val minUserCount = taskConf.options.get("minUserCount").map(_.toLong) + val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s => + val json = Json.parse(s).as[JsObject] + json.fieldSet.map { case (key, jsValue) => key -> 
jsValue.as[Int] }.toMap + } + val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt) + val dictPath = taskConf.options.get("dictPath") + + numOfPartitions.map { d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString) } + + // val maxRankPerDimBCast = ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty)) + + val param = FeatureIndexParam(minUserCount = minUserCount, countColumnName = Option(countColumnName), + numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = samplePointsPerPartitionHint, + maxRankPerDim = maxRankPerDim, defaultMaxRank = defaultMaxRank, dictPath = dictPath + ) + + val edges = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) => + prev.union(inputMap(cur)) + } + + //TODO: user expect to inject transformers that transform (WalLog, propertyKey, propertyValue) to Seq[DimVal]. + val transformers = Seq(DefaultTransformer()) + val dimValExtractUDF = extractDimValues(transformers) + val dimValColumnName = "dimVal" + + val rawFeatures = edges + .withColumn(dimValColumnName, explode(dimValExtractUDF(col("logs")))) + + val dict = buildDictionary(ss, rawFeatures, param, dimValColumnName) + + dict + //TODO: filter topKs per dim, then build valid dimValLs. + // then broadcast valid dimValLs to original dataframe, and filter out not valid dimVal. + + // dictPath.foreach { path => dict.write.mode(SaveMode.Overwrite).parquet(path) } + // + // val filteredDict = filterTopKsPerDim(dict, maxRankPerDimBCast, defaultMaxRank.getOrElse(Int.MaxValue)) + // val distinctFeatureHashes = collectDistinctFeatureHashes(ss, filteredDict) + // val distinctFeatureHashesBCast = ss.sparkContext.broadcast(distinctFeatureHashes) + + // filteredDict + } + + + override def mandatoryOptions: Set[String] = Set.empty +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala index 0c9829c..2b42e20 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala @@ -1,29 +1,10 @@ package org.apache.s2graph.s2jobs.wal.process import org.apache.s2graph.s2jobs.task.TaskConf +import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam import org.apache.s2graph.s2jobs.wal.{WalLog, WalLogAgg} import org.apache.spark.sql._ -object AggregateParam { - val defaultGroupByKeys = Seq("from") - val defaultTopK = 1000 - val defaultIsArrayType = false - val defaultShouldSortTopItems = true -} - -case class AggregateParam(groupByKeys: Option[Seq[String]], - topK: Option[Int], - isArrayType: Option[Boolean], - shouldSortTopItems: Option[Boolean]) { - - import AggregateParam._ - - val groupByColumns = groupByKeys.getOrElse(defaultGroupByKeys) - val heapSize = topK.getOrElse(defaultTopK) - val arrayType = isArrayType.getOrElse(defaultIsArrayType) - val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems) -} - object WalLogAggregateProcess { def aggregate(ss: SparkSession, dataset: Dataset[WalLogAgg], @@ -64,8 +45,8 @@ class WalLogAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jo val maxNumOfEdges = 
taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000) val arrayType = taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(false) val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(false) - - taskConf.options.get("parallelism").foreach(d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d)) + val numOfPartitions = taskConf.options.get("numOfPartitions") + numOfPartitions.foreach(d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d)) implicit val ord = WalLog.orderByTsAsc val walLogs = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala new file mode 100644 index 0000000..89c36f4 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala @@ -0,0 +1,22 @@ +package org.apache.s2graph.s2jobs.wal.process.params + + +object AggregateParam { + val defaultGroupByKeys = Seq("from") + val defaultTopK = 1000 + val defaultIsArrayType = false + val defaultShouldSortTopItems = true +} + +case class AggregateParam(groupByKeys: Option[Seq[String]], + topK: Option[Int], + isArrayType: Option[Boolean], + shouldSortTopItems: Option[Boolean]) { + + import AggregateParam._ + + val groupByColumns = groupByKeys.getOrElse(defaultGroupByKeys) + val heapSize = topK.getOrElse(defaultTopK) + val arrayType = isArrayType.getOrElse(defaultIsArrayType) + val sortTopItems = shouldSortTopItems.getOrElse(defaultShouldSortTopItems) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala new file mode 100644 index 0000000..ddf7037 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala @@ -0,0 +1,20 @@ +package org.apache.s2graph.s2jobs.wal.process.params + +object FeatureIndexParam { + val defaultMinUserCount = 0L + val defaultCountColumnName = "from" +} + +case class FeatureIndexParam(minUserCount: Option[Long], + countColumnName: Option[String], + samplePointsPerPartitionHint: Option[Int], + numOfPartitions: Option[Int], + maxRankPerDim: Option[Map[String, Int]], + defaultMaxRank: Option[Int], + dictPath: Option[String]) { + import FeatureIndexParam._ + val _countColumnName = countColumnName.getOrElse(defaultCountColumnName) + val _minUserCount = minUserCount.getOrElse(defaultMinUserCount) + val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty) + val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala new file mode 100644 index 0000000..2839d6a --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/DefaultTransformer.scala @@ -0,0 +1,3 @@ +package org.apache.s2graph.s2jobs.wal.transformer + +case class DefaultTransformer() extends Transformer http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala new file mode 100644 index 0000000..9b09d71 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractDomain.scala @@ -0,0 +1,20 @@ +package org.apache.s2graph.s2jobs.wal.transformer + +import org.apache.s2graph.s2jobs.wal.utils.UrlUtils +import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog} + +case class ExtractDomain(urlDimensions: Set[String], + hostDimName: String = "host", + domainDimName: String = "domain", + keywordDimName: String = "uri_keywords") extends Transformer { + override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = { + if (!urlDimensions(propertyKey)) Nil + else { + val (_, domains, kwdOpt) = UrlUtils.extract(propertyValue) + + domains.headOption.toSeq.map(DimVal(hostDimName, _)) ++ + domains.map(DimVal(domainDimName, _)) ++ + kwdOpt.toSeq.map(DimVal(keywordDimName, _)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala new file mode 100644 index 0000000..7ad7dfd --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/ExtractServiceName.scala @@ -0,0 +1,17 @@ +package org.apache.s2graph.s2jobs.wal.transformer + +import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog} + +class ExtractServiceName(serviceDims: Set[String], + domainServiceMap: Map[String, String] = Map.empty, + serviceDimName: String = "serviceName") extends Transformer { + override def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = { + if (!serviceDims(propertyKey)) Nil + else { + val serviceName = domainServiceMap.getOrElse(propertyValue, propertyValue) + + Seq(DimVal(serviceDimName, serviceName)) + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala new file mode 100644 index 0000000..523f58d --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/transformer/Transformer.scala @@ -0,0 +1,15 @@ +package org.apache.s2graph.s2jobs.wal.transformer + +import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog} + +/** + * decide how to transform walLog's each property key 
value to Seq[DimVal] + */ +trait Transformer { + def toDimValLs(walLog: WalLog, propertyKey: String, propertyValue: String): Seq[DimVal] = { + val dim = s"${walLog.label}:${propertyKey}" + val value = propertyValue + + Seq(DimVal(dim, value)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala deleted file mode 100644 index dabfd99..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala +++ /dev/null @@ -1,290 +0,0 @@ -package org.apache.s2graph.s2jobs.wal.udafs - -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.GenericRow -import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} -import org.apache.spark.sql.types._ - -import scala.annotation.tailrec -import scala.collection.mutable - -object S2EdgeDataAggregate { - type Element = (Long, String, String, String) - - val emptyRow = new GenericRow(Array(-1L, "empty", "empty", "empty")) - - val elementOrd = Ordering.by[Element, Long](_._1) - - val rowOrdering = new Ordering[Row] { - override def compare(x: Row, y: Row): Int = { - x.getAs[Long](0).compareTo(y.getAs[Long](0)) - } - } - - val rowOrderingDesc = new Ordering[Row] { - override def compare(x: Row, y: Row): Int = { - -x.getAs[Long](0).compareTo(y.getAs[Long](0)) - } - } - - val fields = Seq( - StructField(name = "timestamp", LongType), - StructField(name = "to", StringType), - StructField(name = "label", StringType), - StructField(name = "props", StringType) - ) - - val arrayType = ArrayType(elementType = StructType(fields)) - - def apply(maxNumOfEdges: Int = 1000): GroupByAggOptimized = { - new GroupByAggOptimized(maxNumOfEdges) - } - - def swap[T](array: mutable.Seq[T], i: Int, j: Int) = { - val tmp = array(i) - array(i) = array(j) - array(j) = tmp - } - - @tailrec - def percolateDown[T](array: mutable.Seq[T], idx: Int)(implicit ordering: Ordering[T]): Unit = { - val left = 2 * idx + 1 - val right = 2 * idx + 2 - var smallest = idx - - if (left < array.size && ordering.compare(array(left), array(smallest)) < 0) { - smallest = left - } - - if (right < array.size && ordering.compare(array(right), array(smallest)) < 0) { - smallest = right - } - - if (smallest != idx) { - swap(array, idx, smallest) - percolateDown(array, smallest) - } - } - - def percolateUp[T](array: mutable.Seq[T], - idx: Int)(implicit ordering: Ordering[T]): Unit = { - var pos = idx - var parent = (pos - 1) / 2 - while (parent >= 0 && ordering.compare(array(pos), array(parent)) < 0) { - // swap pos and parent, since a[parent] > array[pos] - swap(array, parent, pos) - pos = parent - parent = (pos - 1) / 2 - } - } - - def addToTopK[T](array: mutable.Seq[T], - size: Int, - newData: T)(implicit ordering: Ordering[T]): mutable.Seq[T] = { - // use array as minHeap to keep track of topK. - // parent = (i -1) / 2 - // left child = 2 * i + 1 - // right chiud = 2 * i + 2 - - // check if array is already full. - if (array.size >= size) { - // compare newData to min. 
newData < array(0) - val currentMin = array(0) - if (ordering.compare(newData, currentMin) < 0) { - // drop newData - } else { - // delete min - array(0) = newData - // percolate down - percolateDown(array, 0) - } - array - } else { - // append new element into seqeunce since there are room left. - val newArray = array :+ newData - val idx = newArray.size - 1 - // percolate up last element - percolateUp(newArray, idx) - newArray - } - } - - def mergeTwoSeq[T](prev: Seq[T], cur: Seq[T], size: Int)(implicit ordering: Ordering[T]): Seq[T] = { - import scala.collection.mutable - val (n, m) = (cur.size, prev.size) - - var (i, j) = (0, 0) - var idx = 0 - val arr = new mutable.ArrayBuffer[T](size) - - while (idx < size && i < n && j < m) { - if (ordering.compare(cur(i), prev(j)) < 0) { - arr += cur(i) - i += 1 - } else { - arr += prev(j) - j += 1 - } - idx += 1 - } - while (idx < size && i < n) { - arr += cur(i) - i += 1 - } - while (idx < size && j < m) { - arr += prev(j) - j += 1 - } - - arr - } -} - -class GroupByAggOptimized(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction { - - import S2EdgeDataAggregate._ - - implicit val ord = rowOrdering - - val arrayType = ArrayType(elementType = StructType(fields)) - - type ROWS = mutable.Seq[Row] - - override def inputSchema: StructType = StructType(fields) - - override def bufferSchema: StructType = StructType(Seq( - StructField(name = "edges", dataType = arrayType) - )) - - override def dataType: DataType = arrayType - - override def deterministic: Boolean = true - - override def initialize(buffer: MutableAggregationBuffer): Unit = { - buffer.update(0, mutable.ArrayBuffer.empty[Row]) - } - - override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { - val prev = buffer.getAs[ROWS](0) - - val updated = addToTopK(prev, maxNumOfEdges, input) - - buffer.update(0, updated) - } - - override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { - var prev = buffer1.getAs[ROWS](0) - val cur = buffer2.getAs[ROWS](0) - - cur.filter(_ != null).foreach { row => - prev = addToTopK(prev, maxNumOfEdges, row) - } - - buffer1.update(0, prev) - } - - override def evaluate(buffer: Row): Any = { - val ls = buffer.getAs[ROWS](0) - takeTopK(ls, maxNumOfEdges) - } - - private def takeTopK(ls: Seq[Row], k: Int) = { - val sorted = ls.sorted - if (sorted.size <= k) sorted else sorted.take(k) - } -} - -class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction { - import S2EdgeDataAggregate._ - - implicit val ord = rowOrderingDesc - - val arrayType = ArrayType(elementType = StructType(fields)) - - override def inputSchema: StructType = StructType(fields) - - override def bufferSchema: StructType = StructType(Seq( - StructField(name = "edges", dataType = arrayType), - StructField(name = "buffered", dataType = BooleanType) - )) - - override def dataType: DataType = arrayType - - override def deterministic: Boolean = true - - override def initialize(buffer: MutableAggregationBuffer): Unit = { - buffer.update(0, scala.collection.mutable.ListBuffer.empty[Element]) - } - - /* not optimized */ - override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { - val element = input - - val prev = buffer.getAs[Seq[Row]](0) - val appended = prev :+ element - - buffer.update(0, appended) - buffer.update(1, false) - } - - private def takeTopK(ls: Seq[Row], k: Int) = { - val sorted = ls.sorted - if (sorted.size <= k) sorted else sorted.take(k) - } - /* not optimized */ - override def merge(buffer1: 
MutableAggregationBuffer, buffer2: Row): Unit = { - val cur = buffer2.getAs[Seq[Row]](0) - val prev = buffer1.getAs[Seq[Row]](0) - - buffer1.update(0, takeTopK(prev ++ cur, maxNumOfEdges)) - buffer1.update(1, true) - } - - override def evaluate(buffer: Row): Any = { - val ls = buffer.getAs[Seq[Row]](0) - val buffered = buffer.getAs[Boolean](1) - if (buffered) ls - else takeTopK(ls, maxNumOfEdges) - } -} - -class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction { - import S2EdgeDataAggregate._ - - implicit val ord = rowOrdering - - import scala.collection.mutable - - override def inputSchema: StructType = StructType(Seq( - StructField(name = "edges", dataType = arrayType) - )) - - override def bufferSchema: StructType = StructType(Seq( - StructField(name = "edges", dataType = arrayType) - )) - - override def dataType: DataType = arrayType - - override def deterministic: Boolean = true - - override def initialize(buffer: MutableAggregationBuffer): Unit = - buffer.update(0, mutable.ListBuffer.empty[Row]) - - override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { - val cur = input.getAs[Seq[Row]](0) - val prev = buffer.getAs[Seq[Row]](0) - val merged = mergeTwoSeq(cur, prev, maxNumOfEdges) - - buffer.update(0, merged) - } - - override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { - val cur = buffer2.getAs[Seq[Row]](0) - val prev = buffer1.getAs[Seq[Row]](0) - - val merged = mergeTwoSeq(cur, prev, maxNumOfEdges) - buffer1.update(0, merged) - } - - override def evaluate(buffer: Row): Any = buffer.getAs[Seq[Row]](0) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala new file mode 100644 index 0000000..81a1356 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/WalLogUDAF.scala @@ -0,0 +1,290 @@ +package org.apache.s2graph.s2jobs.wal.udafs + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.GenericRow +import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} +import org.apache.spark.sql.types._ + +import scala.annotation.tailrec +import scala.collection.mutable + +object WalLogUDAF { + type Element = (Long, String, String, String) + + val emptyRow = new GenericRow(Array(-1L, "empty", "empty", "empty")) + + val elementOrd = Ordering.by[Element, Long](_._1) + + val rowOrdering = new Ordering[Row] { + override def compare(x: Row, y: Row): Int = { + x.getAs[Long](0).compareTo(y.getAs[Long](0)) + } + } + + val rowOrderingDesc = new Ordering[Row] { + override def compare(x: Row, y: Row): Int = { + -x.getAs[Long](0).compareTo(y.getAs[Long](0)) + } + } + + val fields = Seq( + StructField(name = "timestamp", LongType), + StructField(name = "to", StringType), + StructField(name = "label", StringType), + StructField(name = "props", StringType) + ) + + val arrayType = ArrayType(elementType = StructType(fields)) + + def apply(maxNumOfEdges: Int = 1000): GroupByAggOptimized = { + new GroupByAggOptimized(maxNumOfEdges) + } + + def swap[T](array: mutable.Seq[T], i: Int, j: Int) = { + val tmp = array(i) + array(i) = array(j) + array(j) = tmp + } + + @tailrec + def percolateDown[T](array: mutable.Seq[T], idx: 
Int)(implicit ordering: Ordering[T]): Unit = { + val left = 2 * idx + 1 + val right = 2 * idx + 2 + var smallest = idx + + if (left < array.size && ordering.compare(array(left), array(smallest)) < 0) { + smallest = left + } + + if (right < array.size && ordering.compare(array(right), array(smallest)) < 0) { + smallest = right + } + + if (smallest != idx) { + swap(array, idx, smallest) + percolateDown(array, smallest) + } + } + + def percolateUp[T](array: mutable.Seq[T], + idx: Int)(implicit ordering: Ordering[T]): Unit = { + var pos = idx + var parent = (pos - 1) / 2 + while (parent >= 0 && ordering.compare(array(pos), array(parent)) < 0) { + // swap pos and parent, since a[parent] > array[pos] + swap(array, parent, pos) + pos = parent + parent = (pos - 1) / 2 + } + } + + def addToTopK[T](array: mutable.Seq[T], + size: Int, + newData: T)(implicit ordering: Ordering[T]): mutable.Seq[T] = { + // use array as minHeap to keep track of topK. + // parent = (i -1) / 2 + // left child = 2 * i + 1 + // right chiud = 2 * i + 2 + + // check if array is already full. + if (array.size >= size) { + // compare newData to min. newData < array(0) + val currentMin = array(0) + if (ordering.compare(newData, currentMin) < 0) { + // drop newData + } else { + // delete min + array(0) = newData + // percolate down + percolateDown(array, 0) + } + array + } else { + // append new element into seqeunce since there are room left. + val newArray = array :+ newData + val idx = newArray.size - 1 + // percolate up last element + percolateUp(newArray, idx) + newArray + } + } + + def mergeTwoSeq[T](prev: Seq[T], cur: Seq[T], size: Int)(implicit ordering: Ordering[T]): Seq[T] = { + import scala.collection.mutable + val (n, m) = (cur.size, prev.size) + + var (i, j) = (0, 0) + var idx = 0 + val arr = new mutable.ArrayBuffer[T](size) + + while (idx < size && i < n && j < m) { + if (ordering.compare(cur(i), prev(j)) < 0) { + arr += cur(i) + i += 1 + } else { + arr += prev(j) + j += 1 + } + idx += 1 + } + while (idx < size && i < n) { + arr += cur(i) + i += 1 + } + while (idx < size && j < m) { + arr += prev(j) + j += 1 + } + + arr + } +} + +class GroupByAggOptimized(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction { + + import WalLogUDAF._ + + implicit val ord = rowOrdering + + val arrayType = ArrayType(elementType = StructType(fields)) + + type ROWS = mutable.Seq[Row] + + override def inputSchema: StructType = StructType(fields) + + override def bufferSchema: StructType = StructType(Seq( + StructField(name = "edges", dataType = arrayType) + )) + + override def dataType: DataType = arrayType + + override def deterministic: Boolean = true + + override def initialize(buffer: MutableAggregationBuffer): Unit = { + buffer.update(0, mutable.ArrayBuffer.empty[Row]) + } + + override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { + val prev = buffer.getAs[ROWS](0) + + val updated = addToTopK(prev, maxNumOfEdges, input) + + buffer.update(0, updated) + } + + override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { + var prev = buffer1.getAs[ROWS](0) + val cur = buffer2.getAs[ROWS](0) + + cur.filter(_ != null).foreach { row => + prev = addToTopK(prev, maxNumOfEdges, row) + } + + buffer1.update(0, prev) + } + + override def evaluate(buffer: Row): Any = { + val ls = buffer.getAs[ROWS](0) + takeTopK(ls, maxNumOfEdges) + } + + private def takeTopK(ls: Seq[Row], k: Int) = { + val sorted = ls.sorted + if (sorted.size <= k) sorted else sorted.take(k) + } +} + +class 
GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction { + import WalLogUDAF._ + + implicit val ord = rowOrderingDesc + + val arrayType = ArrayType(elementType = StructType(fields)) + + override def inputSchema: StructType = StructType(fields) + + override def bufferSchema: StructType = StructType(Seq( + StructField(name = "edges", dataType = arrayType), + StructField(name = "buffered", dataType = BooleanType) + )) + + override def dataType: DataType = arrayType + + override def deterministic: Boolean = true + + override def initialize(buffer: MutableAggregationBuffer): Unit = { + buffer.update(0, scala.collection.mutable.ListBuffer.empty[Element]) + } + + /* not optimized */ + override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { + val element = input + + val prev = buffer.getAs[Seq[Row]](0) + val appended = prev :+ element + + buffer.update(0, appended) + buffer.update(1, false) + } + + private def takeTopK(ls: Seq[Row], k: Int) = { + val sorted = ls.sorted + if (sorted.size <= k) sorted else sorted.take(k) + } + /* not optimized */ + override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { + val cur = buffer2.getAs[Seq[Row]](0) + val prev = buffer1.getAs[Seq[Row]](0) + + buffer1.update(0, takeTopK(prev ++ cur, maxNumOfEdges)) + buffer1.update(1, true) + } + + override def evaluate(buffer: Row): Any = { + val ls = buffer.getAs[Seq[Row]](0) + val buffered = buffer.getAs[Boolean](1) + if (buffered) ls + else takeTopK(ls, maxNumOfEdges) + } +} + +class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction { + import WalLogUDAF._ + + implicit val ord = rowOrdering + + import scala.collection.mutable + + override def inputSchema: StructType = StructType(Seq( + StructField(name = "edges", dataType = arrayType) + )) + + override def bufferSchema: StructType = StructType(Seq( + StructField(name = "edges", dataType = arrayType) + )) + + override def dataType: DataType = arrayType + + override def deterministic: Boolean = true + + override def initialize(buffer: MutableAggregationBuffer): Unit = + buffer.update(0, mutable.ListBuffer.empty[Row]) + + override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { + val cur = input.getAs[Seq[Row]](0) + val prev = buffer.getAs[Seq[Row]](0) + val merged = mergeTwoSeq(cur, prev, maxNumOfEdges) + + buffer.update(0, merged) + } + + override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { + val cur = buffer2.getAs[Seq[Row]](0) + val prev = buffer1.getAs[Seq[Row]](0) + + val merged = mergeTwoSeq(cur, prev, maxNumOfEdges) + buffer1.update(0, merged) + } + + override def evaluate(buffer: Row): Any = buffer.getAs[Seq[Row]](0) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala new file mode 100644 index 0000000..34f4a2b --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udfs/WalLogUDF.scala @@ -0,0 +1,216 @@ +package org.apache.s2graph.s2jobs.wal.udfs + +import com.google.common.hash.Hashing +import org.apache.s2graph.core.JSONParser +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.{Dataset, Row} +import play.api.libs.json._ + +import 
scala.reflect.ClassTag + +object WalLogUDF { + + import scala.collection.mutable + + type MergedProps = Map[String, Seq[String]] + type MutableMergedProps = mutable.Map[String, mutable.Map[String, Int]] + type MutableMergedPropsInner = mutable.Map[String, Int] + + def initMutableMergedPropsInner = mutable.Map.empty[String, Int] + + def initMutableMergedProps = mutable.Map.empty[String, mutable.Map[String, Int]] + +// //TODO: +// def toDimension(rawActivity: RawActivity, propertyKey: String): String = { +// // val (ts, dst, label, _) = rawActivity +// // label + "." + propertyKey +// propertyKey +// } +// +// def updateMutableMergedProps(mutableMergedProps: MutableMergedProps)(dimension: String, +// dimensionValue: String, +// count: Int = 1): Unit = { +// val buffer = mutableMergedProps.getOrElseUpdate(dimension, initMutableMergedPropsInner) +// val newCount = buffer.getOrElse(dimensionValue, 0) + count +// buffer += (dimensionValue -> newCount) +// } +// +// def groupByDimensionValues(rawActivity: RawActivity, +// propsJson: JsObject, +// mergedProps: MutableMergedProps, +// toDimensionFunc: (RawActivity, String) => String, +// excludePropKeys: Set[String] = Set.empty): Unit = { +// propsJson.fields.filter(t => !excludePropKeys(t._1)).foreach { case (propertyKey, jsValue) => +// val values = jsValue match { +// case JsString(s) => Seq(s) +// case JsArray(arr) => arr.map(JSONParser.jsValueToString) +// case _ => Seq(jsValue.toString()) +// } +// val dimension = toDimensionFunc(rawActivity, propertyKey) +// +// values.foreach { value => +// updateMutableMergedProps(mergedProps)(dimension, value) +// } +// } +// } +// +// def buildMergedProps(rawActivities: Seq[RawActivity], +// toDimensionFunc: (RawActivity, String) => String, +// defaultTopKs: Int = 100, +// dimTopKs: Map[String, Int] = Map.empty, +// excludePropKeys: Set[String] = Set.empty, +// dimValExtractors: Seq[Extractor] = Nil): MergedProps = { +// val mergedProps = initMutableMergedProps +// +// rawActivities.foreach { case rawActivity@(_, _, _, rawProps) => +// val propsJson = Json.parse(rawProps).as[JsObject] +// groupByDimensionValues(rawActivity, propsJson, mergedProps, toDimensionFunc, excludePropKeys) +// } +// // work on extra dimVals. 
+// dimValExtractors.foreach { extractor => +// extractor.extract(rawActivities, mergedProps) +// } +// +// mergedProps.map { case (key, values) => +// val topK = dimTopKs.getOrElse(key, defaultTopKs) +// +// key -> values.toSeq.sortBy(-_._2).take(topK).map(_._1) +// }.toMap +// } +// +// def rowToRawActivity(row: Row): RawActivity = { +// (row.getAs[Long](0), row.getAs[String](1), row.getAs[String](2), row.getAs[String](3)) +// } +// +// def appendMergeProps(toDimensionFunc: (RawActivity, String) => String = toDimension, +// defaultTopKs: Int = 100, +// dimTopKs: Map[String, Int] = Map.empty, +// excludePropKeys: Set[String] = Set.empty, +// dimValExtractors: Seq[Extractor] = Nil, +// minTs: Long = 0, +// maxTs: Long = Long.MaxValue) = udf((acts: Seq[Row]) => { +// val rows = acts.map(rowToRawActivity).filter(act => act._1 >= minTs && act._1 < maxTs) +// +// buildMergedProps(rows, toDimensionFunc, defaultTopKs, dimTopKs, excludePropKeys, dimValExtractors) +// }) + + val extractDimensionValues = { + udf((dimensionValues: Map[String, Seq[String]]) => { + dimensionValues.toSeq.flatMap { case (dimension, values) => + values.map { value => dimension -> value } + } + }) + } + + def toHash(dimension: String, dimensionValue: String): Long = { + val key = s"$dimension.$dimensionValue" + Hashing.murmur3_128().hashBytes(key.toString.getBytes("UTF-8")).asLong() + } + + def filterDimensionValues(validDimValues: Broadcast[Set[Long]]) = { + udf((dimensionValues: Map[String, Seq[String]]) => { + dimensionValues.map { case (dimension, values) => + val filtered = values.filter { value => + val hash = toHash(dimension, value) + + validDimValues.value(hash) + } + + dimension -> filtered + } + }) + } + + def appendRank[K1: ClassTag, K2: ClassTag, V: ClassTag](ds: Dataset[((K1, K2), V)], + numOfPartitions: Option[Int] = None, + samplePointsPerPartitionHint: Option[Int] = None)(implicit ordering: Ordering[(K1, K2)]) = { + import org.apache.spark.RangePartitioner + val rdd = ds.rdd + + val partitioner = new RangePartitioner(numOfPartitions.getOrElse(rdd.partitions.size), + rdd, + true, + samplePointsPerPartitionHint = samplePointsPerPartitionHint.getOrElse(20) + ) + + val sorted = rdd.repartitionAndSortWithinPartitions(partitioner) + + def rank(idx: Int, iter: Iterator[((K1, K2), V)]) = { + val initialK1: K1 = null.asInstanceOf[K1] + val initialK2: K2 = null.asInstanceOf[K2] + val initialV: V = null.asInstanceOf[V] + val zero = List((1L, initialK1, initialK2, initialV)) + + def merge(acc: List[(Long, K1, K2, V)], x: ((K1, K2), V)) = + (acc.head, x) match { + case ((offset, prevKey1, _, _), ((curKey1: K1, curKey2: K2), curVal: V)) => { + val newOffset = if (prevKey1 == curKey1) offset + 1L else 1L + (newOffset, curKey1, curKey2, curVal) :: acc + } + } + + iter.foldLeft(zero)(merge).reverse.drop(1).map { case (offset, key1, key2, value) => + (idx, offset, key1, key2, value) + }.toIterator + } + + def getOffset(idx: Int, iter: Iterator[((K1, K2), V)]) = { + val buffer = mutable.Map.empty[K1, (Int, Long)] + if (!iter.hasNext) buffer.toIterator + else { + val ((k1, k2), v) = iter.next() + var prevKey1: K1 = k1 + var size = 1L + iter.foreach { case ((k1, k2), v) => + if (prevKey1 != k1) { + buffer += prevKey1 -> (idx, size) + prevKey1 = k1 + size = 0L + } + size += 1L + } + if (size > 0) buffer += prevKey1 -> (idx, size) + buffer.iterator + } + } + + val partRanks = sorted.mapPartitionsWithIndex(rank) + val _offsets = sorted.mapPartitionsWithIndex(getOffset) + val offsets = _offsets.groupBy(_._1).flatMap { case (k1, 
partitionWithSize) => + val ls = partitionWithSize.toSeq.map(_._2).sortBy(_._1) + var sum = ls.head._2 + val lss = ls.tail.map { case (partition, size) => + val x = (partition, sum) + sum += size + x + } + lss.map { case (partition, offset) => + (k1, partition) -> offset + } + }.collect() + + println(offsets) + + val offsetsBCast = ds.sparkSession.sparkContext.broadcast(offsets) + + def adjust(iter: Iterator[(Int, Long, K1, K2, V)], startOffsets: Map[(K1, Int), Long]) = { + iter.map { case (partition, rankInPartition, key1, key2, value) => + val startOffset = startOffsets.getOrElse((key1, partition), 0L) + val rank = startOffset + rankInPartition + + (partition, rankInPartition, rank, (key1, key2), value) + } + } + + val withRanks = partRanks + .mapPartitions { iter => + val startOffsets = offsetsBCast.value.toMap + adjust(iter, startOffsets) + }.map { case (_, _, rank, (key1, key2), value) => + (rank, (key1, key2), value) + } + + withRanks + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala new file mode 100644 index 0000000..2941357 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/utils/UrlUtils.scala @@ -0,0 +1,57 @@ +package org.apache.s2graph.s2jobs.wal.utils + +import java.net.{URI, URLDecoder} + +import scala.util.matching.Regex + +object UrlUtils { + val pattern = new Regex("""(\\x[0-9A-Fa-f]{2}){3}""") + val koreanPattern = new scala.util.matching.Regex("([ê°-í£]+[\\-_a-zA-Z 0-9]*)+|([\\-_a-zA-Z 0-9]+[ê°-í£]+)") + + + // url extraction functions + def urlDecode(url: String): (Boolean, String) = { + try { + val decoded = URLDecoder.decode(url, "UTF-8") + (url != decoded, decoded) + } catch { + case e: Exception => (false, url) + } + } + + def hex2String(url: String): String = { + pattern replaceAllIn(url, m => { + new String(m.toString.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte), "utf-8") + }) + } + + def toDomains(url: String, maxDepth: Int = 3): Seq[String] = { + val uri = new URI(url) + val domain = uri.getHost + if (domain == null) Nil + else { + val paths = uri.getPath.split("/") + if (paths.isEmpty) Seq(domain) + else { + val depth = Math.min(maxDepth, paths.size) + (1 to depth).map { ith => + domain + paths.take(ith).mkString("/") + } + } + } + } + + def extract(_url: String): (String, Seq[String], Option[String]) = { + try { + val url = hex2String(_url) + val (encoded, decodedUrl) = urlDecode(url) + + val kwdOpt = koreanPattern.findAllMatchIn(decodedUrl).toList.map(_.group(0)).headOption.map(_.replaceAll("\\s", "")) + val domains = toDomains(url.replaceAll(" ", "")) + (decodedUrl, domains, kwdOpt) + } catch { + case e: Exception => (_url, Nil, None) + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala new file mode 100644 index 0000000..5089390 --- /dev/null +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/TransformerTest.scala @@ -0,0 +1,26 
@@ +package org.apache.s2graph.s2jobs.wal + +import org.apache.s2graph.s2jobs.wal.transformer._ +import org.scalatest.{FunSuite, Matchers} + +class TransformerTest extends FunSuite with Matchers { + val walLog = WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1, "url": "www.google.com"}""") + + test("test default transformer") { + val transformer = new DefaultTransformer + val dimVals = transformer.toDimValLs(walLog, "name", "1") + + dimVals shouldBe Seq(DimVal("friends:name", "1")) + } + + test("test ExtractDomain from URL") { + val transformer = new ExtractDomain(urlDimensions = Set("url")) + val dimVals = transformer.toDimValLs(walLog, "url", "http://www.google.com/abc") + + dimVals shouldBe Seq( + DimVal("host", "www.google.com"), + DimVal("domain", "www.google.com"), + DimVal("domain", "www.google.com/abc") + ) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5122e98a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala index 5ae595b..5753411 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcessTest.scala @@ -31,9 +31,8 @@ class WalLogAggregateProcessTest extends FunSuite with Matchers with BeforeAndAf val edges = spark.createDataset(walLogsLs).toDF() val inputMap = Map("edges" -> edges) val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"), - options = Map("maxNumOfEdges" -> "10", - "runOrderBy" -> "false", - "groupByAggClassName" -> "GroupByAggOptimized")) + options = Map("maxNumOfEdges" -> "10") + ) val job = new WalLogAggregateProcess(taskConf = taskConf) val processed = job.execute(spark, inputMap) @@ -48,7 +47,7 @@ class WalLogAggregateProcessTest extends FunSuite with Matchers with BeforeAndAf val prev: Array[Int] = Array(3, 2, 1) val cur: Array[Int] = Array(4, 2, 2) - val ls = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10) + val ls = WalLogUDAF.mergeTwoSeq(prev, cur, 10) println(ls.size) ls.foreach { x => @@ -57,7 +56,7 @@ class WalLogAggregateProcessTest extends FunSuite with Matchers with BeforeAndAf } test("addToTopK test.") { - import S2EdgeDataAggregate._ + import WalLogUDAF._ val numOfTest = 100 val numOfNums = 100 val maxNum = 10
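----------------------------------------------------------------------

For reference, a minimal standalone sketch of the top-K helpers exercised by the tests above, assuming only the renamed WalLogUDAF object from this commit (the sample numbers are arbitrary). addToTopK maintains its buffer as a min-heap under the supplied Ordering, so feeding it a stream retains the `size` largest elements; inside GroupByAggOptimized it is applied with rowOrdering on the timestamp column, which appears to be how each group keeps its most recent maxNumOfEdges edges.

    import org.apache.s2graph.s2jobs.wal.udafs.WalLogUDAF
    import scala.collection.mutable

    // keep the 3 largest Ints from a stream; the smallest retained element sits at index 0
    var topK: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int]
    Seq(5, 1, 9, 3, 7, 2).foreach { n =>
      topK = WalLogUDAF.addToTopK(topK, 3, n)
    }
    println(topK.sorted)  // prints ArrayBuffer(5, 7, 9)

    // mergeTwoSeq merges two sequences that are already sorted under the ordering,
    // keeping at most `size` elements (ascending Int ordering here)
    println(WalLogUDAF.mergeTwoSeq(Seq(1, 2, 3), Seq(2, 2, 4), 10))  // prints ArrayBuffer(1, 2, 2, 2, 3, 4)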