Start implementing ResourceManager.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/be83d07c Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/be83d07c Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/be83d07c Branch: refs/heads/master Commit: be83d07ca5ecb271bd3678c38734d1176182c286 Parents: 43f627e Author: DO YUNG YOON <steams...@apache.org> Authored: Wed May 9 19:25:15 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Wed May 9 19:25:15 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/schema/schema.sql | 1 + .../apache/s2graph/core/EdgeBulkFetcher.scala | 28 --- .../org/apache/s2graph/core/EdgeFetcher.scala | 3 + .../org/apache/s2graph/core/EdgeMutator.scala | 7 + .../org/apache/s2graph/core/Management.scala | 57 +++++- .../apache/s2graph/core/ResourceManager.scala | 130 +++++++++++++ .../scala/org/apache/s2graph/core/S2Graph.scala | 49 ++--- .../apache/s2graph/core/S2GraphFactory.scala | 2 +- .../org/apache/s2graph/core/S2GraphLike.scala | 38 ++-- .../apache/s2graph/core/VertexBulkFetcher.scala | 26 --- .../org/apache/s2graph/core/VertexFetcher.scala | 4 + .../org/apache/s2graph/core/VertexMutator.scala | 5 + .../s2graph/core/fetcher/FetcherManager.scala | 106 ----------- .../core/fetcher/MemoryModelEdgeFetcher.scala | 26 ++- .../apache/s2graph/core/io/Conversions.scala | 6 +- .../org/apache/s2graph/core/schema/Label.scala | 34 ++-- .../s2graph/core/schema/ServiceColumn.scala | 47 ++++- .../storage/DefaultOptimisticEdgeMutator.scala | 176 +++++++++++++++++ .../core/storage/DefaultOptimisticMutator.scala | 190 ------------------- .../DefaultOptimisticVertexMutator.scala | 44 +++++ .../apache/s2graph/core/storage/Storage.scala | 4 - .../hbase/AsynchbaseEdgeBulkFetcher.scala | 69 ------- .../storage/hbase/AsynchbaseEdgeFetcher.scala | 31 ++- .../core/storage/hbase/AsynchbaseStorage.scala | 7 
+- .../hbase/AsynchbaseVertexBulkFetcher.scala | 63 ------ .../storage/hbase/AsynchbaseVertexFetcher.scala | 26 +++ .../storage/rocks/RocksEdgeBulkFetcher.scala | 68 ------- .../core/storage/rocks/RocksEdgeFetcher.scala | 35 +++- .../core/storage/rocks/RocksStorage.scala | 7 +- .../storage/rocks/RocksVertexBulkFetcher.scala | 88 --------- .../core/storage/rocks/RocksVertexFetcher.scala | 53 ++++++ .../s2graph/core/utils/SafeUpdateCache.scala | 6 +- .../s2graph/core/fetcher/EdgeFetcherTest.scala | 12 +- .../apache/s2graph/graphql/GraphQLServer.scala | 8 +- .../org/apache/s2graph/graphql/HttpServer.scala | 4 +- 35 files changed, 710 insertions(+), 750 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql ---------------------------------------------------------------------- diff --git a/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql index 6b9b71e..4f7f832 100644 --- a/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql +++ b/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql @@ -48,6 +48,7 @@ CREATE TABLE `service_columns` ( `column_name` varchar(64) NOT NULL, `column_type` varchar(8) NOT NULL, `schema_version` varchar(8) NOT NULL default 'v2', + `options` text, PRIMARY KEY (`id`), UNIQUE KEY `ux_service_id_column_name` (`service_id`, `column_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala deleted file mode 100644 index 
646f5f4..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core - -import com.typesafe.config.Config - -import scala.concurrent.{ExecutionContext, Future} - -trait EdgeBulkFetcher { - def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala index f28a161..c3760e0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala @@ -31,5 +31,8 @@ trait EdgeFetcher { def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] + def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] + def close(): Unit = {} + } 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala index dc0099e..252d129 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala @@ -19,11 +19,13 @@ package org.apache.s2graph.core +import com.typesafe.config.Config import org.apache.s2graph.core.storage.MutateResponse import scala.concurrent.{ExecutionContext, Future} trait EdgeMutator { + def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] @@ -35,4 +37,9 @@ trait EdgeMutator { def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, requestTs: Long, retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] + + def close(): Unit = {} + + def init(config: Config)(implicit ec: ExecutionContext): Unit = {} + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 9046449..c3aef7a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -95,14 +95,15 @@ object Management { columnName: String, columnType: String, props: Seq[Prop], - schemaVersion: String = DEFAULT_VERSION) = { + schemaVersion: String = DEFAULT_VERSION, + 
options: Option[String] = None) = { Schema withTx { implicit session => val serviceOpt = Service.findByName(serviceName, useCache = false) serviceOpt match { case None => throw new RuntimeException(s"create service $serviceName has not been created.") case Some(service) => - val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false) + val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, options, useCache = false) for { Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props } yield { @@ -304,13 +305,50 @@ class Management(graph: S2GraphLike) { import Management._ - def importModel(labelName: String, options: String): Future[Importer] = { - Label.updateOption(labelName, options) + def updateEdgeFetcher(labelName: String, options: String): Unit = { + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) - val label = Label.findByName(labelName, false).getOrElse(throw new LabelNotExistException(labelName)) - val config = ConfigFactory.parseString(options) + updateEdgeFetcher(label, options) + } + + def updateEdgeFetcher(label: Label, options: String): Unit = { + val newLabel = Label.updateOption(label, options) + graph.resourceManager.getOrElseUpdateEdgeFetcher(newLabel, forceUpdate = true) + } + + def updateVertexFetcher(serviceName: String, columnName: String, options: String): Unit = { + val service = Service.findByName(serviceName).getOrElse(throw new IllegalArgumentException(s"$serviceName is not exist.")) + val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new IllegalArgumentException(s"$columnName is not exist.")) + + updateVertexFetcher(column, options) + } + + def updateVertexFetcher(column: ServiceColumn, options: String): Unit = { + val newColumn = ServiceColumn.updateOption(column, options) + graph.resourceManager.getOrElseUpdateVertexFetcher(newColumn, 
forceUpdate = true) + } + + def updateEdgeMutator(labelName: String, options: String): Unit = { + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) + + updateEdgeMutator(label, options) + } + + def updateEdgeMutator(label: Label, options: String): Unit = { + val newLabel = Label.updateOption(label, options) + graph.resourceManager.getOrElseUpdateEdgeMutator(newLabel, forceUpdate = true) + } + + def updateVertexMutator(serviceName: String, columnName: String, options: String): Unit = { + val service = Service.findByName(serviceName).getOrElse(throw new IllegalArgumentException(s"$serviceName is not exist.")) + val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new IllegalArgumentException(s"$columnName is not exist.")) + + updateVertexMutator(column, options) + } - graph.modelManager.importModel(label, config)(importEx) + def updateVertexMutator(column: ServiceColumn, options: String): Unit = { + val newColumn = ServiceColumn.updateOption(column, options) + graph.resourceManager.getOrElseUpdateVertexMutator(newColumn, forceUpdate = true) } def createStorageTable(zkAddr: String, @@ -375,14 +413,15 @@ class Management(graph: S2GraphLike) { columnName: String, columnType: String, props: Seq[Prop], - schemaVersion: String = DEFAULT_VERSION): ServiceColumn = { + schemaVersion: String = DEFAULT_VERSION, + options: Option[String] = None): ServiceColumn = { val serviceColumnTry = Schema withTx { implicit session => val serviceOpt = Service.findByName(serviceName, useCache = false) serviceOpt match { case None => throw new RuntimeException(s"create service $serviceName has not been created.") case Some(service) => - val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false) + val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, options, useCache = false) for { Prop(propName, 
defaultValue, dataType, storeInGlobalIndex) <- props } yield { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala new file mode 100644 index 0000000..b877603 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core + +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.s2graph.core.schema.{Label, ServiceColumn} +import org.apache.s2graph.core.utils.SafeUpdateCache +import scala.concurrent.ExecutionContext + + +object ResourceManager { + + import SafeUpdateCache._ + + import scala.collection.JavaConverters._ + + val ClassNameKey = "className" + val EdgeFetcherKey = classOf[EdgeFetcher].getClass().getName + + val VertexFetcherKey = classOf[VertexFetcher].getClass().getName + + val EdgeMutatorKey = classOf[EdgeMutator].getClass.getName + val VertexMutatorKey = classOf[VertexMutator].getClass.getName + + val DefaultConfig = ConfigFactory.parseMap(Map(MaxSizeKey -> 1000, TtlKey -> -1).asJava) +} + +class ResourceManager(graph: S2GraphLike, + _config: Config)(implicit ec: ExecutionContext) { + + import ResourceManager._ + + import scala.collection.JavaConverters._ + + val cache = new SafeUpdateCache(_config) + + def getAllVertexFetchers(): Seq[VertexFetcher] = { + cache.asMap().asScala.toSeq.collect { case (_, (obj: VertexFetcher, _, _)) => obj } + } + + def getAllEdgeFetchers(): Seq[EdgeFetcher] = { + cache.asMap().asScala.toSeq.collect { case (_, (obj: EdgeFetcher, _, _)) => obj } + } + + def getOrElseUpdateVertexFetcher(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexFetcher] = { + val cacheKey = VertexFetcherKey + "_" + column.service.serviceName + "_" + column.columnName + cache.withCache(cacheKey, false, forceUpdate) { + column.toFetcherConfig.map { fetcherConfig => + val className = fetcherConfig.getString(ClassNameKey) + val fetcher = Class.forName(className) + .getConstructor(classOf[S2GraphLike]) + .newInstance(graph) + .asInstanceOf[VertexFetcher] + + fetcher.init(fetcherConfig) + + fetcher + } + } + } + + def getOrElseUpdateEdgeFetcher(label: Label, forceUpdate: Boolean = false): Option[EdgeFetcher] = { + val cacheKey = EdgeFetcherKey + "_" + label.label + + cache.withCache(cacheKey, false, 
forceUpdate) { + label.toFetcherConfig.map { fetcherConfig => + val className = fetcherConfig.getString(ClassNameKey) + val fetcher = Class.forName(className) + .getConstructor(classOf[S2GraphLike]) + .newInstance(graph) + .asInstanceOf[EdgeFetcher] + + fetcher.init(fetcherConfig) + + fetcher + } + } + } + + def getOrElseUpdateVertexMutator(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexMutator] = { + val cacheKey = VertexMutatorKey + "_" + column.service.serviceName + "_" + column.columnName + cache.withCache(cacheKey, false, forceUpdate) { + column.toMutatorConfig.map { mutatorConfig => + val className = mutatorConfig.getString(ClassNameKey) + val fetcher = Class.forName(className) + .getConstructor(classOf[S2GraphLike]) + .newInstance(graph) + .asInstanceOf[VertexMutator] + + fetcher.init(mutatorConfig) + + fetcher + } + } + } + + def getOrElseUpdateEdgeMutator(label: Label, forceUpdate: Boolean = false): Option[EdgeMutator] = { + val cacheKey = EdgeMutatorKey + "_" + label.label + cache.withCache(cacheKey, false, forceUpdate) { + label.toMutatorConfig.map { mutatorConfig => + val className = mutatorConfig.getString(ClassNameKey) + val fetcher = Class.forName(className) + .getConstructor(classOf[S2GraphLike]) + .newInstance(graph) + .asInstanceOf[EdgeMutator] + + fetcher.init(mutatorConfig) + + fetcher + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index c4cb48f..09fd55e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -20,30 +20,27 @@ package org.apache.s2graph.core import java.util -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} 
-import java.util.concurrent.{ExecutorService, Executors, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.{ExecutorService, Executors} import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.configuration.{BaseConfiguration, Configuration} import org.apache.s2graph.core.index.IndexProvider import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy -import org.apache.s2graph.core.fetcher.FetcherManager import org.apache.s2graph.core.schema._ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage import org.apache.s2graph.core.storage.rocks.RocksStorage import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage} import org.apache.s2graph.core.types._ -import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger} -import org.apache.tinkerpop.gremlin.process.traversal.{P, TraversalStrategies} -import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer +import org.apache.s2graph.core.utils.{Extensions, Importer, logger} +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph} import scala.collection.JavaConversions._ import scala.collection.mutable -import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.collection.mutable.ArrayBuffer import scala.concurrent._ -import scala.concurrent.duration.Duration -import scala.util.{Random, Try} +import scala.util.Try object S2Graph { @@ -94,6 +91,7 @@ object S2Graph { val numOfThread = Runtime.getRuntime.availableProcessors() val threadPool = Executors.newFixedThreadPool(numOfThread) val ec = ExecutionContext.fromExecutor(threadPool) + val resourceManagerEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numOfThread)) val DefaultServiceName = "" val DefaultColumnName = "vertex" @@ -187,7 +185,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap 
override val management = new Management(this) - override val modelManager = new FetcherManager(this) + override val resourceManager: ResourceManager = new ResourceManager(this, config)(S2Graph.resourceManagerEc) override val indexProvider = IndexProvider.apply(config) @@ -250,32 +248,39 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap storagePool.getOrElse(s"label:${label.label}", defaultStorage) } - //TODO: + /* Currently, each getter on Fetcher and Mutator missing proper implementation + * Please discuss what is proper way to maintain resources here and provide + * right implementation(S2GRAPH-213). + * */ override def getVertexFetcher(column: ServiceColumn): VertexFetcher = { - getStorage(column.service).vertexFetcher - } - override def getVertexBulkFetcher: VertexBulkFetcher = { - defaultStorage.vertexBulkFetcher + resourceManager.getOrElseUpdateVertexFetcher(column) + .getOrElse(defaultStorage.vertexFetcher) } override def getEdgeFetcher(label: Label): EdgeFetcher = { - if (label.fetchConfigExist) modelManager.getFetcher(label) - else getStorage(label).edgeFetcher + resourceManager.getOrElseUpdateEdgeFetcher(label) + .getOrElse(defaultStorage.edgeFetcher) } - override def getEdgeBulkFetcher: EdgeBulkFetcher = { - defaultStorage.edgeBulkFetcher + override def getAllVertexFetchers(): Seq[VertexFetcher] = { + resourceManager.getAllVertexFetchers() + } + + override def getAllEdgeFetchers(): Seq[EdgeFetcher] = { + resourceManager.getAllEdgeFetchers() } override def getVertexMutator(column: ServiceColumn): VertexMutator = { - getStorage(column.service).vertexMutator + resourceManager.getOrElseUpdateVertexMutator(column) + .getOrElse(defaultStorage.vertexMutator) } override def getEdgeMutator(label: Label): EdgeMutator = { - getStorage(label).edgeMutator + resourceManager.getOrElseUpdateEdgeMutator(label) + .getOrElse(defaultStorage.edgeMutator) } - /** optional */ + //TODO: override def getOptimisticEdgeFetcher(label: Label): 
OptimisticEdgeFetcher = { // getStorage(label).optimisticEdgeFetcher null http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala index cce05af..0a667ef 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala @@ -66,7 +66,7 @@ object S2GraphFactory { val DefaultService = management.createService(DefaultServiceName, "localhost", "s2graph", 0, None).get // Management.deleteColumn(DefaultServiceName, DefaultColumnName) - val DefaultColumn = ServiceColumn.findOrInsert(DefaultService.id.get, DefaultColumnName, Some("integer"), HBaseType.DEFAULT_VERSION, useCache = false) + val DefaultColumn = ServiceColumn.findOrInsert(DefaultService.id.get, DefaultColumnName, Some("integer"), HBaseType.DEFAULT_VERSION, options = None, useCache = false) val DefaultColumnMetas = { ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", "-", storeInGlobalIndex = true, useCache = false) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala index 5e2c168..99423d6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala @@ -19,22 +19,20 @@ package org.apache.s2graph.core -import java.util -import java.util.concurrent.{CompletableFuture, TimeUnit} -import java.util.concurrent.atomic.AtomicLong import 
java.lang.{Boolean => JBoolean, Long => JLong} +import java.util import java.util.Optional +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{CompletableFuture, TimeUnit} import com.typesafe.config.Config import org.apache.commons.configuration.Configuration import org.apache.s2graph.core.GraphExceptions.LabelNotExistException -import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName} import org.apache.s2graph.core.features.{S2Features, S2GraphVariables} import org.apache.s2graph.core.index.IndexProvider -import org.apache.s2graph.core.fetcher.FetcherManager import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn} import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage} -import org.apache.s2graph.core.types.{InnerValLike, VertexId} +import org.apache.s2graph.core.types.VertexId import org.apache.tinkerpop.gremlin.process.computer.GraphComputer import org.apache.tinkerpop.gremlin.structure import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions @@ -44,10 +42,10 @@ import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Element, Graph, import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future} import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} trait S2GraphLike extends Graph { @@ -69,7 +67,7 @@ trait S2GraphLike extends Graph { val traversalHelper: TraversalHelper - val modelManager: FetcherManager + val resourceManager: ResourceManager lazy val MaxRetryNum: Int = config.getInt("max.retry.number") lazy val MaxBackOff: Int = config.getInt("max.back.off") @@ -95,11 +93,11 @@ trait S2GraphLike extends Graph { def getVertexFetcher(column: ServiceColumn): VertexFetcher - def 
getVertexBulkFetcher(): VertexBulkFetcher - def getEdgeFetcher(label: Label): EdgeFetcher - def getEdgeBulkFetcher(): EdgeBulkFetcher + def getAllVertexFetchers(): Seq[VertexFetcher] + + def getAllEdgeFetchers(): Seq[EdgeFetcher] /** optional */ def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher @@ -211,7 +209,13 @@ trait S2GraphLike extends Graph { if (ids.isEmpty) { //TODO: default storage need to be fixed. - Await.result(getVertexBulkFetcher().fetchVerticesAll(), WaitTimeout).iterator + val futures = getAllVertexFetchers.map { vertexFetcher => + vertexFetcher.fetchVerticesAll() + } + + val future = Future.sequence(futures) + + Await.result(future, WaitTimeout).flatten.iterator } else { val vertices = ids.collect { case s2Vertex: S2VertexLike => s2Vertex @@ -236,7 +240,13 @@ trait S2GraphLike extends Graph { def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = { if (edgeIds.isEmpty) { // FIXME - Await.result(getEdgeBulkFetcher().fetchEdgesAll(), WaitTimeout).iterator + val futures = getAllEdgeFetchers().map { edgeFetcher => + edgeFetcher.fetchEdgesAll() + } + + val future = Future.sequence(futures) + + Await.result(future, WaitTimeout).flatten.iterator } else { Await.result(edgesAsync(edgeIds: _*), WaitTimeout) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala deleted file mode 100644 index cbebab5..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core - -import scala.concurrent.{ExecutionContext, Future} - -trait VertexBulkFetcher { - def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala index 5c10d18..b641e7f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala @@ -26,6 +26,10 @@ import scala.concurrent.{ExecutionContext, Future} trait VertexFetcher { def init(config: Config)(implicit ec: ExecutionContext): Unit = {} + def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] + + def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] + def close(): Unit = {} } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala 
---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala index 18be890..d1c8ecf 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala @@ -19,10 +19,15 @@ package org.apache.s2graph.core +import com.typesafe.config.Config import org.apache.s2graph.core.storage.MutateResponse import scala.concurrent.{ExecutionContext, Future} trait VertexMutator { + def close(): Unit = {} + + def init(config: Config)(implicit ec: ExecutionContext): Unit = {} + def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala deleted file mode 100644 index 26db7ff..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.fetcher - -import com.typesafe.config.Config -import org.apache.s2graph.core.schema.Label -import org.apache.s2graph.core.utils.{Importer, logger} -import org.apache.s2graph.core.{EdgeFetcher, S2GraphLike} - -import scala.concurrent.{ExecutionContext, Future} - -object FetcherManager { - val ClassNameKey = "className" -} - -class FetcherManager(s2GraphLike: S2GraphLike) { - - import FetcherManager._ - - private val fetcherPool = scala.collection.mutable.Map.empty[String, EdgeFetcher] - - private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer] - - def toImportLockKey(label: Label): String = label.label - - def getFetcher(label: Label): EdgeFetcher = { - fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported.")) - } - - def initImporter(config: Config): Importer = { - val className = config.getString(ClassNameKey) - - Class.forName(className) - .getConstructor(classOf[S2GraphLike]) - .newInstance(s2GraphLike) - .asInstanceOf[Importer] - } - - def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[EdgeFetcher] = { - val className = config.getString(ClassNameKey) - - val fetcher = Class.forName(className) - .getConstructor(classOf[S2GraphLike]) - .newInstance(s2GraphLike) - .asInstanceOf[EdgeFetcher] - - fetcher.init(config) - - Future.successful(fetcher) - } - - def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = { - val importer = 
ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] { - override def apply(k: String): Importer = { - val importer = initImporter(config.getConfig("importer")) - - //TODO: Update Label's extra options. - importer - .run(config.getConfig("importer")) - .map { importer => - logger.info(s"Close importer") - importer.close() - - initFetcher(config.getConfig("fetcher")).map { fetcher => - importer.setStatus(true) - - fetcherPool - .remove(k) - .foreach { oldFetcher => - logger.info(s"Delete old storage ($k) => $oldFetcher") - oldFetcher.close() - } - - fetcherPool += (k -> fetcher) - } - } - .onComplete { _ => - logger.info(s"ImportLock release: $k") - ImportLock.remove(k) - } - - importer - } - }) - - Future.successful(importer) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala index bf90d69..110d615 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala @@ -37,18 +37,26 @@ class MemoryModelEdgeFetcher(val graph: S2GraphLike) extends EdgeFetcher { override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = { val stepResultLs = queryRequests.map { queryRequest => - val queryParam = queryRequest.queryParam - val edges = ranges.map { ith => - val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString) + toEdges(queryRequest) + } - 
graph.toEdge(queryRequest.vertex.innerIdVal, - tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction) - } + Future.successful(stepResultLs) + } - val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label)) - StepResult(edgeWithScores, Nil, Nil) + override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = { + Future.successful(Nil) + } + + private def toEdges(queryRequest: QueryRequest) = { + val queryParam = queryRequest.queryParam + val edges = ranges.map { ith => + val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString) + + graph.toEdge(queryRequest.vertex.innerIdVal, + tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction) } - Future.successful(stepResultLs) + val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label)) + StepResult(edgeWithScores, Nil, Nil) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala index 15f1231..948cdd8 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala @@ -75,7 +75,8 @@ object Conversions { (JsPath \ "serviceId").read[Int] and (JsPath \ "columnName").read[String] and (JsPath \ "columnType").read[String] and - (JsPath \ "schemaVersion").read[String] + (JsPath \ "schemaVersion").read[String] and + (JsPath \ "options").readNullable[String] )(ServiceColumn.apply _) implicit val serviceColumnWrites: Writes[ServiceColumn] = ( @@ -83,7 +84,8 @@ object Conversions { (JsPath \ "serviceId").write[Int] and (JsPath \ "columnName").write[String] and (JsPath \ 
"columnType").write[String] and - (JsPath \ "schemaVersion").write[String] + (JsPath \ "schemaVersion").write[String] and + (JsPath \ "options").writeNullable[String] )(unlift(ServiceColumn.unapply)) implicit val columnMetaReads: Reads[ColumnMeta] = ( http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala index cca1769..f3ce5e0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala @@ -60,7 +60,9 @@ object Label extends SQLSyntaxSupport[Label] { select * from labels where label = ${labelName} - and deleted_at is null """.map { rs => Label(rs) }.single.apply() + and deleted_at is null """.map { rs => + Label(rs) + }.single.apply() if (useCache) withCache(cacheKey)(labelOpt) else labelOpt @@ -109,15 +111,17 @@ object Label extends SQLSyntaxSupport[Label] { .map { rs => Label(rs) }.single.apply()) } - def findById(id: Int)(implicit session: DBSession = AutoSession): Label = { + def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Label = { val cacheKey = className + "id=" + id - withCache(cacheKey)( - sql""" + lazy val sql = sql""" select * from labels where id = ${id} and deleted_at is null""" - .map { rs => Label(rs) }.single.apply()).get + .map { rs => Label(rs) }.single.apply() + + if (useCache) withCache(cacheKey)(sql).get + else sql.get } def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = { @@ -191,8 +195,8 @@ object Label extends SQLSyntaxSupport[Label] { val serviceId = service.id.get /** insert serviceColumn */ - val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, 
Some(srcColumnType), schemaVersion) - val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion) + val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion, None) + val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion, None) if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}") if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}") @@ -259,18 +263,18 @@ object Label extends SQLSyntaxSupport[Label] { cnt } - def updateOption(labelName: String, options: String)(implicit session: DBSession = AutoSession) = { + def updateOption(label: Label, options: String)(implicit session: DBSession = AutoSession) = { scala.util.Try(Json.parse(options)).getOrElse(throw new RuntimeException("invalid Json option")) - logger.info(s"update options of label $labelName, ${options}") - val cnt = sql"""update labels set options = $options where label = $labelName""".update().apply() - val label = Label.findByName(labelName, useCache = false).get + logger.info(s"update options of label ${label.label}, ${options}") + val cnt = sql"""update labels set options = $options where id = ${label.id.get}""".update().apply() + val updatedLabel = findById(label.id.get, useCache = false) val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}") cacheKeys.foreach { key => expireCache(className + key) expireCaches(className + key) } - cnt + updatedLabel } def delete(id: Int)(implicit session: DBSession = AutoSession) = { @@ -390,12 +394,14 @@ case class Label(id: Option[Int], label: String, lazy val storageConfigOpt: Option[Config] = toStorageConfig - lazy val fetchConfigExist: Boolean = toFetcherConfig.isDefined - def toFetcherConfig: Option[Config] = { 
Schema.toConfig(extraOptions, "fetcher") } + def toMutatorConfig: Option[Config] = { + Schema.toConfig(extraOptions, "mutator") + } + def toStorageConfig: Option[Config] = { Schema.toConfig(extraOptions, "storage") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala index cc1698a..61f1a09 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala @@ -19,9 +19,11 @@ package org.apache.s2graph.core.schema +import com.typesafe.config.Config import org.apache.s2graph.core.JSONParser import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.types.{HBaseType, InnerValLike, InnerValLikeWithTs} +import org.apache.s2graph.core.utils.logger import play.api.libs.json.Json import scalikejdbc._ @@ -29,10 +31,11 @@ object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] { import Schema._ val className = ServiceColumn.getClass.getSimpleName - val Default = ServiceColumn(Option(0), -1, "default", "string", "v4") + val Default = ServiceColumn(Option(0), -1, "default", "string", "v4", None) def valueOf(rs: WrappedResultSet): ServiceColumn = { - ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version")) + ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), + rs.string("column_type").toLowerCase(), rs.string("schema_version"), rs.stringOpt("options")) } def findByServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[ServiceColumn] = { @@ -65,9 +68,9 @@ object ServiceColumn extends 
SQLSyntaxSupport[ServiceColumn] { """.map { rs => ServiceColumn.valueOf(rs) }.single.apply() } } - def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession) = { - sql"""insert into service_columns(service_id, column_name, column_type, schema_version) - values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply() + def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String, options: Option[String])(implicit session: DBSession = AutoSession) = { + sql"""insert into service_columns(service_id, column_name, column_type, schema_version, options) + values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion}, ${options})""".execute.apply() } def delete(id: Int)(implicit session: DBSession = AutoSession) = { val serviceColumn = findById(id, useCache = false) @@ -79,11 +82,14 @@ object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] { expireCaches(className + key) } } - def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = { + def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], + schemaVersion: String = HBaseType.DEFAULT_VERSION, + options: Option[String], + useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = { find(serviceId, columnName, useCache) match { case Some(sc) => sc case None => - insert(serviceId, columnName, columnType, schemaVersion) + insert(serviceId, columnName, columnType, schemaVersion, options) // val cacheKey = s"serviceId=$serviceId:columnName=$columnName" val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName expireCache(className + cacheKey) @@ -101,12 +107,29 @@ object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] { ls } + def 
updateOption(serviceColumn: ServiceColumn, options: String)(implicit session: DBSession = AutoSession) = { + scala.util.Try(Json.parse(options)).getOrElse(throw new RuntimeException("invalid Json option")) + logger.info(s"update options of service column ${serviceColumn.service.serviceName} ${serviceColumn.columnName}, ${options}") + val cnt = sql"""update service_columns set options = $options where id = ${serviceColumn.id.get}""".update().apply() + val column = findById(serviceColumn.id.get, useCache = false) + + val cacheKeys = List(s"id=${column.id.get}", + s"serviceId=${serviceColumn.serviceId}:columnName=${serviceColumn.columnName}") + + cacheKeys.foreach { key => + expireCache(className + key) + expireCaches(className + key) + } + + column + } } case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, - schemaVersion: String) { + schemaVersion: String, + options: Option[String]) { lazy val service = Service.findById(serviceId) lazy val metasWithoutCache = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get, false) :+ ColumnMeta.lastModifiedAtColumn @@ -147,5 +170,13 @@ case class ServiceColumn(id: Option[Int], } } + lazy val extraOptions = Schema.extraOptions(options) + def toFetcherConfig: Option[Config] = { + Schema.toConfig(extraOptions, "fetcher") + } + + def toMutatorConfig: Option[Config] = { + Schema.toConfig(extraOptions, "mutator") + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala new file mode 100644 index 0000000..4deecf5 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala 
@@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.s2graph.core.storage + +import org.apache.s2graph.core._ +import org.apache.s2graph.core.schema.LabelMeta +import org.apache.s2graph.core.utils.logger + +import scala.concurrent.{ExecutionContext, Future} + +class DefaultOptimisticEdgeMutator(graph: S2GraphLike, + serDe: StorageSerDe, + optimisticEdgeFetcher: OptimisticEdgeFetcher, + optimisticMutator: OptimisticMutator, + io: StorageIO) extends EdgeMutator { + lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, optimisticMutator, optimisticEdgeFetcher) + + private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = + optimisticMutator.writeToStorage(cluster, kvs, withWait) + + override def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, + requestTs: Long, + retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = { + if (stepInnerResult.isEmpty) Future.successful(true) + else { + val head = stepInnerResult.edgeWithScores.head + val zkQuorum = head.edge.innerLabel.hbaseZkAddr + val futures = for { + edgeWithScore <- 
stepInnerResult.edgeWithScores + } yield { + val edge = edgeWithScore.edge + + val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) + val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) + + val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) + val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => + serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + io.buildIncrementsAsync(indexEdge, -1L) + } + + /* reverted direction */ + val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) + val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => + serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + io.buildIncrementsAsync(indexEdge, -1L) + } + + val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations + + writeToStorage(zkQuorum, mutations, withWait = true) + } + + Future.sequence(futures).map { rets => rets.forall(_.isSuccess) } + } + } + + override def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = { + val mutations = _edges.flatMap { edge => + val (_, edgeUpdate) = + if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) + else S2Edge.buildOperation(None, Seq(edge)) + + val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) + + if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) + io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr + } + + writeToStorage(zkQuorum, mutations, withWait).map { ret => + 
_edges.zipWithIndex.map { case (edge, idx) => + idx -> ret.isSuccess + } + } + } + + override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { + def mutateEdgesInner(edges: Seq[S2EdgeLike], + checkConsistency: Boolean, + withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { + assert(edges.nonEmpty) + // TODO:: remove after code review: unreachable code + if (!checkConsistency) { + + val futures = edges.map { edge => + val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge)) + + val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) + val mutations = + io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr + + if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) + + writeToStorage(zkQuorum, mutations, withWait) + } + Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) } + } else { + optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => + conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_)) + } + } + } + + val edgeWithIdxs = _edges.zipWithIndex + val grouped = edgeWithIdxs.groupBy { case (edge, idx) => + (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId) + } toSeq + + val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => + val edges = edgeGroup.map(_._1) + val idxs = edgeGroup.map(_._2) + // After deleteAll, process others + val mutateEdgeFutures = edges.toList match { + case head :: tail => + val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait) + + //TODO: decide what we will do on failure on vertex put + val puts = io.buildVertexPutsAsync(head) + val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait) + Seq(edgeFuture, vertexFuture) + case Nil => Nil + } + + val composed = for { + 
// deleteRet <- Future.sequence(deleteAllFutures) + mutateRet <- Future.sequence(mutateEdgeFutures) + } yield mutateRet + + composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) } + } + + Future.sequence(mutateEdges).map { squashedRets => + squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2) + } + } + + override def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = { + val futures = for { + edge <- edges + } yield { + val kvs = for { + relEdge <- edge.relatedEdges + edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid) + } yield { + val countWithTs = edge.propertyValueInner(LabelMeta.count) + val countVal = countWithTs.innerVal.toString().toLong + io.buildIncrementsCountAsync(edgeWithIndex, countVal).head + } + writeToStorage(zkQuorum, kvs, withWait = withWait) + } + + Future.sequence(futures) + } + + override def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = { + val kvs = io.buildDegreePuts(edge, degreeVal) + + writeToStorage(zkQuorum, kvs, withWait = true) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala deleted file mode 100644 index 82cc27a..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.s2graph.core.storage - -import org.apache.s2graph.core._ -import org.apache.s2graph.core.schema.LabelMeta -import org.apache.s2graph.core.utils.logger - -import scala.concurrent.{ExecutionContext, Future} - -class DefaultOptimisticMutator(graph: S2GraphLike, - serDe: StorageSerDe, - optimisticEdgeFetcher: OptimisticEdgeFetcher, - optimisticMutator: OptimisticMutator) extends VertexMutator with EdgeMutator { - - lazy val io: StorageIO = new StorageIO(graph, serDe) - - lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, optimisticMutator, optimisticEdgeFetcher) - - private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = - optimisticMutator.writeToStorage(cluster, kvs, withWait) - - def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, - requestTs: Long, - retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = { - if (stepInnerResult.isEmpty) Future.successful(true) - else { - val head = stepInnerResult.edgeWithScores.head - val zkQuorum = head.edge.innerLabel.hbaseZkAddr - val futures = for { - edgeWithScore <- stepInnerResult.edgeWithScores - } yield { - val edge = edgeWithScore.edge - - val 
edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) - val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - - val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) - val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => - serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - io.buildIncrementsAsync(indexEdge, -1L) - } - - /* reverted direction */ - val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) - val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => - serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - io.buildIncrementsAsync(indexEdge, -1L) - } - - val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations - - writeToStorage(zkQuorum, mutations, withWait = true) - } - - Future.sequence(futures).map { rets => rets.forall(_.isSuccess) } - } - } - - def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { - if (vertex.op == GraphUtil.operations("delete")) { - writeToStorage(zkQuorum, - serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) - } else if (vertex.op == GraphUtil.operations("deleteAll")) { - logger.info(s"deleteAll for vertex is truncated. 
$vertex") - Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time - } else { - writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait) - } - } - - def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = { - val mutations = _edges.flatMap { edge => - val (_, edgeUpdate) = - if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) - else S2Edge.buildOperation(None, Seq(edge)) - - val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) - - if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) - io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr - } - - writeToStorage(zkQuorum, mutations, withWait).map { ret => - _edges.zipWithIndex.map { case (edge, idx) => - idx -> ret.isSuccess - } - } - } - - def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { - def mutateEdgesInner(edges: Seq[S2EdgeLike], - checkConsistency: Boolean, - withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { - assert(edges.nonEmpty) - // TODO:: remove after code review: unreachable code - if (!checkConsistency) { - - val futures = edges.map { edge => - val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge)) - - val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) - val mutations = - io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr - - if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) - - writeToStorage(zkQuorum, mutations, withWait) - } - Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) } - } else { - 
optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => - conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_)) - } - } - } - - val edgeWithIdxs = _edges.zipWithIndex - val grouped = edgeWithIdxs.groupBy { case (edge, idx) => - (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId) - } toSeq - - val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => - val edges = edgeGroup.map(_._1) - val idxs = edgeGroup.map(_._2) - // After deleteAll, process others - val mutateEdgeFutures = edges.toList match { - case head :: tail => - val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait) - - //TODO: decide what we will do on failure on vertex put - val puts = io.buildVertexPutsAsync(head) - val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait) - Seq(edgeFuture, vertexFuture) - case Nil => Nil - } - - val composed = for { - // deleteRet <- Future.sequence(deleteAllFutures) - mutateRet <- Future.sequence(mutateEdgeFutures) - } yield mutateRet - - composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) } - } - - Future.sequence(mutateEdges).map { squashedRets => - squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2) - } - } - - def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = { - val futures = for { - edge <- edges - } yield { - val kvs = for { - relEdge <- edge.relatedEdges - edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid) - } yield { - val countWithTs = edge.propertyValueInner(LabelMeta.count) - val countVal = countWithTs.innerVal.toString().toLong - io.buildIncrementsCountAsync(edgeWithIndex, countVal).head - } - writeToStorage(zkQuorum, kvs, withWait = withWait) - } - - Future.sequence(futures) - } - - def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: 
ExecutionContext): Future[MutateResponse] = { - val kvs = io.buildDegreePuts(edge, degreeVal) - - writeToStorage(zkQuorum, kvs, withWait = true) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala new file mode 100644 index 0000000..6be619b --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core.storage + +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{GraphUtil, S2GraphLike, S2VertexLike, VertexMutator} + +import scala.concurrent.{ExecutionContext, Future} + +class DefaultOptimisticVertexMutator(graph: S2GraphLike, + serDe: StorageSerDe, + optimisticEdgeFetcher: OptimisticEdgeFetcher, + optimisticMutator: OptimisticMutator, + io: StorageIO) extends VertexMutator { + + def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { + if (vertex.op == GraphUtil.operations("delete")) { + optimisticMutator.writeToStorage(zkQuorum, + serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) + } else if (vertex.op == GraphUtil.operations("deleteAll")) { + logger.info(s"deleteAll for vertex is truncated. $vertex") + Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time + } else { + optimisticMutator.writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index 36ecfcb..bf620bf 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -42,12 +42,8 @@ abstract class Storage(val graph: S2GraphLike, val edgeFetcher: EdgeFetcher - val edgeBulkFetcher: EdgeBulkFetcher - val vertexFetcher: VertexFetcher - val vertexBulkFetcher: VertexBulkFetcher - val edgeMutator: EdgeMutator val vertexMutator: VertexMutator 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala deleted file mode 100644 index 3d25dd9..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.s2graph.core.storage.hbase - -import java.util - -import com.typesafe.config.Config -import org.apache.s2graph.core.schema.Label -import org.apache.s2graph.core.storage.serde.Serializable -import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2Graph, S2GraphLike} -import org.apache.s2graph.core.storage.{CanSKeyValue, StorageIO, StorageSerDe} -import org.apache.s2graph.core.types.HBaseType -import org.apache.s2graph.core.utils.{CanDefer, Extensions} -import org.hbase.async.{HBaseClient, KeyValue} - -import scala.concurrent.{ExecutionContext, Future} - -class AsynchbaseEdgeBulkFetcher(val graph: S2GraphLike, - val config: Config, - val client: HBaseClient, - val serDe: StorageSerDe, - val io: StorageIO) extends EdgeBulkFetcher { - import Extensions.DeferOps - import CanDefer._ - import scala.collection.JavaConverters._ - import AsynchbaseStorage._ - - override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = { - val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) => - val distinctLabels = labels.toSet - val scan = AsynchbasePatcher.newScanner(client, hTableName) - scan.setFamily(Serializable.edgeCf) - scan.setMaxVersions(1) - - scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { - case null => Seq.empty - case kvsLs => - kvsLs.asScala.flatMap { kvs => - kvs.asScala.flatMap { kv => - val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) - - serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION) - .fromKeyValues(Seq(kv), None) - .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree) - } - } - } - } - - Future.sequence(futures).map(_.flatten) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala index 4239d15..8eafc68 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala @@ -25,12 +25,14 @@ import com.stumbleupon.async.Deferred import com.typesafe.config.Config import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ -import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe} +import org.apache.s2graph.core.schema.Label +import org.apache.s2graph.core.storage.serde.Serializable +import org.apache.s2graph.core.storage.{CanSKeyValue, StorageIO, StorageSerDe} import org.apache.s2graph.core.types.{HBaseType, VertexId} import org.apache.s2graph.core.utils.{CanDefer, DeferCache, Extensions, logger} import org.hbase.async._ -import scala.concurrent.ExecutionContext +import scala.concurrent.{ExecutionContext, Future} class AsynchbaseEdgeFetcher(val graph: S2GraphLike, val config: Config, @@ -64,6 +66,31 @@ class AsynchbaseEdgeFetcher(val graph: S2GraphLike, }.toFuture(emptyStepResult).map(_.asScala) } + override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = { + val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) => + val distinctLabels = labels.toSet + val scan = AsynchbasePatcher.newScanner(client, hTableName) + scan.setFamily(Serializable.edgeCf) + scan.setMaxVersions(1) + + scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { + case null => Seq.empty + case kvsLs => + kvsLs.asScala.flatMap { kvs => + kvs.asScala.flatMap { kv => + val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) + + serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION) + .fromKeyValues(Seq(kv), None) + .filter(e => distinctLabels(e.innerLabel) 
&& e.getDirection() == "out" && !e.isDegree) + } + } + } + } + + Future.sequence(futures).map(_.flatten) + } + /** * we are using future cache to squash requests into same key on storage. * http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index f65ee20..89303e6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -323,17 +323,14 @@ class AsynchbaseStorage(override val graph: S2GraphLike, private lazy val optimisticEdgeFetcher = new AsynchbaseOptimisticEdgeFetcher(client, serDe, io) private lazy val optimisticMutator = new AsynchbaseOptimisticMutator(graph, serDe, optimisticEdgeFetcher, client, clientWithFlush) - private lazy val _mutator = new DefaultOptimisticMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator) override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients) override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph) override val edgeFetcher: EdgeFetcher = new AsynchbaseEdgeFetcher(graph, config, client, serDe, io) - override val edgeBulkFetcher: EdgeBulkFetcher = new AsynchbaseEdgeBulkFetcher(graph, config, client, serDe, io) override val vertexFetcher: VertexFetcher = new AsynchbaseVertexFetcher(graph, config, client, serDe, io) - override val vertexBulkFetcher: VertexBulkFetcher = new AsynchbaseVertexBulkFetcher(graph, config, client, serDe, io) - override val edgeMutator: EdgeMutator = _mutator - override val vertexMutator: VertexMutator = _mutator + override val edgeMutator: EdgeMutator = new 
DefaultOptimisticEdgeMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator, io) + override val vertexMutator: VertexMutator = new DefaultOptimisticVertexMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator, io) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala deleted file mode 100644 index e6bf4e6..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.s2graph.core.storage.hbase - -import com.typesafe.config.Config -import org.apache.s2graph.core.schema.ServiceColumn -import org.apache.s2graph.core.storage.serde.Serializable -import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe} -import org.apache.s2graph.core.types.HBaseType -import org.apache.s2graph.core.utils.Extensions -import org.apache.s2graph.core.{S2Graph, S2GraphLike, VertexBulkFetcher} -import org.hbase.async.HBaseClient - -import scala.concurrent.{ExecutionContext, Future} - -class AsynchbaseVertexBulkFetcher(val graph: S2GraphLike, - val config: Config, - val client: HBaseClient, - val serDe: StorageSerDe, - val io: StorageIO) extends VertexBulkFetcher { - - import AsynchbaseStorage._ - import Extensions.DeferOps - - import scala.collection.JavaConverters._ - - override def fetchVerticesAll()(implicit ec: ExecutionContext) = { - val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) => - val distinctColumns = columns.toSet - val scan = AsynchbasePatcher.newScanner(client, hTableName) - scan.setFamily(Serializable.vertexCf) - scan.setMaxVersions(1) - - scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { - case null => Seq.empty - case kvsLs => - kvsLs.asScala.flatMap { kvs => - serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs.asScala, None) - .filter(v => distinctColumns(v.serviceColumn)) - } - } - } - Future.sequence(futures).map(_.flatten) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala index 560dd2b..f16c8e9 100644 --- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala @@ -21,7 +21,11 @@ package org.apache.s2graph.core.storage.hbase import com.typesafe.config.Config import org.apache.s2graph.core._ +import org.apache.s2graph.core.schema.ServiceColumn +import org.apache.s2graph.core.storage.serde.Serializable import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe} +import org.apache.s2graph.core.types.HBaseType +import org.apache.s2graph.core.utils.Extensions import org.hbase.async.HBaseClient import scala.concurrent.{ExecutionContext, Future} @@ -32,6 +36,9 @@ class AsynchbaseVertexFetcher(val graph: S2GraphLike, val serDe: StorageSerDe, val io: StorageIO) extends VertexFetcher { import AsynchbaseStorage._ + import Extensions.DeferOps + import scala.collection.JavaConverters._ + private def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = { val rpc = buildRequest(serDe, queryRequest, vertex) @@ -58,4 +65,23 @@ class AsynchbaseVertexFetcher(val graph: S2GraphLike, Future.sequence(futures).map(_.flatten) } + + override def fetchVerticesAll()(implicit ec: ExecutionContext) = { + val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) => + val distinctColumns = columns.toSet + val scan = AsynchbasePatcher.newScanner(client, hTableName) + scan.setFamily(Serializable.vertexCf) + scan.setMaxVersions(1) + + scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { + case null => Seq.empty + case kvsLs => + kvsLs.asScala.flatMap { kvs => + serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs.asScala, None) + .filter(v => distinctColumns(v.serviceColumn)) + } + } + } + Future.sequence(futures).map(_.flatten) + } } 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala deleted file mode 100644 index 2ca4b35..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.s2graph.core.storage.rocks - -import com.typesafe.config.Config -import org.apache.s2graph.core.schema.Label -import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2GraphLike} -import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe} -import org.apache.s2graph.core.types.HBaseType -import org.rocksdb.RocksDB - -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{ExecutionContext, Future} - -class RocksEdgeBulkFetcher(val graph: S2GraphLike, - val config: Config, - val db: RocksDB, - val vdb: RocksDB, - val serDe: StorageSerDe, - val io: StorageIO) extends EdgeBulkFetcher { - import RocksStorage._ - - override def fetchEdgesAll()(implicit ec: ExecutionContext) = { - val edges = new ArrayBuffer[S2EdgeLike]() - Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) => - val distinctLabels = labels.toSet - - val iter = db.newIterator() - try { - iter.seekToFirst() - while (iter.isValid) { - val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis()) - - serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None) - .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree) - .foreach { edge => - edges += edge - } - - - iter.next() - } - - } finally { - iter.close() - } - } - - Future.successful(edges) - } -}