Merge branch 'master' into S2GRAPH-206
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/5ee1906f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/5ee1906f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/5ee1906f

Branch: refs/heads/master
Commit: 5ee1906fcfa127d8975e365b15ee07a57082fabf
Parents: 88eb052 16feda8
Author: DO YUNG YOON <steams...@apache.org>
Authored: Thu May 10 21:24:54 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Fri May 11 14:34:12 2018 +0900

----------------------------------------------------------------------
 dev_support/graph_mysql/schema.sql                 |      1 +
 example/run.sh                                     |     14 +-
 .../org/apache/s2graph/core/schema/schema.sql      |      1 +
 .../org/apache/s2graph/core/EdgeFetcher.scala      |     38 +
 .../org/apache/s2graph/core/EdgeMutator.scala      |     45 +
 .../scala/org/apache/s2graph/core/Fetcher.scala    |     17 -
 .../org/apache/s2graph/core/GraphUtil.scala        |      7 +
 .../org/apache/s2graph/core/Management.scala       |     71 +-
 .../scala/org/apache/s2graph/core/Mutator.scala    |     22 -
 .../apache/s2graph/core/ResourceManager.scala      |    134 +
 .../scala/org/apache/s2graph/core/S2Graph.scala    |    105 +-
 .../apache/s2graph/core/S2GraphFactory.scala       |     11 +-
 .../org/apache/s2graph/core/S2GraphLike.scala      |     49 +-
 .../apache/s2graph/core/TraversalHelper.scala      |      6 +-
 .../org/apache/s2graph/core/VertexFetcher.scala    |     35 +
 .../org/apache/s2graph/core/VertexMutator.scala    |     33 +
 .../core/fetcher/MemoryModelEdgeFetcher.scala      |     62 +
 .../core/fetcher/annoy/AnnoyModelFetcher.scala     |    115 +
 .../core/fetcher/fasttext/CopyModel.scala          |    121 +
 .../core/fetcher/fasttext/FastText.scala           |    194 +
 .../core/fetcher/fasttext/FastTextArgs.scala       |    116 +
 .../core/fetcher/fasttext/FastTextFetcher.scala    |     56 +
 .../apache/s2graph/core/io/Conversions.scala       |      6 +-
 .../s2graph/core/model/ImportStatus.scala          |     40 -
 .../apache/s2graph/core/model/Importer.scala       |    103 -
 .../s2graph/core/model/MemoryModelFetcher.scala    |     41 -
 .../s2graph/core/model/ModelManager.scala          |     85 -
 .../core/model/annoy/AnnoyModelFetcher.scala       |    115 -
 .../s2graph/core/model/fasttext/CopyModel.scala    |    122 -
 .../s2graph/core/model/fasttext/FastText.scala     |    195 -
 .../core/model/fasttext/FastTextArgs.scala         |    119 -
 .../core/model/fasttext/FastTextFetcher.scala      |     64 -
 .../s2graph/core/rest/RequestParser.scala          |      8 +-
 .../org/apache/s2graph/core/schema/Label.scala     |     40 +-
 .../s2graph/core/schema/ServiceColumn.scala        |     47 +-
 .../storage/DefaultOptimisticEdgeMutator.scala     |    176 +
 .../core/storage/DefaultOptimisticMutator.scala    |    171 -
 .../DefaultOptimisticVertexMutator.scala           |     44 +
 .../core/storage/OptimisticEdgeFetcher.scala       |     56 +
 .../core/storage/OptimisticMutator.scala           |     23 +-
 .../apache/s2graph/core/storage/Storage.scala      |     68 +-
 .../s2graph/core/storage/StorageReadable.scala     |     97 -
 .../storage/WriteWriteConflictResolver.scala       |      6 +-
 .../storage/hbase/AsynchbaseEdgeFetcher.scala      |    147 +
 .../hbase/AsynchbaseOptimisticEdgeFetcher.scala    |     35 +
 .../hbase/AsynchbaseOptimisticMutator.scala        |    142 +
 .../core/storage/hbase/AsynchbaseStorage.scala     |    185 +-
 .../hbase/AsynchbaseStorageReadable.scala          |    367 -
 .../hbase/AsynchbaseStorageWritable.scala          |    142 -
 .../storage/hbase/AsynchbaseVertexFetcher.scala    |     87 +
 .../core/storage/rocks/RocksEdgeFetcher.scala      |     91 +
 .../rocks/RocksOptimisticEdgeFetcher.scala         |     41 +
 .../storage/rocks/RocksOptimisticMutator.scala     |    133 +
 .../core/storage/rocks/RocksStorage.scala          |    101 +-
 .../storage/rocks/RocksStorageReadable.scala       |    234 -
 .../storage/rocks/RocksStorageWritable.scala       |    133 -
 .../core/storage/rocks/RocksVertexFetcher.scala    |    114 +
 .../s2graph/core/utils/ImportStatus.scala          |     59 +
 .../apache/s2graph/core/utils/Importer.scala       |    122 +
 .../s2graph/core/utils/SafeUpdateCache.scala       |     11 +-
 s2core/src/test/resources/item_factors.ann         | Bin 137800 -> 0 bytes
 s2core/src/test/resources/movie.dict               |   1682 -
 s2core/src/test/resources/test-index.dict          | 100000 ----------------
 s2core/src/test/resources/test-index.tree          | Bin 18824 -> 0 bytes
 s2core/src/test/resources/user_factors.ann         | Bin 76804 -> 0 bytes
 .../s2graph/core/TestCommonWithModels.scala        |     14 +-
 .../s2graph/core/fetcher/EdgeFetcherTest.scala     |     89 +
 .../apache/s2graph/core/model/FetcherTest.scala    |    145 -
 .../s2graph/core/model/HDFSImporterTest.scala      |     96 -
 .../model/fasttext/FastTextFetcherTest.scala       |     78 -
 .../core/tinkerpop/S2GraphProvider.scala           |     73 +-
 .../core/tinkerpop/structure/S2GraphTest.scala     |      6 +-
 .../apache/s2graph/graphql/GraphQLServer.scala     |      8 +-
 .../org/apache/s2graph/graphql/HttpServer.scala    |      4 +-
 .../graphql/repository/GraphRepository.scala       |      2 +-
 .../apache/s2graph/s2jobs/BaseSparkTest.scala      |     13 +-
 .../custom/process/ALSModelProcessTest.scala       |    200 +-
 77 files changed, 2982 insertions(+), 104451 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/example/run.sh
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 4b2274a,004b6e8..651323f
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@@ -161,6 -159,30 +159,35 @@@ object S2Graph
      }
      ConfigFactory.parseMap(kvs)
    }
+
+   def initMutators(graph: S2GraphLike): Unit = {
+     val management = graph.management
+
+     ServiceColumn.findAll().foreach { column =>
+       management.updateVertexMutator(column, column.options)
+     }
+
+     Label.findAll().foreach { label =>
+       management.updateEdgeMutator(label, label.options)
+     }
+   }
+
+   def initFetchers(graph: S2GraphLike): Unit = {
+     val management = graph.management
+
+     ServiceColumn.findAll().foreach { column =>
+       management.updateVertexFetcher(column, column.options)
+     }
+
+     Label.findAll().foreach { label =>
+       management.updateEdgeFetcher(label, label.options)
+     }
+   }
++
++  def loadFetchersAndMutators(graph: S2GraphLike): Unit = {
++    initFetchers(graph)
++    initMutators(graph)
++  }
  }

  class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2GraphLike {

@@@ -250,22 -275,43 +277,42 @@@
      storagePool.getOrElse(s"label:${label.label}", defaultStorage)
    }

-   //TODO:
-   override def getFetcher(column: ServiceColumn): Fetcher = {
-     getStorage(column.service).reader
-
+   /* Currently, each getter on Fetcher and Mutator is missing a proper implementation.
+    * Please discuss the proper way to maintain resources here and provide
+    * the right implementation (S2GRAPH-213).
+    * */
+   override def getVertexFetcher(column: ServiceColumn): VertexFetcher = {
+     resourceManager.getOrElseUpdateVertexFetcher(column)
+       .getOrElse(defaultStorage.vertexFetcher)
    }

-   override def getFetcher(label: Label): Fetcher = {
-     if (label.fetchConfigExist) modelManager.getFetcher(label)
-     else getStorage(label).reader
+   override def getEdgeFetcher(label: Label): EdgeFetcher = {
+     resourceManager.getOrElseUpdateEdgeFetcher(label)
+       .getOrElse(defaultStorage.edgeFetcher)
    }

-   override def getMutator(column: ServiceColumn): Mutator = {
-     getStorage(column.service).mutator
+   override def getAllVertexFetchers(): Seq[VertexFetcher] = {
+     resourceManager.getAllVertexFetchers()
    }

-   override def getMutator(label: Label): Mutator = {
-     getStorage(label).mutator
+   override def getAllEdgeFetchers(): Seq[EdgeFetcher] = {
+     resourceManager.getAllEdgeFetchers()
+   }
+
+   override def getVertexMutator(column: ServiceColumn): VertexMutator = {
+     resourceManager.getOrElseUpdateVertexMutator(column)
+       .getOrElse(defaultStorage.vertexMutator)
+   }
+
+   override def getEdgeMutator(label: Label): EdgeMutator = {
+     resourceManager.getOrElseUpdateEdgeMutator(label)
+       .getOrElse(defaultStorage.edgeMutator)
+   }
+
+   //TODO:
+   override def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher = {
+     // getStorage(label).optimisticEdgeFetcher
+     null
    }

    //TODO:
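
The new object-level helpers above are intended to run once at startup, so that every ServiceColumn and Label carrying custom options has its fetcher and mutator registered before queries arrive. A minimal bootstrap sketch; the object name and ConfigFactory.load() are illustrative assumptions, not part of this commit:

    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.core.S2Graph

    import scala.concurrent.ExecutionContext.Implicits.global

    object FetcherBootstrap {
      def main(args: Array[String]): Unit = {
        // any valid S2Graph config works here; ConfigFactory.load() is just an assumption
        val config = ConfigFactory.load()
        val graph = new S2Graph(config)

        // registers fetchers first, then mutators, for every column/label with options
        S2Graph.loadFetchersAndMutators(graph)
      }
    }
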
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
index 0000000,0000000..f46df13
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
@@@ -1,0 -1,0 +1,115 @@@
++package org.apache.s2graph.core.fetcher.annoy
++
++import annoy4s.Converters.KeyConverter
++import annoy4s._
++import com.typesafe.config.Config
++import org.apache.s2graph.core._
++import org.apache.s2graph.core.types.VertexId
++
++import scala.concurrent.{ExecutionContext, Future}
++
++object AnnoyModelFetcher {
++  val IndexFilePathKey = "annoyIndexFilePath"
++//  val DictFilePathKey = "annoyDictFilePath"
++  val DimensionKey = "annoyIndexDimension"
++  val IndexTypeKey = "annoyIndexType"
++
++  //  def loadDictFromLocal(file: File): Map[Int, String] = {
++  //    val files = if (file.isDirectory) {
++  //      file.listFiles()
++  //    } else {
++  //      Array(file)
++  //    }
++  //
++  //    files.flatMap { file =>
++  //      Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) =>
++  //        val tokens = line.stripMargin.split(",")
++  //        try {
++  //          val tpl = if (tokens.length < 2) {
++  //            (tokens.head.toInt, tokens.head)
++  //          } else {
++  //            (tokens.head.toInt, tokens.tail.head)
++  //          }
++  //          Seq(tpl)
++  //        } catch {
++  //          case e: Exception => Nil
++  //        }
++  //      }
++  //    }.toMap
++  //  }
++
++  def buildAnnoy4s[T](indexPath: String)(implicit converter: KeyConverter[T]): Annoy[T] = {
++    Annoy.load[T](indexPath)
++  }
++
++  //  def buildIndex(indexPath: String,
++  //                 dictPath: String,
++  //                 dimension: Int,
++  //                 indexType: IndexType): ANNIndexWithDict = {
++  //    val dict = loadDictFromLocal(new File(dictPath))
++  //    val index = new ANNIndex(dimension, indexPath, indexType)
++  //
++  //    ANNIndexWithDict(index, dict)
++  //  }
++  //
++  //  def buildIndex(config: Config): ANNIndexWithDict = {
++  //    val indexPath = config.getString(IndexFilePathKey)
++  //    val dictPath = config.getString(DictFilePathKey)
++  //
++  //    val dimension = config.getInt(DimensionKey)
++  //    val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
++  //
++  //    buildIndex(indexPath, dictPath, dimension, indexType)
++  //  }
++}
++
++//
++//case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) {
++//  val dictRev = dict.map(kv => kv._2 -> kv._1)
++//}
++
++class AnnoyModelFetcher(val graph: S2GraphLike) extends EdgeFetcher {
++  import AnnoyModelFetcher._
++
++  val builder = graph.elementBuilder
++
++  // var model: ANNIndexWithDict = _
++  var model: Annoy[String] = _
++
++  override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
++    model = AnnoyModelFetcher.buildAnnoy4s(config.getString(IndexFilePathKey))
++  }
++
++  /** Fetch **/
++  override def fetches(queryRequests: Seq[QueryRequest],
++                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
++    val stepResultLs = queryRequests.map { queryRequest =>
++      val vertex = queryRequest.vertex
++      val queryParam = queryRequest.queryParam
++
++      val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).map { case (tgtId, score) =>
++        val tgtVertexId = builder.newVertexId(queryParam.label.service,
++          queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), tgtId)
++
++        val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
++        val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
++
++        EdgeWithScore(edge, score, queryParam.label)
++      }
++
++      StepResult(edgeWithScores, Nil, Nil)
++    }
++
++    Future.successful(stepResultLs)
++  }
++
++  override def close(): Unit = {
++    // do clean up
++    model.close
++  }
++
++  // not supported yet.
++  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] =
++    Future.successful(Nil)
++}
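
The fetcher above is never constructed directly; it is registered on a label through the label's options JSON, keyed by the constants this file defines. A minimal sketch of that registration, assuming an already-initialized graph; the index path and dimension value are placeholders:

    import org.apache.s2graph.core.{Management, ResourceManager}
    import org.apache.s2graph.core.fetcher.annoy.AnnoyModelFetcher
    import org.apache.s2graph.core.schema.Label

    def registerAnnoyFetcher(management: Management, label: Label, indexPath: String): Unit = {
      // ResourceManager.ClassNameKey picks the implementation; IndexFilePathKey
      // ("annoyIndexFilePath") and DimensionKey ("annoyIndexDimension") configure it.
      val options =
        s"""{
           |  "fetcher": {
           |    "${ResourceManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.annoy.AnnoyModelFetcher",
           |    "${AnnoyModelFetcher.IndexFilePathKey}": "$indexPath",
           |    "${AnnoyModelFetcher.DimensionKey}": 10
           |  }
           |}""".stripMargin

      management.updateEdgeFetcher(label, Option(options))
    }
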
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/CopyModel.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/CopyModel.scala
index 0000000,0000000..d23b390
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/CopyModel.scala
@@@ -1,0 -1,0 +1,121 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import java.io.{BufferedInputStream, FileInputStream, InputStream}
++import java.nio.{ByteBuffer, ByteOrder}
++import java.util
++
++import org.apache.s2graph.core.fetcher.fasttext
++import org.rocksdb._
++
++import scala.collection.JavaConverters._
++import scala.collection.mutable.ArrayBuffer
++
++object CopyModel {
++
++  def writeArgs(db: RocksDB, handle: ColumnFamilyHandle, args: fasttext.FastTextArgs): Unit = {
++    val wo = new WriteOptions().setDisableWAL(true).setSync(false)
++    db.put(handle, wo, "args".getBytes("UTF-8"), args.serialize)
++    wo.close()
++    println("done ")
++  }
++
++  def writeVocab(is: InputStream, db: RocksDB,
++                 vocabHandle: ColumnFamilyHandle, labelHandle: ColumnFamilyHandle, args: fasttext.FastTextArgs): Unit = {
++    val wo = new WriteOptions().setDisableWAL(true).setSync(false)
++    val bb = ByteBuffer.allocate(13).order(ByteOrder.LITTLE_ENDIAN)
++    val wb = new ArrayBuffer[Byte]
++    for (wid <- 0 until args.size) {
++      bb.clear()
++      wb.clear()
++      var b = is.read()
++      while (b != 0) {
++        wb += b.toByte
++        b = is.read()
++      }
++      bb.putInt(wid)
++      is.read(bb.array(), 4, 9)
++      db.put(vocabHandle, wo, wb.toArray, bb.array())
++
++      if (bb.get(12) == 1) {
++        val label = wid - args.nwords
++        db.put(labelHandle, ByteBuffer.allocate(4).putInt(label).array(), wb.toArray)
++      }
++
++      if ((wid + 1) % 1000 == 0)
++        print(f"\rprocessing ${100 * (wid + 1) / args.size.toFloat}%.2f%%")
++    }
++    println("\rdone ")
++    wo.close()
++  }
++
++  def writeVectors(is: InputStream, db: RocksDB, handle: ColumnFamilyHandle, args: fasttext.FastTextArgs): Unit = {
++    require(is.read() == 0, "not implemented")
++    val wo = new WriteOptions().setDisableWAL(true).setSync(false)
++    val bb = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN)
++    val key = ByteBuffer.allocate(8)
++    val value = new Array[Byte](args.dim * 4)
++    is.read(bb.array())
++    val m = bb.getLong
++    val n = bb.getLong
++    require(n * 4 == value.length)
++    var i = 0L
++    while (i < m) {
++      key.clear()
++      key.putLong(i)
++      is.read(value)
++      db.put(handle, wo, key.array(), value)
++      if ((i + 1) % 1000 == 0)
++        print(f"\rprocessing ${100 * (i + 1) / m.toFloat}%.2f%%")
++      i += 1
++    }
++    println("\rdone ")
++    wo.close()
++  }
++
++  def printHelp(): Unit = {
++    println("usage: CopyModel <in> <out>")
++  }
++
++  def copy(in: String, out: String): Unit = {
++    RocksDB.destroyDB(out, new Options)
++
++    val dbOptions = new DBOptions()
++      .setCreateIfMissing(true)
++      .setCreateMissingColumnFamilies(true)
++      .setAllowMmapReads(false)
++      .setMaxOpenFiles(500000)
++      .setDbWriteBufferSize(134217728)
++      .setMaxBackgroundCompactions(20)
++
++    val descriptors = new java.util.LinkedList[ColumnFamilyDescriptor]()
++    descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY))
++    descriptors.add(new ColumnFamilyDescriptor("vocab".getBytes()))
++    descriptors.add(new ColumnFamilyDescriptor("i".getBytes()))
++    descriptors.add(new ColumnFamilyDescriptor("o".getBytes()))
++    val handles = new util.LinkedList[ColumnFamilyHandle]()
++    val db = RocksDB.open(dbOptions, out, descriptors, handles)
++
++    val is = new BufferedInputStream(new FileInputStream(in))
++    val fastTextArgs = FastTextArgs.fromInputStream(is)
++
++    require(fastTextArgs.magic == FastText.FASTTEXT_FILEFORMAT_MAGIC_INT32)
++    require(fastTextArgs.version == FastText.FASTTEXT_VERSION)
++
++    println("step 1: writing args")
++    writeArgs(db, handles.get(0), fastTextArgs)
++    println("step 2: writing vocab")
++    writeVocab(is, db, handles.get(1), handles.get(0), fastTextArgs)
++    println("step 3: writing input vectors")
++    writeVectors(is, db, handles.get(2), fastTextArgs)
++    println("step 4: writing output vectors")
++    writeVectors(is, db, handles.get(3), fastTextArgs)
++    println("step 5: compactRange")
++    db.compactRange()
++    println("done")
++
++    handles.asScala.foreach(_.close())
++    db.close()
++    is.close()
++  }
++
++}
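
CopyModel is a one-shot converter that rewrites a trained fastText .bin model into the read-only RocksDB layout (default, vocab, i, o column families) that the FastText class below consumes. Under that assumption, usage reduces to a single call; both paths are placeholders:

    import org.apache.s2graph.core.fetcher.fasttext.CopyModel

    // source .bin model and destination RocksDB directory are placeholders
    CopyModel.copy("/tmp/sample.model.bin", "/tmp/fasttext-rocksdb")
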
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastText.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastText.scala
index 0000000,0000000..c465bd8
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastText.scala
@@@ -1,0 -1,0 +1,194 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import java.nio.{ByteBuffer, ByteOrder}
++import java.util
++
++import org.rocksdb.{ColumnFamilyDescriptor, ColumnFamilyHandle, DBOptions, RocksDB}
++
++import scala.collection.JavaConverters._
++import scala.collection.mutable.ArrayBuffer
++
++case class Line(labels: Array[Int], words: Array[Long])
++
++case class Entry(wid: Int, count: Long, tpe: Byte, subwords: Array[Long])
++
++object FastText {
++  val EOS = "</s>"
++  val BOW = "<"
++  val EOW = ">"
++
++  val FASTTEXT_VERSION = 12 // Version 1b
++  val FASTTEXT_FILEFORMAT_MAGIC_INT32 = 793712314
++
++  val MODEL_CBOW = 1
++  val MODEL_SG = 2
++  val MODEL_SUP = 3
++
++  val LOSS_HS = 1
++  val LOSS_NS = 2
++  val LOSS_SOFTMAX = 3
++
++  val DBPathKey = "dbPath"
++
++  def tokenize(in: String): Array[String] = in.split("\\s+") ++ Array("</s>")
++
++  def getSubwords(word: String, minn: Int, maxn: Int): Array[String] = {
++    val l = math.max(minn, 1)
++    val u = math.min(maxn, word.length)
++    val r = l to u flatMap word.sliding
++    r.filterNot(s => s == BOW || s == EOW).toArray
++  }
++
++  def hash(str: String): Long = {
++    var h = 2166136261L.toInt
++    for (b <- str.getBytes) {
++      h = (h ^ b) * 16777619
++    }
++    h & 0xffffffffL
++  }
++
++}
++
++class FastText(name: String) extends AutoCloseable {
++
++  import FastText._
++
++  private val dbOptions = new DBOptions()
++  private val descriptors = new java.util.LinkedList[ColumnFamilyDescriptor]()
++  descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY))
++  descriptors.add(new ColumnFamilyDescriptor("vocab".getBytes()))
++  descriptors.add(new ColumnFamilyDescriptor("i".getBytes()))
++  descriptors.add(new ColumnFamilyDescriptor("o".getBytes()))
++  private val handles = new util.LinkedList[ColumnFamilyHandle]()
++  private val db = RocksDB.openReadOnly(dbOptions, name, descriptors, handles)
++
++  private val defaultHandle = handles.get(0)
++  private val vocabHandle = handles.get(1)
++  private val inputVectorHandle = handles.get(2)
++  private val outputVectorHandle = handles.get(3)
++
++  private val args = FastTextArgs.fromByteArray(db.get(defaultHandle, "args".getBytes("UTF-8")))
++  private val wo = loadOutputVectors()
++  private val labels = loadLabels()
++
++  println(args)
++
++  require(args.magic == FASTTEXT_FILEFORMAT_MAGIC_INT32)
++  require(args.version == FASTTEXT_VERSION)
++
++  // only sup/softmax supported
++  // others are the future work.
++  require(args.model == MODEL_SUP)
++  require(args.loss == LOSS_SOFTMAX)
++
++  private def getVector(handle: ColumnFamilyHandle, key: Long): Array[Float] = {
++    val keyBytes = ByteBuffer.allocate(8).putLong(key).array()
++    val bb = ByteBuffer.wrap(db.get(handle, keyBytes)).order(ByteOrder.LITTLE_ENDIAN)
++    Array.fill(args.dim)(bb.getFloat)
++  }
++
++  private def loadOutputVectors(): Array[Array[Float]] =
++    Array.tabulate(args.nlabels)(key => getVector(outputVectorHandle, key.toLong))
++
++  private def loadLabels(): Array[String] = {
++    val result = new Array[String](args.nlabels)
++    val it = db.newIterator(defaultHandle)
++    var i = 0
++    it.seekToFirst()
++    while (it.isValid) {
++      val key = ByteBuffer.wrap(it.key()).getInt()
++      if (key < args.nlabels) {
++        require(i == key)
++        result(i) = new String(it.value(), "UTF-8")
++        i += 1
++      }
++      it.next()
++    }
++    result
++  }
++
++  def getInputVector(key: Long): Array[Float] = getVector(inputVectorHandle, key)
++
++  def getOutputVector(key: Long): Array[Float] = getVector(outputVectorHandle, key)
++
++  def getEntry(word: String): Entry = {
++    val raw = db.get(vocabHandle, word.getBytes("UTF-8"))
++    if (raw == null) {
++      Entry(-1, 0L, 1, Array.emptyLongArray)
++    } else {
++      val bb = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN)
++      val wid = bb.getInt
++      val count = bb.getLong
++      val tpe = bb.get
++      val subwords = if (word != EOS && tpe == 0) Array(wid.toLong) ++ computeSubwords(BOW + word + EOW) else Array(wid.toLong)
++      Entry(wid, count, tpe, subwords)
++    }
++  }
++
++  def computeSubwords(word: String): Array[Long] =
++    getSubwords(word, args.minn, args.maxn).map { w => args.nwords + (hash(w) % args.bucket.toLong) }
++
++  def getLine(in: String): Line = {
++    val tokens = tokenize(in)
++    val words = new ArrayBuffer[Long]()
++    val labels = new ArrayBuffer[Int]()
++    tokens foreach { token =>
++      val Entry(wid, count, tpe, subwords) = getEntry(token)
++      if (tpe == 0) {
++        // addSubwords
++        if (wid < 0) { // OOV
++          if (token != EOS) {
++            words ++= computeSubwords(BOW + token + EOW)
++          }
++        } else {
++          words ++= subwords
++        }
++      } else if (tpe == 1 && wid > 0) {
++        labels += wid - args.nwords
++      }
++    }
++    Line(labels.toArray, words.toArray)
++  }
++
++  def computeHidden(input: Array[Long]): Array[Float] = {
++    val hidden = new Array[Float](args.dim)
++    for (row <- input.map(getInputVector)) {
++      var i = 0
++      while (i < hidden.length) {
++        hidden(i) += row(i) / input.length
++        i += 1
++      }
++    }
++    hidden
++  }
++
++  def predict(line: Line, k: Int = 1): Array[(String, Float)] = {
++    val hidden = computeHidden(line.words)
++    val output = wo.map { o =>
++      o.zip(hidden).map(a => a._1 * a._2).sum
++    }
++    val max = output.max
++    var i = 0
++    var z = 0.0f
++    while (i < output.length) {
++      output(i) = math.exp((output(i) - max).toDouble).toFloat
++      z += output(i)
++      i += 1
++    }
++    i = 0
++    while (i < output.length) {
++      output(i) /= z
++      i += 1
++    }
++    output.zipWithIndex.sortBy(-_._1).take(k).map { case (prob, i) =>
++      labels(i) -> prob
++    }
++  }
++
++  def close(): Unit = {
++    handles.asScala.foreach(_.close())
++    dbOptions.close()
++    db.close()
++  }
++
++}
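
End to end, classification against a converted model is open, getLine, predict, close. A short sketch, with a placeholder path and input text:

    import org.apache.s2graph.core.fetcher.fasttext.FastText

    val ft = new FastText("/tmp/fasttext-rocksdb") // placeholder path
    try {
      val line = ft.getLine("some input text")
      // top-3 (label, probability) pairs, highest probability first
      ft.predict(line, 3).foreach { case (label, prob) =>
        println(s"$label -> $prob")
      }
    } finally {
      ft.close()
    }
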
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextArgs.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextArgs.scala
index 0000000,0000000..0ad0b15
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextArgs.scala
@@@ -1,0 -1,0 +1,116 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import java.io.{ByteArrayInputStream, FileInputStream, InputStream}
++import java.nio.{ByteBuffer, ByteOrder}
++
++case class FastTextArgs(
++    magic: Int,
++    version: Int,
++    dim: Int,
++    ws: Int,
++    epoch: Int,
++    minCount: Int,
++    neg: Int,
++    wordNgrams: Int,
++    loss: Int,
++    model: Int,
++    bucket: Int,
++    minn: Int,
++    maxn: Int,
++    lrUpdateRate: Int,
++    t: Double,
++    size: Int,
++    nwords: Int,
++    nlabels: Int,
++    ntokens: Long,
++    pruneidxSize: Long) {
++
++  def serialize: Array[Byte] = {
++    val bb = ByteBuffer.allocate(92).order(ByteOrder.LITTLE_ENDIAN)
++    bb.putInt(magic)
++    bb.putInt(version)
++    bb.putInt(dim)
++    bb.putInt(ws)
++    bb.putInt(epoch)
++    bb.putInt(minCount)
++    bb.putInt(neg)
++    bb.putInt(wordNgrams)
++    bb.putInt(loss)
++    bb.putInt(model)
++    bb.putInt(bucket)
++    bb.putInt(minn)
++    bb.putInt(maxn)
++    bb.putInt(lrUpdateRate)
++    bb.putDouble(t)
++    bb.putInt(size)
++    bb.putInt(nwords)
++    bb.putInt(nlabels)
++    bb.putLong(ntokens)
++    bb.putLong(pruneidxSize)
++    bb.array()
++  }
++
++  override def toString: String = {
++    s"""magic: $magic
++       |version: $version
++       |dim: $dim
++       |ws : $ws
++       |epoch: $epoch
++       |minCount: $minCount
++       |neg: $neg
++       |wordNgrams: $wordNgrams
++       |loss: $loss
++       |model: $model
++       |bucket: $bucket
++       |minn: $minn
++       |maxn: $maxn
++       |lrUpdateRate: $lrUpdateRate
++       |t: $t
++       |size: $size
++       |nwords: $nwords
++       |nlabels: $nlabels
++       |ntokens: $ntokens
++       |pruneIdxSize: $pruneidxSize
++       |""".stripMargin
++  }
++
++}
++
++object FastTextArgs {
++
++  private def getInt(implicit inputStream: InputStream, buffer: Array[Byte]): Int = {
++    inputStream.read(buffer, 0, 4)
++    ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getInt
++  }
++
++  private def getLong(implicit inputStream: InputStream, buffer: Array[Byte]): Long = {
++    inputStream.read(buffer, 0, 8)
++    ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getLong
++  }
++
++  private def getDouble(implicit inputStream: InputStream, buffer: Array[Byte]): Double = {
++    inputStream.read(buffer, 0, 8)
++    ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getDouble
++  }
++
++  def fromByteArray(ar: Array[Byte]): FastTextArgs =
++    fromInputStream(new ByteArrayInputStream(ar))
++
++  def fromInputStream(inputStream: InputStream): FastTextArgs = {
++    implicit val is: InputStream = inputStream
++    implicit val bytes: Array[Byte] = new Array[Byte](8)
++    FastTextArgs(
++      getInt, getInt, getInt, getInt, getInt, getInt, getInt, getInt, getInt, getInt,
++      getInt, getInt, getInt, getInt, getDouble, getInt, getInt, getInt, getLong, getLong)
++  }
++
++  def main(args: Array[String]): Unit = {
++    val args0 = FastTextArgs.fromInputStream(new FileInputStream("/Users/emeth.kim/d/g/fastText/dataset/sample.model.bin"))
++    val serialized = args0.serialize
++    val args1 = FastTextArgs.fromByteArray(serialized)
++
++    println(args0)
++    println(args1)
++  }
++
++}
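
The header is exactly 92 bytes (seventeen ints, one double, two longs, all little-endian) and is read field by field in declaration order, so a model file can be sanity-checked before conversion by parsing the header alone. A sketch; the file path is a placeholder:

    import java.io.{BufferedInputStream, FileInputStream}
    import org.apache.s2graph.core.fetcher.fasttext.{FastText, FastTextArgs}

    val is = new BufferedInputStream(new FileInputStream("/tmp/sample.model.bin")) // placeholder
    try {
      val header = FastTextArgs.fromInputStream(is)
      require(header.magic == FastText.FASTTEXT_FILEFORMAT_MAGIC_INT32, "not a fastText model file")
      require(header.version == FastText.FASTTEXT_VERSION, s"unsupported version: ${header.version}")
      println(header)
    } finally {
      is.close()
    }
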
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
index 0000000,0000000..af1c0cc
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
@@@ -1,0 -1,0 +1,56 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import com.typesafe.config.Config
++import org.apache.s2graph.core._
++import org.apache.s2graph.core.types.VertexId
++import org.apache.s2graph.core.utils.logger
++
++import scala.concurrent.{ExecutionContext, Future}
++
++
++class FastTextFetcher(val graph: S2GraphLike) extends EdgeFetcher {
++  val builder = graph.elementBuilder
++  var fastText: FastText = _
++
++  override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
++    val dbPath = config.getString(FastText.DBPathKey)
++
++    try {
++      fastText = new FastText(dbPath)
++    } catch {
++      case e: Throwable =>
++        logger.error(s"[Init]: Failed.", e)
++        println(e)
++        throw e
++    }
++  }
++
++  override def fetches(queryRequests: Seq[QueryRequest],
++                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
++    val stepResultLs = queryRequests.map { queryRequest =>
++      val vertex = queryRequest.vertex
++      val queryParam = queryRequest.queryParam
++      val line = fastText.getLine(vertex.innerId.toIdString())
++
++      val edgeWithScores = fastText.predict(line, queryParam.limit).map { case (_label, score) =>
++        val tgtVertexId = builder.newVertexId(queryParam.label.service,
++          queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), _label)
++
++        val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
++        val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
++
++        EdgeWithScore(edge, score, queryParam.label)
++      }
++
++      StepResult(edgeWithScores, Nil, Nil)
++    }
++
++    Future.successful(stepResultLs)
++  }
++
++  override def close(): Unit = if (fastText != null) fastText.close()
++
++  // not supported yet.
++  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] =
++    Future.successful(Nil)
++}
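
Like the annoy fetcher, FastTextFetcher is attached to a label through options JSON handed to Management.updateEdgeFetcher; the only model-specific key is FastText.DBPathKey ("dbPath"). A sketch of the options, with a placeholder path:

    import org.apache.s2graph.core.ResourceManager
    import org.apache.s2graph.core.fetcher.fasttext.FastText

    // options JSON for Management.updateEdgeFetcher; the RocksDB path is a placeholder
    val fastTextOptions: String =
      s"""{
         |  "fetcher": {
         |    "${ResourceManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.fasttext.FastTextFetcher",
         |    "${FastText.DBPathKey}": "/tmp/fasttext-rocksdb"
         |  }
         |}""".stripMargin
    // s2.management.updateEdgeFetcher(label, Option(fastTextOptions))
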
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index b10feb9,c3abd03..0000000
deleted file mode 100644,100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ /dev/null
@@@ -1,97 -1,102 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied.  See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--
--package org.apache.s2graph.core.storage
--
--import com.typesafe.config.Config
--import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
--import org.apache.s2graph.core._
--import org.apache.s2graph.core.types.VertexId
--import org.apache.s2graph.core.utils.logger
--
--import scala.concurrent.{ExecutionContext, Future}
--
- trait StorageReadable extends Fetcher {
 -trait StorageReadable extends EdgeFetcher {
--  val io: StorageIO
--  val serDe: StorageSerDe
--//  /**
--//    * responsible to fire parallel fetch call into storage and create future that will return merged result.
--//    *
--//    * @param queryRequests
--//    * @param prevStepEdges
--//    * @return
--//    */
--//  def fetches(queryRequests: Seq[QueryRequest],
--//              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
--
--  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
--
--  def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
-
-
-
-
-
--
--  protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
--
- protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
 -//  protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
--
--
--  def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = {
--    val queryParam = QueryParam(labelName = edge.innerLabel.label,
--      direction = GraphUtil.fromDirection(edge.getDir()),
--      tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
--      cacheTTLInMillis = -1)
--    val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
--    val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
--
--    fetchKeyValues(queryRequest, edge).map { kvs =>
--      val (edgeOpt, kvOpt) =
--        if (kvs.isEmpty) (None, None)
--        else {
--          import CanSKeyValue._
--          val snapshotEdgeOpt = io.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
--          val _kvOpt = kvs.headOption
--          (snapshotEdgeOpt, _kvOpt)
--        }
--      (edgeOpt, kvOpt)
--    } recoverWith { case ex: Throwable =>
--      logger.error(s"fetchQueryParam failed. fallback return.", ex)
--      throw new FetchTimeoutException(s"${edge.toLogString}")
--    }
--  }
-
-   def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
-     def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
-       if (kvs.isEmpty) Nil
-       else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
-     }
-
-     val futures = vertices.map { vertex =>
-       val queryParam = QueryParam.Empty
-       val q = Query.toQuery(Seq(vertex), Seq(queryParam))
-       val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
-
-       fetchKeyValues(queryRequest, vertex).map { kvs =>
-         fromResult(kvs, vertex.serviceColumn.schemaVersion)
-       } recoverWith {
-         case ex: Throwable => Future.successful(Nil)
-       }
-     }
--
-     Future.sequence(futures).map(_.flatten)
-   }
 -//  def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
 -//    def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
 -//      if (kvs.isEmpty) Nil
 -//      else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
 -//    }
 -//
 -//    val futures = vertices.map { vertex =>
 -//      val queryParam = QueryParam.Empty
 -//      val q = Query.toQuery(Seq(vertex), Seq(queryParam))
 -//      val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
 -//
 -//      fetchKeyValues(queryRequest, vertex).map { kvs =>
 -//        fromResult(kvs, vertex.serviceColumn.schemaVersion)
 -//      } recoverWith {
 -//        case ex: Throwable => Future.successful(Nil)
 -//      }
 -//    }
 -//
 -//    Future.sequence(futures).map(_.flatten)
 -//  }
--
--}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 8dfbe1e,e6779fa..1f93174
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@@ -66,6 -66,6 +66,8 @@@ class BaseSparkTest extends FunSuite wi
      super.beforeAll()

      s2 = S2GraphHelper.getS2Graph(s2Config, true)
++    S2Graph.loadFetchersAndMutators(s2)
++
      deleteRecursively(new File(options.output))
      deleteRecursively(new File(options.tempDir))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
index bd14352,0000000..3f12e8c
mode 100644,000000..100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
@@@ -1,229 -1,0 +1,143 @@@
 +package org.apache.s2graph.s2jobs.task.custom.process
 +
 +import java.io.File
 +
 +import com.holdenkarau.spark.testing.DataFrameSuiteBase
- import com.typesafe.config.ConfigFactory
 +import org.apache.commons.io.FileUtils
- import org.apache.s2graph.core.Integrate.IntegrateCommon
 +import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
- import org.apache.s2graph.core.model.annoy.AnnoyModelFetcher
- import org.apache.s2graph.core.{Query, QueryParam}
- import org.apache.s2graph.core.model.ModelManager
++import org.apache.s2graph.core.fetcher.annoy.AnnoyModelFetcher
++import org.apache.s2graph.core.{Query, QueryParam, ResourceManager}
 +import org.apache.s2graph.core.schema.Label
++import org.apache.s2graph.s2jobs.BaseSparkTest
 +import org.apache.s2graph.s2jobs.task.TaskConf
 +
 +import scala.concurrent.{Await, ExecutionContext}
 +import scala.concurrent.duration.Duration
 +import scala.io.Source
 +
- class ALSModelProcessTest extends IntegrateCommon with DataFrameSuiteBase {
++class ALSModelProcessTest extends BaseSparkTest {
 +  import scala.collection.JavaConverters._
 +
-   // this test require adding movie lens rating data(u.data, movie.txt) under resources
-   // so ignore for now until figure out how to automate download dataset.
-   // ignore("RUN ALS on movie lens rating data and build annoy index on itemFeatures, finally query.") {
-   //    import spark.sqlContext.implicits._
-   //    val ratingPath = this.getClass.getResource("/u.data").toURI.getPath
-   //
-   //    val ratings = Source.fromFile(new File(ratingPath)).getLines().toSeq.map { line =>
-   //      val tokens = line.split("\t")
-   //      (tokens(0).toInt, tokens(1).toInt, tokens(2).toFloat)
-   //    }.toDF("userId", "movieId", "rating")
-   //
-   //    val outputPath = "/tmp"
-   //    val localInputPath = "/tmp/annoy_input"
-   //    val localIndexPath = "/tmp/annoy_result"
-   //
-   //    val taskOptions = Map(
-   //      "outputPath" -> outputPath,
-   //      "localInputPath" -> localInputPath,
-   //      "localIndexPath" -> localIndexPath
-   //    )
-   //
-   //    val conf = TaskConf("test", "test", Nil, taskOptions)
-   //    ALSModelProcess.buildAnnoyIndex(spark, conf, ratings)
-   //
-   //    val labelName = "annoy_model_fetcher_test"
-   //
-   //    val remoteIndexFilePath = s"${localIndexPath}/annoy-index"
-   //    val remoteDictFilePath = this.getClass.getResource(s"/movie.dict").toURI.getPath
-   //
-   //    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
-   //    val serviceColumn =
-   //      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
-   //
-   //    val options = s"""{
-   //      | "importer": {
-   //      |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
-   //      | },
-   //      | "fetcher": {
-   //      |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.annoy.AnnoyModelFetcher",
-   //      |   "${AnnoyModelFetcher.IndexFilePathKey}": "${remoteIndexFilePath}",
-   //      |   "${AnnoyModelFetcher.DictFilePathKey}": "${remoteDictFilePath}",
-   //      |   "${AnnoyModelFetcher.DimensionKey}": 10
-   //      | }
-   //      |}""".stripMargin
-   //
-   //    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
-   //
-   //    val label = management.createLabel(
-   //      labelName,
-   //      serviceColumn,
-   //      serviceColumn,
-   //      true,
-   //      service.serviceName,
-   //      Seq.empty[Index].asJava,
-   //      Seq.empty[Prop].asJava,
-   //      "strong",
-   //      null,
-   //      -1,
-   //      "v3",
-   //      "gz",
-   //      options
-   //    )
-   //    val config = ConfigFactory.parseString(options)
-   //    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
-   //    Await.result(importerFuture, Duration("3 minutes"))
-   //
-   //    Thread.sleep(10000)
-   //
-   //    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "Toy Story (1995)")
-   //    val queryParam = QueryParam(labelName = labelName, limit = 5)
-   //
-   //    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
-   //    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
-   //
-   //    stepResult.edgeWithScores.foreach { es =>
-   //      println(es.edge.tgtVertex.innerIdVal)
-   //    }
-   //
-   //    // clean up temp directory.
-   //    FileUtils.deleteDirectory(new File(outputPath))
-   // }
-
-   def annoyLabelOptions(indexPath: String, dictPath: String): String = {
++  // this test requires the movie lens rating dataset; see generateDataset below.
++  def annoyLabelOptions(indexPath: String): String = {
 +    val options = s"""{
-      | "importer": {
-      |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
-      | },
 +     | "fetcher": {
-      |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.annoy.AnnoyModelFetcher",
++     |   "${ResourceManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.annoy.AnnoyModelFetcher",
 +     |   "${AnnoyModelFetcher.IndexFilePathKey}": "${indexPath}",
-      |   "${AnnoyModelFetcher.DictFilePathKey}": "${dictPath}",
 +     |   "${AnnoyModelFetcher.DimensionKey}": 10
 +     | }
 +     |}""".stripMargin
 +    options
 +  }
-   def labelImport(labelName: String, indexPath: String, dictPath: String): Label = {
++  def createLabel(labelName: String): Label = {
++    val management = s2.management
 +    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
 +    val serviceColumn =
 +      management.createServiceColumn("s2graph", "movie", "string", Seq(Prop("age", "0", "int", true)))
 +
-     val options = annoyLabelOptions(indexPath, dictPath)
-
-     Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
-
-     val label = management.createLabel(
-       labelName,
-       serviceColumn,
-       serviceColumn,
-       true,
-       service.serviceName,
-       Seq.empty[Index].asJava,
-       Seq.empty[Prop].asJava,
-       "strong",
-       null,
-       -1,
-       "v3",
-       "gz",
-       options
-     )
-
-     val config = ConfigFactory.parseString(options)
-     val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
-     Await.result(importerFuture, Duration("3 minutes"))
++    Label.findByName(labelName, useCache = false) match {
++      case None =>
++        management.createLabel(
++          labelName,
++          service.serviceName, serviceColumn.columnName, serviceColumn.columnType,
++          service.serviceName, serviceColumn.columnName, serviceColumn.columnType,
++          service.serviceName,
++          Seq.empty[Index],
++          Seq.empty[Prop],
++          isDirected = true,
++          consistencyLevel = "strong",
++          hTableName = None,
++          hTableTTL = None,
++          schemaVersion = "v3",
++          compressionAlgorithm = "gz",
++          options = None
++        ).get
++      case Some(label) => label
++    }
++  }
++
++  def registerEdgeFetcher(labelName: String, indexPath: String): Label = {
++    val label = createLabel(labelName)
++    s2.management.updateEdgeFetcher(label, Option(annoyLabelOptions(indexPath)))
 +
 +    Thread.sleep(10000)
 +
 +    label
 +  }
 +
 +  def buildALS(ratingsPath: String, indexPath: String) = {
 +    import spark.sqlContext.implicits._
 +
 +    FileUtils.deleteQuietly(new File(indexPath))
 +
 +    val buffer = scala.collection.mutable.ListBuffer.empty[(Int, Int, Float)]
 +
 +    val lines = Source.fromFile(ratingsPath).getLines()
 +    // skip over header.
 +    lines.next()
 +
 +    while (lines.hasNext) {
 +      val line = lines.next()
 +      try {
 +        val Array(userId, movieId, rating, ts) = line.split(",")
 +        buffer += ((userId.toInt, movieId.toInt, rating.toFloat))
 +      } catch {
 +        case e: Exception => // skip over.
 +      }
 +    }
 +
 +    val rating = buffer.toDF("userId", "movieId", "rating")
 +
 +    val processConf = TaskConf(name = "test", `type` = "test", inputs = Nil,
 +      options = Map.empty)
 +
 +    val process = new ALSModelProcess(processConf)
 +    val df = process.execute(spark, Map("test" -> rating))
 +
 +    val sinkConf = TaskConf(name = "sink", `type` = "sink", inputs = Nil,
 +      options = Map("path" -> indexPath, "itemFactors" -> indexPath))
 +
 +    val sink = new AnnoyIndexBuildSink("sinkTest", sinkConf)
 +    sink.write(df)
 +  }
 +
++  def generateDataset = {
++    import sys.process._
++
++    val generateInputScript = "sh ./example/movielens/generate_input.sh"
++    generateInputScript !
++  }
++
 +  test("ALS ModelProcess and AnnoyIndexBuildSink") {
-     import spark.sqlContext.implicits._
++    val labelName = "annoy_index_test"
++
++    generateDataset
 +
-     val inputPath = "/Users/shon/Workspace/incubator-s2graph/example/movielens/input/ratings.csv"
-     val indexPath = "./annoy_result"
-     val dictPath = "./example/movielens/input/movie.dict"
++    val inputPath = "input/ratings.csv"
++    val indexPath = "annoy_result"
++//    val dictPath = "input/movie.dict"
 +
 +    buildALS(inputPath, indexPath)
 +
-     val labelName = "annoy_index_test"
-     val label = labelImport(labelName, indexPath, dictPath)
-     // val options = annoyLabelOptions(indexPath, dictPath)
-     //
-     // val config = ConfigFactory.parseString(label.options.get).getConfig("fetcher")
-     // val config = ConfigFactory.parseString(options).getConfig("fetcher")
-
-     // val ANNIndexWithDict(index, dict) = AnnoyModelFetcher.buildIndex(config)
-     // val v = index.getItemVector(1)
-     //
-     // import scala.collection.JavaConverters._
-     // index.getNearest(v, 10).asScala.foreach { x =>
-     //   println(x)
-     // }
-
-
-     //
-     val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
++
++    val label = registerEdgeFetcher(labelName, indexPath)
++
++    val service = s2.management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
 +    val serviceColumn =
-       management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
++      s2.management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
 +
-     val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "1")
++    val vertex = s2.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "1")
 +    val queryParam = QueryParam(labelName = labelName, limit = 5)
 +
 +    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
-     val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
++    val stepResult = Await.result(s2.getEdges(query), Duration("60 seconds"))
 +
 +    stepResult.edgeWithScores.foreach { es =>
 +      println(es.edge.tgtVertex.innerIdVal)
 +    }
++
++    Label.delete(label.id.get)
++    FileUtils.deleteDirectory(new File("input"))
++    FileUtils.deleteDirectory(new File("annoy_result"))
 +  }
 +}
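
For reference, the process-to-sink wiring this test exercises reduces to the following sketch; it assumes an active SparkSession named spark and a ratings DataFrame with columns userId, movieId, rating:

    import org.apache.s2graph.s2jobs.task.TaskConf
    import org.apache.s2graph.s2jobs.task.custom.process.{ALSModelProcess, AnnoyIndexBuildSink}

    // assumes spark: SparkSession and ratings: DataFrame("userId", "movieId", "rating")
    val processConf = TaskConf(name = "test", `type` = "test", inputs = Nil, options = Map.empty)
    val itemFactors = new ALSModelProcess(processConf).execute(spark, Map("test" -> ratings))

    // writes the ALS item factors out as an annoy index under ./annoy_result
    val sinkConf = TaskConf(name = "sink", `type` = "sink", inputs = Nil,
      options = Map("path" -> "annoy_result", "itemFactors" -> "annoy_result"))
    new AnnoyIndexBuildSink("sinkTest", sinkConf).write(itemFactors)
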