Repository: incubator-s2graph Updated Branches: refs/heads/master 70a7c71a3 -> 4f1c4ceb6
Extract Where and EdgeTransformer to TraversalHelper. Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/3355701f Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/3355701f Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/3355701f Branch: refs/heads/master Commit: 3355701f00b34d5e1609e8424d09b50e8211e87a Parents: 32eb344 Author: DO YUNG YOON <steams...@apache.org> Authored: Thu Jun 14 18:14:22 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Thu Jun 14 18:14:22 2018 +0900 ---------------------------------------------------------------------- .../apache/s2graph/core/TraversalHelper.scala | 42 +++++++++++++- .../core/fetcher/annoy/AnnoyModelFetcher.scala | 59 ++------------------ .../core/fetcher/fasttext/FastTextFetcher.scala | 8 ++- .../fetcher/tensorflow/InceptionFetcher.scala | 8 ++- .../apache/s2graph/core/storage/StorageIO.scala | 48 +++++----------- 5 files changed, 69 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3355701f/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala index 0a4a49b..ba18e8d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala @@ -22,6 +22,7 @@ package org.apache.s2graph.core import java.util import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey} +import org.apache.s2graph.core.parsers.WhereParser import org.apache.s2graph.core.schema.LabelMeta import org.apache.s2graph.core.types.{HBaseType, InnerVal, LabelWithDirection, VertexId} import org.apache.s2graph.core.utils.{Extensions, logger} @@ -39,11 +40,11 @@ object TraversalHelper { else randomInt(sampleNumber, range, set + Random.nextInt(range)) } - def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { + def sample(edges: Seq[EdgeWithScore], offset: Int, n: Int): Seq[EdgeWithScore] = { if (edges.size <= n) { edges } else { - val plainEdges = if (queryRequest.queryParam.offset == 0) { + val plainEdges = if (offset == 0) { edges.tail } else edges @@ -141,6 +142,41 @@ object TraversalHelper { (hashKey, filterHashKey) } + + def edgeToEdgeWithScore(queryRequest: QueryRequest, + edge: S2EdgeLike, + parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { + val prevScore = queryRequest.prevStepScore + val queryParam = queryRequest.queryParam + val queryOption = queryRequest.query.queryOption + val nextStepOpt = queryRequest.nextStepOpt + val labelWeight = queryRequest.labelWeight + val where = queryParam.where.get + val isDefaultTransformer = queryParam.edgeTransformer.isDefault + + if (where != WhereParser.success && !where.filter(edge)) Nil + else { + val edges = if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt) + edges.map { e => + if (!queryOption.ignorePrevStepCache) { + EdgeWithScore(e, queryParam.rank.score(edge), queryParam.label) + } else { + val edgeScore = queryParam.rank.score(edge) + val score = queryParam.scorePropagateOp match { + case "plus" => edgeScore + prevScore + case "divide" => + if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 + else edgeScore / (prevScore + queryParam.scorePropagateShrinkage) + case _ => edgeScore * prevScore + } + val tsVal = processTimeDecay(queryParam, edge) + + EdgeWithScore(e.copyParentEdges(parentEdges), score = score * labelWeight * tsVal, label = queryParam.label) + } + } + } + } + } @@ -481,7 +517,7 @@ class TraversalHelper(graph: S2GraphLike) { val degreeScore = 0.0 val sampled = - if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) + if (queryRequest.queryParam.sample >= 0) sample(edgeWithScores, queryParam.offset, queryParam.sample) else edgeWithScores val withScores = for { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3355701f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala index cdf3b71..6c00ca0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala @@ -29,74 +29,23 @@ import scala.concurrent.{ExecutionContext, Future} object AnnoyModelFetcher { val IndexFilePathKey = "annoyIndexFilePath" -// val DictFilePathKey = "annoyDictFilePath" val DimensionKey = "annoyIndexDimension" val IndexTypeKey = "annoyIndexType" - // def loadDictFromLocal(file: File): Map[Int, String] = { - // val files = if (file.isDirectory) { - // file.listFiles() - // } else { - // Array(file) - // } - // - // files.flatMap { file => - // Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) => - // val tokens = line.stripMargin.split(",") - // try { - // val tpl = if (tokens.length < 2) { - // (tokens.head.toInt, tokens.head) - // } else { - // (tokens.head.toInt, tokens.tail.head) - // } - // Seq(tpl) - // } catch { - // case e: Exception => Nil - // } - // } - // }.toMap - // } - def buildAnnoy4s[T](indexPath: String)(implicit converter: KeyConverter[T]): Annoy[T] = { Annoy.load[T](indexPath) } - - // def buildIndex(indexPath: String, - // dictPath: String, - // dimension: Int, - // indexType: IndexType): ANNIndexWithDict = { - // val dict = loadDictFromLocal(new File(dictPath)) - // val index = new ANNIndex(dimension, indexPath, indexType) - // - // ANNIndexWithDict(index, dict) - // } - // - // def buildIndex(config: Config): ANNIndexWithDict = { - // val indexPath = config.getString(IndexFilePathKey) - // val dictPath = config.getString(DictFilePathKey) - // - // val dimension = config.getInt(DimensionKey) - // val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR) - // - // buildIndex(indexPath, dictPath, dimension, indexType) - // } } -// -//case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) { -// val dictRev = dict.map(kv => kv._2 -> kv._1) -//} - class AnnoyModelFetcher(val graph: S2GraphLike) extends EdgeFetcher { import AnnoyModelFetcher._ + import TraversalHelper._ val builder = graph.elementBuilder - // var model: ANNIndexWithDict = _ var model: Annoy[String] = _ override def init(config: Config)(implicit ec: ExecutionContext): Unit = { - model = AnnoyModelFetcher.buildAnnoy4s(config.getString(IndexFilePathKey)) } @@ -106,15 +55,17 @@ class AnnoyModelFetcher(val graph: S2GraphLike) extends EdgeFetcher { val stepResultLs = queryRequests.map { queryRequest => val vertex = queryRequest.vertex val queryParam = queryRequest.queryParam + val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent + val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil - val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).map { case (tgtId, score) => + val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).flatMap { case (tgtId, score) => val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), tgtId) val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props) - EdgeWithScore(edge, score, queryParam.label) + edgeToEdgeWithScore(queryRequest, edge, parentEdges) } StepResult(edgeWithScores, Nil, Nil) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3355701f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala index 433b4dc..a07ed07 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala @@ -28,6 +28,7 @@ import scala.concurrent.{ExecutionContext, Future} class FastTextFetcher(val graph: S2GraphLike) extends EdgeFetcher { + import org.apache.s2graph.core.TraversalHelper._ val builder = graph.elementBuilder var fastText: FastText = _ @@ -49,16 +50,19 @@ class FastTextFetcher(val graph: S2GraphLike) extends EdgeFetcher { val stepResultLs = queryRequests.map { queryRequest => val vertex = queryRequest.vertex val queryParam = queryRequest.queryParam + val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent + val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil + val line = fastText.getLine(vertex.innerId.toIdString()) - val edgeWithScores = fastText.predict(line, queryParam.limit).map { case (_label, score) => + val edgeWithScores = fastText.predict(line, queryParam.limit).flatMap { case (_label, score) => val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), _label) val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props) - EdgeWithScore(edge, score, queryParam.label) + edgeToEdgeWithScore(queryRequest, edge, parentEdges) } StepResult(edgeWithScores, Nil, Nil) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3355701f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala index aa05a31..83fab98 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/tensorflow/InceptionFetcher.scala @@ -63,6 +63,7 @@ class InceptionFetcher(graph: S2GraphLike) extends EdgeFetcher { import InceptionFetcher._ import scala.collection.JavaConverters._ + import org.apache.s2graph.core.TraversalHelper._ val builder = graph.elementBuilder var graphDef: Array[Byte] = _ @@ -81,16 +82,19 @@ class InceptionFetcher(graph: S2GraphLike) extends EdgeFetcher { val stepResultLs = queryRequests.map { queryRequest => val vertex = queryRequest.vertex val queryParam = queryRequest.queryParam + val shouldBuildParents = queryRequest.query.queryOption.returnTree || queryParam.whereHasParent + val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil + val urlText = vertex.innerId.toIdString() - val edgeWithScores = predict(graphDef, labels)(getImageBytes(urlText), queryParam.limit).map { case (label, score) => + val edgeWithScores = predict(graphDef, labels)(getImageBytes(urlText), queryParam.limit).flatMap { case (label, score) => val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), label) val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props) - EdgeWithScore(edge, score, queryParam.label) + edgeToEdgeWithScore(queryRequest, edge, parentEdges) } StepResult(edgeWithScores, Nil, Nil) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3355701f/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala index 2b02bdd..9abd80d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala @@ -27,6 +27,8 @@ import org.apache.s2graph.core.parsers.WhereParser import org.apache.s2graph.core.utils.logger class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) { + import TraversalHelper._ + val dummyCursor: Array[Byte] = Array.empty /** Parsing Logic: parse from kv from Storage into Edge */ @@ -86,17 +88,15 @@ class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) { startOffset: Int = 0, len: Int = Int.MaxValue): StepResult = { + val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _ if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor)) else { val queryOption = queryRequest.query.queryOption val queryParam = queryRequest.queryParam - val labelWeight = queryRequest.labelWeight - val nextStepOpt = queryRequest.nextStepOpt - val where = queryParam.where.get val label = queryParam.label - val isDefaultTransformer = queryParam.edgeTransformer.isDefault + val first = kvs.head val kv = first val schemaVer = queryParam.label.schemaVersion @@ -114,41 +114,19 @@ class StorageIO(val graph: S2GraphLike, val serDe: StorageSerDe) { val lastCursor: Seq[Array[Byte]] = Seq(if (keyValues.nonEmpty) toSKeyValue(keyValues(keyValues.length - 1)).row else dummyCursor) + val edgeWithScores = for { + (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len + edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq + edgeWithScore <- edgeToEdgeWithScore(queryRequest, edge, parentEdges) + } yield { + edgeWithScore + } + if (!queryOption.ignorePrevStepCache) { - val edgeWithScores = for { - (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len - edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq - if where == WhereParser.success || where.filter(edge) - convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt) - } yield { - val score = queryParam.rank.score(edge) - EdgeWithScore(convertedEdge, score, label) - } StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor) } else { - val degreeScore = 0.0 - - val edgeWithScores = for { - (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len - edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq - if where == WhereParser.success || where.filter(edge) - convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt) - } yield { - val edgeScore = queryParam.rank.score(edge) - val score = queryParam.scorePropagateOp match { - case "plus" => edgeScore + prevScore - case "divide" => - if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 - else edgeScore / (prevScore + queryParam.scorePropagateShrinkage) - case _ => edgeScore * prevScore - } - val tsVal = processTimeDecay(queryParam, edge) - val newScore = degreeScore + score - EdgeWithScore(convertedEdge.copyParentEdges(parentEdges), score = newScore * labelWeight * tsVal, label = label) - } - val sampled = - if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) + if (queryRequest.queryParam.sample >= 0) sample(edgeWithScores, queryParam.offset, queryParam.sample) else edgeWithScores val normalized = if (queryParam.shouldNormalize) normalize(sampled) else sampled