update SafeUpdateCache constructor.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d403ff44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d403ff44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d403ff44

Branch: refs/heads/master
Commit: d403ff4404fe002b1daa74b03bc7247f5ee39696
Parents: 1a342bb
Author: DO YUNG YOON <steams...@apache.org>
Authored: Thu Apr 26 11:57:00 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Thu Apr 26 11:57:00 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/schema/Schema.scala |   8 +-
 .../s2graph/core/utils/SafeUpdateCache.scala    |  15 +-
 .../org/apache/s2graph/core/utils/Sync.scala    | 181 -------------------
 s2core/src/test/resources/reference.conf        |  61 -------
 .../apache/s2graph/core/schema/SchemaTest.scala |   6 +-
 .../apache/s2graph/graphql/GraphQLServer.scala  |   8 +-
 6 files changed, 23 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
index 5b9e0ae..ebae966 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
@@ -45,9 +45,6 @@ object Schema {
   private val ModelReferenceCount = new AtomicLong(0L)
 
   def apply(config: Config) = {
-    maxSize = config.getInt("cache.max.size")
-    ttl = config.getInt("cache.ttl.seconds")
-
     Class.forName(config.getString("db.default.driver"))
 
     val settings = ConnectionPoolSettings(
@@ -63,7 +60,7 @@ object Schema {
       settings)
 
     checkSchema()
-    safeUpdateCache = new SafeUpdateCache(maxSize, ttl)(ec)
+    safeUpdateCache = new SafeUpdateCache(config)(ec)
     ModelReferenceCount.incrementAndGet()
   }
 
@@ -220,6 +217,7 @@ object Schema {
 
   def toBytes(): Array[Byte] = safeUpdateCache.toBytes()
 
-  def fromBytes(safeUpdateCache: SafeUpdateCache, bytes: Array[Byte]): Unit = SafeUpdateCache.fromBytes(safeUpdateCache, bytes)
+  def fromBytes(config: Config, bytes: Array[Byte])(implicit ec: ExecutionContext): SafeUpdateCache =
+    SafeUpdateCache.fromBytes(config, bytes)
 }
 
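For context, a minimal caller-side sketch of the new constructor contract, assuming a Typesafe Config carrying the two cache keys introduced in this commit ("cache.max.size", "cache.ttl.seconds"); the value names and settings here are illustrative, not taken from the repo:

    import com.typesafe.config.ConfigFactory
    import scala.concurrent.ExecutionContext.Implicits.global
    // SafeUpdateCache is assumed to be in scope (org.apache.s2graph.core.utils).

    // Hypothetical usage: sizing now comes from the Config, not explicit args.
    val cacheConfig = ConfigFactory.parseString(
      """
        |cache.max.size = 100000
        |cache.ttl.seconds = 60
      """.stripMargin)

    val cache = new SafeUpdateCache(cacheConfig) // was: new SafeUpdateCache(maxSize, ttl)(ec)
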

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index cc9bd4b..7d15078 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import com.google.common.cache.CacheBuilder
 import com.google.common.hash.Hashing
+import com.typesafe.config.Config
+
 import scala.collection.JavaConversions._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success, Try}
@@ -31,6 +33,8 @@ import scala.util.{Failure, Success, Try}
 object SafeUpdateCache {
 
   case class CacheKey(key: String)
+  val MaxSizeKey = "cache.max.size"
+  val TtlKey = "cache.ttl.seconds"
 
   def serialise(value: AnyRef): Try[Array[Byte]] = {
     import scala.pickling.Defaults._
@@ -51,8 +55,9 @@ object SafeUpdateCache {
     Try(BinaryPickle(bytes).unpickle[AnyRef])
   }
 
-  def fromBytes(cache: SafeUpdateCache, bytes: Array[Byte]): Unit = {
+  def fromBytes(config: Config, bytes: Array[Byte])(implicit ec: ExecutionContext): SafeUpdateCache = {
     import org.apache.hadoop.io.WritableUtils
+    val cache: SafeUpdateCache = new SafeUpdateCache(config)
 
     val bais = new ByteArrayInputStream(bytes)
     val input = new DataInputStream(bais)
@@ -70,16 +75,18 @@ object SafeUpdateCache {
       bais.close()
       input.close()
     }
+
+    cache
   }
 }
 
-class SafeUpdateCache(maxSize: Int,
-                      ttl: Int)
+class SafeUpdateCache(val config: Config)
                      (implicit executionContext: ExecutionContext) {
 
   import java.lang.{Long => JLong}
   import SafeUpdateCache._
-
+  val maxSize = config.getInt(SafeUpdateCache.MaxSizeKey)
+  val ttl = config.getInt(SafeUpdateCache.TtlKey)
   private val cache = CacheBuilder.newBuilder().maximumSize(maxSize)
     .build[JLong, (AnyRef, Int, AtomicBoolean)]()
 

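Note that fromBytes now builds and returns a fresh cache from the given Config instead of mutating a caller-supplied instance. A hedged round-trip sketch of the new API; oldCache and config are assumed to exist already:

    import scala.concurrent.ExecutionContext.Implicits.global

    // Snapshot the current cache contents, then rebuild an equivalent
    // cache from the bytes under the same Config.
    val snapshot: Array[Byte] = oldCache.toBytes()
    val restored: SafeUpdateCache = SafeUpdateCache.fromBytes(config, snapshot)
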
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala
deleted file mode 100644
index fb5dbe6..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-//package org.apache.s2graph.core.utils
-//
-//import java.util.Properties
-//
-//import io.reactivex._
-//import io.reactivex.schedulers.Schedulers
-//import org.apache.kafka.clients.consumer.KafkaConsumer
-//import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
-//import org.apache.s2graph.core.ExceptionHandler
-//
-//
-//trait Sync[VALUE] {
-//  type KV = (String, VALUE)
-//
-//  val EmptyValue: VALUE
-//  val SyncId: String
-//  val SkipSelfBroadcast: Boolean
-//
-//  val delim = "___" // ! do not change
-//
-//  // public
-//  val emitter = io.reactivex.processors.PublishProcessor.create[KV]()
-//
-//  def send(key: String): Unit = send(key, EmptyValue)
-//
-//  def send(key: String, value: VALUE): Unit = sendImpl(encodeKey(key), value)
-//
-//  def shutdown(): Unit = {
-//    emitter.onComplete()
-//  }
-//
-//  protected def emit(kv: KV): Unit = {
-//    val (encodedKey, value) = kv
-//    val (sid, key) = decodeKey(encodedKey)
-//
-//    if (SkipSelfBroadcast && sid == SyncId) {
-//      logger.syncInfo(s"[Message ignore] sent by self: ${SyncId}")
-//      // pass
-//    } else {
-//      emitter.onNext(key -> value)
-//      logger.syncInfo(s"[Message emit success]: selfId: ${SyncId}, from: 
${sid}")
-//    }
-//  }
-//
-//  protected def sendImpl(key: String, value: VALUE): Unit
-//
-//  private def encodeKey(key: String): String = s"${SyncId}${delim}${key}"
-//
-//  private def decodeKey(key: String): (String, String) = {
-//    val Array(cid, decodedKey) = key.split(delim)
-//    cid -> decodedKey
-//  }
-//}
-//
-//class MemorySync(syncId: String) extends Sync[Array[Byte]] {
-//
-//  override val EmptyValue: Array[Byte] = Array.empty[Byte]
-//
-//  override val SyncId: String = syncId
-//
-//  override val SkipSelfBroadcast: Boolean = false
-//
-//  override protected def sendImpl(key: String, value: Array[Byte]): Unit = emit(key -> value)
-//}
-//
-//object KafkaSync {
-//
-//  import java.util.Properties
-//
-//  val keySerializer = "org.apache.kafka.common.serialization.StringSerializer"
-//  val valueSerializer = "org.apache.kafka.common.serialization.ByteArraySerializer"
-//  val keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"
-//  val valueDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
-//
-//  val maxMessageSize = "15728640" // 15 MB
-//
-//  def producerConfig(brokers: String): Properties = {
-//    val props = ExceptionHandler.toKafkaProp(brokers)
-//
-//    props.setProperty("key.serializer", keySerializer)
-//    props.setProperty("value.serializer", valueSerializer)
-//    props.setProperty("message.max.bytes", maxMessageSize)
-//    props.setProperty("max.request.size", maxMessageSize)
-//
-//
-//    props
-//  }
-//
-//  def consumerConfig(brokers: String, groupId: String): Properties = {
-//    val props = new Properties()
-//
-//    props.put("bootstrap.servers", brokers)
-//    props.put("group.id", groupId)
-//    props.put("enable.auto.commit", "false")
-//    props.put("key.deserializer", keyDeserializer)
-//    props.put("value.deserializer", valueDeserializer)
-//    props.put("max.partition.fetch.bytes", maxMessageSize)
-//    props.put("fetch.message.max.bytes", maxMessageSize)
-//
-//    props
-//  }
-//}
-//
-//class KafkaSync(topic: String,
-//                syncId: String,
-//                producerConfig: Properties,
-//                consumerConfig: Properties,
-//                skipSelfBroadcast: Boolean = true,
-//                seekTo: String = "end"
-//               ) extends Sync[Array[Byte]] {
-//
-//  type VALUE = Array[Byte]
-//
-//  val consumerTimeout: Int = 1000
-//
-//  override val EmptyValue = Array.empty[Byte]
-//
-//  override val SyncId = syncId
-//
-//  override val SkipSelfBroadcast: Boolean = skipSelfBroadcast
-//
-//  lazy val producer = new KafkaProducer[String, VALUE](producerConfig)
-//
-//  lazy val consumer = {
-//    import scala.collection.JavaConverters._
-//
-//    val kc = new KafkaConsumer[String, VALUE](consumerConfig)
-//    kc.subscribe(Seq(topic).asJava)
-//    kc.poll(consumerTimeout * 10) // Just for meta info sync
-//
-//    if (seekTo == "end") {
-//      kc.seekToEnd(kc.assignment())
-//    } else if (seekTo == "beginning") {
-//      kc.seekToBeginning(kc.assignment())
-//    } else {
-//      // pass
-//    }
-//
-//    kc
-//  }
-//
-//  // Emit event from kafka consumer: Flowable is just for while loop, not for Observabl
-//  Flowable.create(new FlowableOnSubscribe[KV] {
-//    import scala.collection.JavaConverters._
-//
-//    override def subscribe(e: FlowableEmitter[KV]) = {
-//      while (true) {
-//        val ls = consumer.poll(consumerTimeout).asScala.map(record => record.key() -> record.value())
-//        ls.foreach(emit)
-//
-//        if (ls.nonEmpty) {
-//          consumer.commitSync()
-//          logger.syncInfo(s"[Kafka consume success]: message size: 
${ls.size}]")
-//        }
-//      }
-//
-//      e.onComplete()
-//    }
-//  }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.single()).subscribe()
-//
-//  override protected def sendImpl(key: String, value: VALUE): Unit = {
-//    val record = new ProducerRecord[String, VALUE](topic, key, value)
-//
-//    producer.send(record, new Callback {
-//      override def onCompletion(metadata: RecordMetadata, exception: Exception) = {
-//        if (exception != null) {
-//          logger.syncInfo(s"[Kafka produce failed]: from: key: ${key} e: 
${exception}")
-//        } else {
-//          logger.syncInfo(s"[Kafka produce success]: from: ${SyncId} 
${metadata}")
-//        }
-//      }
-//    })
-//  }
-//
-//  override def shutdown(): Unit = {
-//    super.shutdown()
-//
-//    producer.close()
-//    consumer.close()
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2core/src/test/resources/reference.conf
----------------------------------------------------------------------
diff --git a/s2core/src/test/resources/reference.conf b/s2core/src/test/resources/reference.conf
deleted file mode 100644
index d7fe546..0000000
--- a/s2core/src/test/resources/reference.conf
+++ /dev/null
@@ -1,61 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# APP PHASE
-phase = test
-
-host = localhost
-
-# Hbase
-hbase.table.compression.algorithm="gz"
-hbase.zookeeper.quorum=${host}
-
-# Asynchbase
-hbase.client.retries.number=100
-hbase.rpcs.buffered_flush_interval=100
-hbase.rpc.timeout=0
-
-# local retry number
-max.retry.number=100
-max.back.off=50
-
-# Future cache.
-future.cache.max.size=100000
-future.cache.expire.after.write=10000
-future.cache.expire.after.access=5000
-future.cache.metric.interval=60000
-
-# Local Cache
-cache.ttl.seconds=60
-cache.max.size=100000
-
-# DB
-s2graph.models.table.name = "models-dev"
-
-db.default.driver = "org.h2.Driver"
-db.default.url = "jdbc:h2:file:./var/metastore;MODE=MYSQL"
-db.default.user = "sa"
-db.default.password = "sa"
-
-
-akka {
-  loggers = ["akka.event.slf4j.Slf4jLogger"]
-  loglevel = "DEBUG"
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala
index 1843a57..018c730 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala
@@ -20,9 +20,10 @@
 package org.apache.s2graph.core.schema
 
 import org.apache.s2graph.core.TestCommonWithModels
-import org.apache.s2graph.core.utils.SafeUpdateCache
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
+import scala.concurrent.ExecutionContext
+
 class SchemaTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll {
   override def beforeAll(): Unit = {
     initTests()
@@ -60,8 +61,7 @@ class SchemaTest extends FunSuite with Matchers with TestCommonWithModels with B
   test("serialize/deserialize Schema.") {
     import scala.collection.JavaConverters._
     val originalMap = Schema.safeUpdateCache.asMap().asScala
-    val newCache = new SafeUpdateCache(Schema.maxSize, Schema.ttl)(scala.concurrent.ExecutionContext.Implicits.global)
-    Schema.fromBytes(newCache, Schema.toBytes())
+    val newCache = Schema.fromBytes(config, Schema.toBytes())(ExecutionContext.Implicits.global)
     val newMap = newCache.asMap().asScala
 
     originalMap.size shouldBe newMap.size

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d403ff44/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
----------------------------------------------------------------------
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index ddb21b2..391a99f 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -39,11 +39,11 @@ import sangria.parser.QueryParser
 import sangria.schema.Schema
 import spray.json.{JsObject, JsString}
 
+import scala.collection.JavaConverters._
 import scala.concurrent.ExecutionContext
 import scala.util.{Failure, Success, Try}
 
 object GraphQLServer {
-
   val className = Schema.getClass.getName
   val logger = LoggerFactory.getLogger(this.getClass)
 
@@ -57,7 +57,11 @@ object GraphQLServer {
   val s2graph = new S2Graph(config)
   val schemaCacheTTL = Try(config.getInt("schemaCacheTTL")).getOrElse(-1)
   val s2Repository = new GraphRepository(s2graph)
-  val schemaCache = new SafeUpdateCache(1, ttl = schemaCacheTTL)
+  val schemaConfig = ConfigFactory.parseMap(Map(
+    SafeUpdateCache.MaxSizeKey -> 1, SafeUpdateCache.TtlKey -> schemaCacheTTL
+  ).asJava)
+
+  val schemaCache = new SafeUpdateCache(schemaConfig)
 
   def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
 
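ConfigFactory.parseMap expects a java.util.Map of reference-typed values, hence the asJava conversion in the change above. A standalone sketch of the same construction, with explicit boxing for safety; the -1 TTL mirrors the schemaCacheTTL fallback and an ExecutionContext is assumed to be needed implicitly, as in SafeUpdateCache's constructor:

    import com.typesafe.config.ConfigFactory
    import scala.collection.JavaConverters._
    import scala.concurrent.ExecutionContext.Implicits.global

    // Hypothetical: a one-entry schema cache whose TTL defaults to -1
    // (i.e. no refresh interval configured).
    val ttl = -1
    val schemaConfig = ConfigFactory.parseMap(Map(
      SafeUpdateCache.MaxSizeKey -> Int.box(1),
      SafeUpdateCache.TtlKey     -> Int.box(ttl)
    ).asJava)
    val schemaCache = new SafeUpdateCache(schemaConfig)
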
