update SafeUpdateCache constructor.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d403ff44 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d403ff44 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d403ff44 Branch: refs/heads/master Commit: d403ff4404fe002b1daa74b03bc7247f5ee39696 Parents: 1a342bb Author: DO YUNG YOON <steams...@apache.org> Authored: Thu Apr 26 11:57:00 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Thu Apr 26 11:57:00 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/schema/Schema.scala | 8 +- .../s2graph/core/utils/SafeUpdateCache.scala | 15 +- .../org/apache/s2graph/core/utils/Sync.scala | 181 ------------------- s2core/src/test/resources/reference.conf | 61 ------- .../apache/s2graph/core/schema/SchemaTest.scala | 6 +- .../apache/s2graph/graphql/GraphQLServer.scala | 8 +- 6 files changed, 23 insertions(+), 256 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala index 5b9e0ae..ebae966 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala @@ -45,9 +45,6 @@ object Schema { private val ModelReferenceCount = new AtomicLong(0L) def apply(config: Config) = { - maxSize = config.getInt("cache.max.size") - ttl = config.getInt("cache.ttl.seconds") - Class.forName(config.getString("db.default.driver")) val settings = ConnectionPoolSettings( @@ -63,7 +60,7 @@ object Schema { settings) checkSchema() - safeUpdateCache = new SafeUpdateCache(maxSize, ttl)(ec) + safeUpdateCache = new SafeUpdateCache(config)(ec) ModelReferenceCount.incrementAndGet() } @@ -220,6 +217,7 @@ object Schema { def toBytes(): Array[Byte] = safeUpdateCache.toBytes() - def fromBytes(safeUpdateCache: SafeUpdateCache, bytes: Array[Byte]): Unit = SafeUpdateCache.fromBytes(safeUpdateCache, bytes) + def fromBytes(config: Config, bytes: Array[Byte])(implicit ec: ExecutionContext): SafeUpdateCache = + SafeUpdateCache.fromBytes(config, bytes) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala index cc9bd4b..7d15078 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean import com.google.common.cache.CacheBuilder import com.google.common.hash.Hashing +import com.typesafe.config.Config + import scala.collection.JavaConversions._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} @@ -31,6 +33,8 @@ import scala.util.{Failure, Success, Try} object SafeUpdateCache { case class CacheKey(key: String) + val MaxSizeKey = "cache.max.size" + val TtlKey = "cache.ttl.seconds" def serialise(value: AnyRef): Try[Array[Byte]] = { import scala.pickling.Defaults._ @@ -51,8 +55,9 @@ object SafeUpdateCache { Try(BinaryPickle(bytes).unpickle[AnyRef]) } - def fromBytes(cache: SafeUpdateCache, bytes: Array[Byte]): Unit = { + def fromBytes(config: Config, bytes: Array[Byte])(implicit ec: ExecutionContext): SafeUpdateCache = { import org.apache.hadoop.io.WritableUtils + val cache: SafeUpdateCache = new SafeUpdateCache(config) val bais = new ByteArrayInputStream(bytes) val input = new DataInputStream(bais) @@ -70,16 +75,18 @@ object SafeUpdateCache { bais.close() input.close() } + + cache } } -class SafeUpdateCache(maxSize: Int, - ttl: Int) +class SafeUpdateCache(val config: Config) (implicit executionContext: ExecutionContext) { import java.lang.{Long => JLong} import SafeUpdateCache._ - + val maxSize = config.getInt(SafeUpdateCache.MaxSizeKey) + val ttl = config.getInt(SafeUpdateCache.TtlKey) private val cache = CacheBuilder.newBuilder().maximumSize(maxSize) .build[JLong, (AnyRef, Int, AtomicBoolean)]() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala deleted file mode 100644 index fb5dbe6..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala +++ /dev/null @@ -1,181 +0,0 @@ -//package org.apache.s2graph.core.utils -// -//import java.util.Properties -// -//import io.reactivex._ -//import io.reactivex.schedulers.Schedulers -//import org.apache.kafka.clients.consumer.KafkaConsumer -//import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata} -//import org.apache.s2graph.core.ExceptionHandler -// -// -//trait Sync[VALUE] { -// type KV = (String, VALUE) -// -// val EmptyValue: VALUE -// val SyncId: String -// val SkipSelfBroadcast: Boolean -// -// val delim = "___" // ! do not change -// -// // public -// val emitter = io.reactivex.processors.PublishProcessor.create[KV]() -// -// def send(key: String): Unit = send(key, EmptyValue) -// -// def send(key: String, value: VALUE): Unit = sendImpl(encodeKey(key), value) -// -// def shutdown(): Unit = { -// emitter.onComplete() -// } -// -// protected def emit(kv: KV): Unit = { -// val (encodedKey, value) = kv -// val (sid, key) = decodeKey(encodedKey) -// -// if (SkipSelfBroadcast && sid == SyncId) { -// logger.syncInfo(s"[Message ignore] sent by self: ${SyncId}") -// // pass -// } else { -// emitter.onNext(key -> value) -// logger.syncInfo(s"[Message emit success]: selfId: ${SyncId}, from: ${sid}") -// } -// } -// -// protected def sendImpl(key: String, value: VALUE): Unit -// -// private def encodeKey(key: String): String = s"${SyncId}${delim}${key}" -// -// private def decodeKey(key: String): (String, String) = { -// val Array(cid, decodedKey) = key.split(delim) -// cid -> decodedKey -// } -//} -// -//class MemorySync(syncId: String) extends Sync[Array[Byte]] { -// -// override val EmptyValue: Array[Byte] = Array.empty[Byte] -// -// override val SyncId: String = syncId -// -// override val SkipSelfBroadcast: Boolean = false -// -// override protected def sendImpl(key: String, value: Array[Byte]): Unit = emit(key -> value) -//} -// -//object KafkaSync { -// -// import java.util.Properties -// -// val keySerializer = "org.apache.kafka.common.serialization.StringSerializer" -// val valueSerializer = "org.apache.kafka.common.serialization.ByteArraySerializer" -// val keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer" -// val valueDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer" -// -// val maxMessageSize = "15728640" // 15 MB -// -// def producerConfig(brokers: String): Properties = { -// val props = ExceptionHandler.toKafkaProp(brokers) -// -// props.setProperty("key.serializer", keySerializer) -// props.setProperty("value.serializer", valueSerializer) -// props.setProperty("message.max.bytes", maxMessageSize) -// props.setProperty("max.request.size", maxMessageSize) -// -// -// props -// } -// -// def consumerConfig(brokers: String, groupId: String): Properties = { -// val props = new Properties() -// -// props.put("bootstrap.servers", brokers) -// props.put("group.id", groupId) -// props.put("enable.auto.commit", "false") -// props.put("key.deserializer", keyDeserializer) -// props.put("value.deserializer", valueDeserializer) -// props.put("max.partition.fetch.bytes", maxMessageSize) -// props.put("fetch.message.max.bytes", maxMessageSize) -// -// props -// } -//} -// -//class KafkaSync(topic: String, -// syncId: String, -// producerConfig: Properties, -// consumerConfig: Properties, -// skipSelfBroadcast: Boolean = true, -// seekTo: String = "end" -// ) extends Sync[Array[Byte]] { -// -// type VALUE = Array[Byte] -// -// val consumerTimeout: Int = 1000 -// -// override val EmptyValue = Array.empty[Byte] -// -// override val SyncId = syncId -// -// override val SkipSelfBroadcast: Boolean = skipSelfBroadcast -// -// lazy val producer = new KafkaProducer[String, VALUE](producerConfig) -// -// lazy val consumer = { -// import scala.collection.JavaConverters._ -// -// val kc = new KafkaConsumer[String, VALUE](consumerConfig) -// kc.subscribe(Seq(topic).asJava) -// kc.poll(consumerTimeout * 10) // Just for meta info sync -// -// if (seekTo == "end") { -// kc.seekToEnd(kc.assignment()) -// } else if (seekTo == "beginning") { -// kc.seekToBeginning(kc.assignment()) -// } else { -// // pass -// } -// -// kc -// } -// -// // Emit event from kafka consumer: Flowable is just for while loop, not for Observabl -// Flowable.create(new FlowableOnSubscribe[KV] { -// import scala.collection.JavaConverters._ -// -// override def subscribe(e: FlowableEmitter[KV]) = { -// while (true) { -// val ls = consumer.poll(consumerTimeout).asScala.map(record => record.key() -> record.value()) -// ls.foreach(emit) -// -// if (ls.nonEmpty) { -// consumer.commitSync() -// logger.syncInfo(s"[Kafka consume success]: message size: ${ls.size}]") -// } -// } -// -// e.onComplete() -// } -// }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.single()).subscribe() -// -// override protected def sendImpl(key: String, value: VALUE): Unit = { -// val record = new ProducerRecord[String, VALUE](topic, key, value) -// -// producer.send(record, new Callback { -// override def onCompletion(metadata: RecordMetadata, exception: Exception) = { -// if (exception != null) { -// logger.syncInfo(s"[Kafka produce failed]: from: key: ${key} e: ${exception}") -// } else { -// logger.syncInfo(s"[Kafka produce success]: from: ${SyncId} ${metadata}") -// } -// } -// }) -// } -// -// override def shutdown(): Unit = { -// super.shutdown() -// -// producer.close() -// consumer.close() -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2core/src/test/resources/reference.conf ---------------------------------------------------------------------- diff --git a/s2core/src/test/resources/reference.conf b/s2core/src/test/resources/reference.conf deleted file mode 100644 index d7fe546..0000000 --- a/s2core/src/test/resources/reference.conf +++ /dev/null @@ -1,61 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# APP PHASE -phase = test - -host = localhost - -# Hbase -hbase.table.compression.algorithm="gz" -hbase.zookeeper.quorum=${host} - -# Asynchbase -hbase.client.retries.number=100 -hbase.rpcs.buffered_flush_interval=100 -hbase.rpc.timeout=0 - -# local retry number -max.retry.number=100 -max.back.off=50 - -# Future cache. -future.cache.max.size=100000 -future.cache.expire.after.write=10000 -future.cache.expire.after.access=5000 -future.cache.metric.interval=60000 - -# Local Cache -cache.ttl.seconds=60 -cache.max.size=100000 - -# DB -s2graph.models.table.name = "models-dev" - -db.default.driver = "org.h2.Driver" -db.default.url = "jdbc:h2:file:./var/metastore;MODE=MYSQL" -db.default.user = "sa" -db.default.password = "sa" - - -akka { - loggers = ["akka.event.slf4j.Slf4jLogger"] - loglevel = "DEBUG" -} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala index 1843a57..018c730 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala @@ -20,9 +20,10 @@ package org.apache.s2graph.core.schema import org.apache.s2graph.core.TestCommonWithModels -import org.apache.s2graph.core.utils.SafeUpdateCache import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +import scala.concurrent.ExecutionContext + class SchemaTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll { override def beforeAll(): Unit = { initTests() @@ -60,8 +61,7 @@ class SchemaTest extends FunSuite with Matchers with TestCommonWithModels with B test("serialize/deserialize Schema.") { import scala.collection.JavaConverters._ val originalMap = Schema.safeUpdateCache.asMap().asScala - val newCache = new SafeUpdateCache(Schema.maxSize, Schema.ttl)(scala.concurrent.ExecutionContext.Implicits.global) - Schema.fromBytes(newCache, Schema.toBytes()) + val newCache = Schema.fromBytes(config, Schema.toBytes())(ExecutionContext.Implicits.global) val newMap = newCache.asMap().asScala originalMap.size shouldBe newMap.size http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala index ddb21b2..391a99f 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala @@ -39,11 +39,11 @@ import sangria.parser.QueryParser import sangria.schema.Schema import spray.json.{JsObject, JsString} +import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext import scala.util.{Failure, Success, Try} object GraphQLServer { - val className = Schema.getClass.getName val logger = LoggerFactory.getLogger(this.getClass) @@ -57,7 +57,11 @@ object GraphQLServer { val s2graph = new S2Graph(config) val schemaCacheTTL = Try(config.getInt("schemaCacheTTL")).getOrElse(-1) val s2Repository = new GraphRepository(s2graph) - val schemaCache = new SafeUpdateCache(1, ttl = schemaCacheTTL) + val schemaConfig = ConfigFactory.parseMap(Map( + SafeUpdateCache.MaxSizeKey -> 1, SafeUpdateCache.TtlKey -> schemaCacheTTL + ).asJava) + + val schemaCache = new SafeUpdateCache(schemaConfig) def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {