Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 70a7c71a3 -> 4f1c4ceb6


Extract Where and EdgeTransformer to TraversalHelper.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/3355701f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/3355701f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/3355701f

Branch: refs/heads/master
Commit: 3355701f00b34d5e1609e8424d09b50e8211e87a
Parents: 32eb344
Author: DO YUNG YOON <steams...@apache.org>
Authored: Thu Jun 14 18:14:22 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Thu Jun 14 18:14:22 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/core/TraversalHelper.scala   | 42 +++++++++++++-
 .../core/fetcher/annoy/AnnoyModelFetcher.scala  | 59 ++------------------
 .../core/fetcher/fasttext/FastTextFetcher.scala |  8 ++-
 .../fetcher/tensorflow/InceptionFetcher.scala   |  8 ++-
 .../apache/s2graph/core/storage/StorageIO.scala | 48 +++++-----------
 5 files changed, 69 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3355701f/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index 0a4a49b..ba18e8d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -22,6 +22,7 @@ package org.apache.s2graph.core
 import java.util
 
 import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
+import org.apache.s2graph.core.parsers.WhereParser
 import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.{HBaseType, InnerVal, LabelWithDirection, 
VertexId}
 import org.apache.s2graph.core.utils.{Extensions, logger}
@@ -39,11 +40,11 @@ object TraversalHelper {
     else randomInt(sampleNumber, range, set + Random.nextInt(range))
   }
 
-  def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): 
Seq[EdgeWithScore] = {
+  def sample(edges: Seq[EdgeWithScore], offset: Int, n: Int): 
Seq[EdgeWithScore] = {
     if (edges.size <= n) {
       edges
     } else {
-      val plainEdges = if (queryRequest.queryParam.offset == 0) {
+      val plainEdges = if (offset == 0) {
         edges.tail
       } else edges
 
@@ -141,6 +142,41 @@ object TraversalHelper {
 
     (hashKey, filterHashKey)
   }
+
+  def edgeToEdgeWithScore(queryRequest: QueryRequest,
+                          edge: S2EdgeLike,
+                          parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] 
= {
+    val prevScore = queryRequest.prevStepScore
+    val queryParam = queryRequest.queryParam
+    val queryOption = queryRequest.query.queryOption
+    val nextStepOpt = queryRequest.nextStepOpt
+    val labelWeight = queryRequest.labelWeight
+    val where = queryParam.where.get
+    val isDefaultTransformer = queryParam.edgeTransformer.isDefault
+
+    if (where != WhereParser.success && !where.filter(edge)) Nil
+    else {
+      val edges = if (isDefaultTransformer) Seq(edge) else 
convertEdges(queryParam, edge, nextStepOpt)
+      edges.map { e =>
+        if (!queryOption.ignorePrevStepCache) {
+          EdgeWithScore(e, queryParam.rank.score(edge), queryParam.label)
+        } else {
+          val edgeScore = queryParam.rank.score(edge)
+          val score = queryParam.scorePropagateOp match {
+            case "plus" => edgeScore + prevScore
+            case "divide" =>
+              if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+              else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
+            case _ => edgeScore * prevScore
+          }
+          val tsVal = processTimeDecay(queryParam, edge)
+
+          EdgeWithScore(e.copyParentEdges(parentEdges), score = score * 
labelWeight * tsVal, label = queryParam.label)
+        }
+      }
+    }
+  }
+
 }
 
 
@@ -481,7 +517,7 @@ class TraversalHelper(graph: S2GraphLike) {
       val degreeScore = 0.0
 
       val sampled =
-        if (queryRequest.queryParam.sample >= 0) sample(queryRequest, 
edgeWithScores, queryRequest.queryParam.sample)
+        if (queryRequest.queryParam.sample >= 0) sample(edgeWithScores, 
queryParam.offset, queryParam.sample)
         else edgeWithScores
 
       val withScores = for {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3355701f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
index cdf3b71..6c00ca0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
@@ -29,74 +29,23 @@ import scala.concurrent.{ExecutionContext, Future}
 
 object AnnoyModelFetcher {
   val IndexFilePathKey = "annoyIndexFilePath"
-//  val DictFilePathKey = "annoyDictFilePath"
   val DimensionKey = "annoyIndexDimension"
   val IndexTypeKey = "annoyIndexType"
 
-  //  def loadDictFromLocal(file: File): Map[Int, String] = {
-  //    val files = if (file.isDirectory) {
-  //      file.listFiles()
-  //    } else {
-  //      Array(file)
-  //    }
-  //
-  //    files.flatMap { file =>
-  //      Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, 
_idx) =>
-  //        val tokens = line.stripMargin.split(",")
-  //        try {
-  //          val tpl = if (tokens.length < 2) {
-  //            (tokens.head.toInt, tokens.head)
-  //          } else {
-  //            (tokens.head.toInt, tokens.tail.head)
-  //          }
-  //          Seq(tpl)
-  //        } catch {
-  //          case e: Exception => Nil
-  //        }
-  //      }
-  //    }.toMap
-  //  }
-
   def buildAnnoy4s[T](indexPath: String)(implicit converter: KeyConverter[T]): 
Annoy[T] = {
     Annoy.load[T](indexPath)
   }
-
-  //  def buildIndex(indexPath: String,
-  //                 dictPath: String,
-  //                 dimension: Int,
-  //                 indexType: IndexType): ANNIndexWithDict = {
-  //    val dict = loadDictFromLocal(new File(dictPath))
-  //    val index = new ANNIndex(dimension, indexPath, indexType)
-  //
-  //    ANNIndexWithDict(index, dict)
-  //  }
-  //
-  //  def buildIndex(config: Config): ANNIndexWithDict = {
-  //    val indexPath = config.getString(IndexFilePathKey)
-  //    val dictPath = config.getString(DictFilePathKey)
-  //
-  //    val dimension = config.getInt(DimensionKey)
-  //    val indexType = Try { config.getString(IndexTypeKey) 
}.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
-  //
-  //    buildIndex(indexPath, dictPath, dimension, indexType)
-  //  }
 }
 
-//
-//case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) {
-//  val dictRev = dict.map(kv => kv._2 -> kv._1)
-//}
-
 class AnnoyModelFetcher(val graph: S2GraphLike) extends EdgeFetcher {
   import AnnoyModelFetcher._
+  import TraversalHelper._
 
   val builder = graph.elementBuilder
 
-  //  var model: ANNIndexWithDict = _
   var model: Annoy[String] = _
 
   override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
-
     model = AnnoyModelFetcher.buildAnnoy4s(config.getString(IndexFilePathKey))
   }
 
@@ -106,15 +55,17 @@ class AnnoyModelFetcher(val graph: S2GraphLike) extends EdgeFetcher {
     val stepResultLs = queryRequests.map { queryRequest =>
       val vertex = queryRequest.vertex
       val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryRequest.query.queryOption.returnTree || 
queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) 
prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
 
-      val edgeWithScores = model.query(vertex.innerId.toIdString(), 
queryParam.limit).getOrElse(Nil).map { case (tgtId, score) =>
+      val edgeWithScores = model.query(vertex.innerId.toIdString(), 
queryParam.limit).getOrElse(Nil).flatMap { case (tgtId, score) =>
         val tgtVertexId = builder.newVertexId(queryParam.label.service,
           queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), 
tgtId)
 
         val props: Map[String, Any] = if 
(queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else 
Map.empty
         val edge = graph.toEdge(vertex.innerId.value, 
tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = 
props)
 
-        EdgeWithScore(edge, score, queryParam.label)
+        edgeToEdgeWithScore(queryRequest, edge, parentEdges)
       }
 
       StepResult(edgeWithScores, Nil, Nil)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3355701f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
index 433b4dc..a07ed07 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
@@ -28,6 +28,7 @@ import scala.concurrent.{ExecutionContext, Future}
 
 
 class FastTextFetcher(val graph: S2GraphLike) extends EdgeFetcher {
+  import org.apache.s2graph.core.TraversalHelper._
   val builder = graph.elementBuilder
   var fastText: FastText = _
 
@@ -49,16 +50,19 @@ class FastTextFetcher(val graph: S2GraphLike) extends EdgeFetcher {
     val stepResultLs = queryRequests.map { queryRequest =>
       val vertex = queryRequest.vertex
       val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryRequest.query.queryOption.returnTree || 
queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) 
prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+
       val line = fastText.getLine(vertex.innerId.toIdString())
 
-      val edgeWithScores = fastText.predict(line, queryParam.limit).map { case 
(_label, score) =>
+      val edgeWithScores = fastText.predict(line, queryParam.limit).flatMap { 
case (_label, score) =>
         val tgtVertexId = builder.newVertexId(queryParam.label.service,
           queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), 
_label)
 
         val props: Map[String, Any] = if 
(queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else 
Map.empty
         val edge = graph.toEdge(vertex.innerId.value, 
tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = 
props)
 
-        EdgeWithScore(edge, score, queryParam.label)
+        edgeToEdgeWithScore(queryRequest, edge, parentEdges)
       }
 
       StepResult(edgeWithScores, Nil, Nil)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3355701f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
index aa05a31..83fab98 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala
@@ -63,6 +63,7 @@ class InceptionFetcher(graph: S2GraphLike) extends EdgeFetcher {
   import InceptionFetcher._
 
   import scala.collection.JavaConverters._
+  import org.apache.s2graph.core.TraversalHelper._
   val builder = graph.elementBuilder
 
   var graphDef: Array[Byte] = _
@@ -81,16 +82,19 @@ class InceptionFetcher(graph: S2GraphLike) extends EdgeFetcher {
     val stepResultLs = queryRequests.map { queryRequest =>
       val vertex = queryRequest.vertex
       val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryRequest.query.queryOption.returnTree || 
queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) 
prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+
       val urlText = vertex.innerId.toIdString()
 
-      val edgeWithScores = predict(graphDef, labels)(getImageBytes(urlText), 
queryParam.limit).map { case (label, score) =>
+      val edgeWithScores = predict(graphDef, labels)(getImageBytes(urlText), 
queryParam.limit).flatMap { case (label, score) =>
         val tgtVertexId = builder.newVertexId(queryParam.label.service,
           queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), 
label)
 
         val props: Map[String, Any] = if 
(queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else 
Map.empty
         val edge = graph.toEdge(vertex.innerId.value, 
tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = 
props)
 
-        EdgeWithScore(edge, score, queryParam.label)
+        edgeToEdgeWithScore(queryRequest, edge, parentEdges)
       }
 
       StepResult(edgeWithScores, Nil, Nil)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3355701f/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 2b02bdd..9abd80d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -27,6 +27,8 @@ import org.apache.s2graph.core.parsers.WhereParser
 import org.apache.s2graph.core.utils.logger
 
 class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) {
+  import TraversalHelper._
+
   val dummyCursor: Array[Byte] = Array.empty
 
   /** Parsing Logic: parse from kv from Storage into Edge */
@@ -86,17 +88,15 @@ class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) {
                                startOffset: Int = 0,
                                len: Int = Int.MaxValue): StepResult = {
 
+
     val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _
 
     if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor))
     else {
       val queryOption = queryRequest.query.queryOption
       val queryParam = queryRequest.queryParam
-      val labelWeight = queryRequest.labelWeight
-      val nextStepOpt = queryRequest.nextStepOpt
-      val where = queryParam.where.get
       val label = queryParam.label
-      val isDefaultTransformer = queryParam.edgeTransformer.isDefault
+
       val first = kvs.head
       val kv = first
       val schemaVer = queryParam.label.schemaVersion
@@ -114,41 +114,19 @@ class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) {
 
       val lastCursor: Seq[Array[Byte]] = Seq(if (keyValues.nonEmpty) 
toSKeyValue(keyValues(keyValues.length - 1)).row else dummyCursor)
 
+      val edgeWithScores = for {
+        (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < 
startOffset + len
+        edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, 
queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, 
cacheElementOpt, parentEdges)).toSeq
+        edgeWithScore <- edgeToEdgeWithScore(queryRequest, edge, parentEdges)
+      } yield {
+        edgeWithScore
+      }
+
       if (!queryOption.ignorePrevStepCache) {
-        val edgeWithScores = for {
-          (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < 
startOffset + len
-          edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, 
queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, 
cacheElementOpt, parentEdges)).toSeq
-          if where == WhereParser.success || where.filter(edge)
-          convertedEdge <- if (isDefaultTransformer) Seq(edge) else 
convertEdges(queryParam, edge, nextStepOpt)
-        } yield {
-          val score = queryParam.rank.score(edge)
-          EdgeWithScore(convertedEdge, score, label)
-        }
         StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges 
= degreeEdges, cursors = lastCursor)
       } else {
-        val degreeScore = 0.0
-
-        val edgeWithScores = for {
-          (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < 
startOffset + len
-          edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, 
queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, 
cacheElementOpt, parentEdges)).toSeq
-          if where == WhereParser.success || where.filter(edge)
-          convertedEdge <- if (isDefaultTransformer) Seq(edge) else 
convertEdges(queryParam, edge, nextStepOpt)
-        } yield {
-          val edgeScore = queryParam.rank.score(edge)
-          val score = queryParam.scorePropagateOp match {
-            case "plus" => edgeScore + prevScore
-            case "divide" =>
-              if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
-              else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
-            case _ => edgeScore * prevScore
-          }
-          val tsVal = processTimeDecay(queryParam, edge)
-          val newScore = degreeScore + score
-          EdgeWithScore(convertedEdge.copyParentEdges(parentEdges), score = 
newScore * labelWeight * tsVal, label = label)
-        }
-
         val sampled =
-          if (queryRequest.queryParam.sample >= 0) sample(queryRequest, 
edgeWithScores, queryRequest.queryParam.sample)
+          if (queryRequest.queryParam.sample >= 0) sample(edgeWithScores, 
queryParam.offset, queryParam.sample)
           else edgeWithScores
 
         val normalized = if (queryParam.shouldNormalize) normalize(sampled) 
else sampled

Reply via email to