bug fix on ModelManager.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/1c222d65 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/1c222d65 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/1c222d65 Branch: refs/heads/master Commit: 1c222d650e025c6035b1957660bc0291921e48b6 Parents: 60c935e Author: DO YUNG YOON <steams...@apache.org> Authored: Fri May 4 18:26:17 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Fri May 4 19:00:10 2018 +0900 ---------------------------------------------------------------------- .../s2graph/core/model/ModelManager.scala | 1 - .../s2graph/core/model/fasttext/FastText.scala | 1 + .../core/model/fasttext/FastTextFetcher.scala | 22 ++++++++-- .../model/fasttext/FastTextFetcherTest.scala | 43 ++++++++++++-------- .../custom/process/ALSModelProcessTest.scala | 3 +- 5 files changed, 49 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1c222d65/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala index 97dd591..bae0780 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala @@ -59,7 +59,6 @@ class ModelManager(s2GraphLike: S2GraphLike) { initFetcher(config.getConfig("fetcher")).map { fetcher => importer.setStatus(true) - Label.updateOption(label.label, "") fetcherPool .remove(k) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1c222d65/s2core/src/main/scala/org/apache/s2graph/core/model/fasttext/FastText.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/fasttext/FastText.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/fasttext/FastText.scala index b5d10a9..51d588a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/model/fasttext/FastText.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/fasttext/FastText.scala @@ -188,6 +188,7 @@ class FastText(name: String) extends AutoCloseable { def close(): Unit = { handles.asScala.foreach(_.close()) + dbOptions.close() db.close() } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1c222d65/s2core/src/main/scala/org/apache/s2graph/core/model/fasttext/FastTextFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/fasttext/FastTextFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/fasttext/FastTextFetcher.scala index 774d784..d553580 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/model/fasttext/FastTextFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/fasttext/FastTextFetcher.scala @@ -3,6 +3,7 @@ package org.apache.s2graph.core.model.fasttext import com.typesafe.config.Config import org.apache.s2graph.core._ import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core.utils.logger import scala.concurrent.{ExecutionContext, Future} @@ -12,13 +13,28 @@ class FastTextFetcher(val graph: S2GraphLike) extends Fetcher { var fastText: FastText = _ override def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = { - Future { + val future = Future { val dbPath = config.getString(FastText.DBPathKey) - fastText = new FastText(dbPath) - + try { + fastText = new FastText(dbPath) + } catch { + case e: Throwable => + logger.error(s"[Init]: Failed.", e) + println(e) + throw e + } this } + + future.onFailure { + case e: Exception => + logger.error(s"[Init]: Failed.", e) + println(e) + throw e + } + + future } override def fetches(queryRequests: Seq[QueryRequest], http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1c222d65/s2core/src/test/scala/org/apache/s2graph/core/model/fasttext/FastTextFetcherTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/model/fasttext/FastTextFetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/model/fasttext/FastTextFetcherTest.scala index 7077e52..34c7e8f 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/model/fasttext/FastTextFetcherTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/model/fasttext/FastTextFetcherTest.scala @@ -3,6 +3,7 @@ package org.apache.s2graph.core.model.fasttext import com.typesafe.config.ConfigFactory import org.apache.s2graph.core.Integrate.IntegrateCommon import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} +import org.apache.s2graph.core.model.ModelManager import org.apache.s2graph.core.{Query, QueryParam, QueryRequest} import org.apache.s2graph.core.schema.Label @@ -15,9 +16,6 @@ class FastTextFetcherTest extends IntegrateCommon { test("FastTextFetcher init test.") { val modelPath = "./emoji" - val config = ConfigFactory.parseMap(Map(FastText.DBPathKey -> modelPath).asJava) - val fetcher = new FastTextFetcher(graph) - Await.ready(fetcher.init(config)(ExecutionContext.Implicits.global), Duration("3 minutes")) val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get val emojiColumn = @@ -27,9 +25,19 @@ class FastTextFetcherTest extends IntegrateCommon { management.createServiceColumn("s2graph", "sentence", "string", Nil) val labelName = "sentence_emoji" + val options = s"""{ + | + | "importer": { + | "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter" + | }, + | "fetcher": { + | "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.fasttext.FastTextFetcher", + | "${FastText.DBPathKey}": "${modelPath}" + | } + |}""".stripMargin - Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) } + Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) } val label = management.createLabel( labelName, sentenceColumn , @@ -43,25 +51,28 @@ class FastTextFetcherTest extends IntegrateCommon { -1, "v3", "gz", - "" + options ) + + val importer = Await.result(management.importModel(labelName, options), Duration("1 minutes")) + while (!importer.isFinished) { + Thread.sleep(1000) + } + + val vertex = graph.elementBuilder.toVertex(service.serviceName, sentenceColumn.columnName, "íë¬ì´") val queryParam = QueryParam(labelName = labelName, limit = 5) val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam)) - val queryRequests = Seq( - QueryRequest(query, 0, vertex, queryParam) - ) - val future = fetcher.fetches(queryRequests, Map.empty) - val results = Await.result(future, Duration("10 seconds")) - results.foreach { stepResult => - stepResult.edgeWithScores.foreach { es => - val Array(itemId, resourceId) = es.edge.tgtVertex.innerIdVal.toString.replace("__label__", "").split("_") - val text = String.format("http://item.kakaocdn.net/dw/%s.thum_%03d.png", itemId, Int.box(resourceId.toInt)) + val future = graph.getEdges(query) + val stepResult = Await.result(future, Duration("10 seconds")) + + stepResult.edgeWithScores.foreach { es => + val Array(itemId, resourceId) = es.edge.tgtVertex.innerIdVal.toString.replace("__label__", "").split("_") + val text = String.format("http://item.kakaocdn.net/dw/%s.thum_%03d.png", itemId, Int.box(resourceId.toInt)) - println(text) - } + println(text) } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1c222d65/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala index 4d2623e..bd14352 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala @@ -7,8 +7,9 @@ import com.typesafe.config.ConfigFactory import org.apache.commons.io.FileUtils import org.apache.s2graph.core.Integrate.IntegrateCommon import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} +import org.apache.s2graph.core.model.annoy.AnnoyModelFetcher import org.apache.s2graph.core.{Query, QueryParam} -import org.apache.s2graph.core.model.{ANNIndexWithDict, HDFSImporter, ModelManager} +import org.apache.s2graph.core.model.ModelManager import org.apache.s2graph.core.schema.Label import org.apache.s2graph.s2jobs.task.TaskConf