- rename mysqls package to schema.
- remove GlobalIndex.
- add toBytes, fromBytes on Schema.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/8a388a42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/8a388a42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/8a388a42

Branch: refs/heads/master
Commit: 8a388a4278660ce47cc36ecf4336276ad54349b2
Parents: a18ec27
Author: DO YUNG YOON <steams...@apache.org>
Authored: Wed Apr 25 16:29:40 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Wed Apr 25 16:29:40 2018 +0900

----------------------------------------------------------------------
 .../loader/subscriber/TransferToHFile.scala     |   2 +-
 .../loader/subscriber/TransferToHFileTest.scala |   2 +-
 project/Common.scala                            |   2 +
 s2core/build.sbt                                |   6 +-
 .../core/io/tinkerpop/optimize/S2GraphStep.java |   9 -
 .../org/apache/s2graph/core/mysqls/schema.sql   | 247 ---------
 .../org/apache/s2graph/core/mysqls/setup.sql    |  28 -
 .../org/apache/s2graph/core/schema/schema.sql   | 247 +++++++++
 .../org/apache/s2graph/core/schema/setup.sql    |  28 +
 .../apache/s2graph/core/ExceptionHandler.scala  |  27 +-
 .../s2graph/core/GraphElementBuilder.scala      |   2 +-
 .../org/apache/s2graph/core/JSONParser.scala    |   2 +-
 .../org/apache/s2graph/core/Management.scala    |  26 +-
 .../org/apache/s2graph/core/PostProcess.scala   |   2 +-
 .../org/apache/s2graph/core/QueryParam.scala    |   2 +-
 .../org/apache/s2graph/core/QueryResult.scala   |   2 +-
 .../scala/org/apache/s2graph/core/S2Edge.scala  |   2 +-
 .../org/apache/s2graph/core/S2EdgeBuilder.scala |   2 +-
 .../org/apache/s2graph/core/S2EdgeLike.scala    |   2 +-
 .../s2graph/core/S2EdgePropertyHelper.scala     |   2 +-
 .../scala/org/apache/s2graph/core/S2Graph.scala |   8 +-
 .../apache/s2graph/core/S2GraphFactory.scala    |   2 +-
 .../org/apache/s2graph/core/S2GraphLike.scala   |   2 +-
 .../org/apache/s2graph/core/S2Property.scala    |   2 +-
 .../org/apache/s2graph/core/S2Vertex.scala      |   2 +-
 .../apache/s2graph/core/S2VertexBuilder.scala   |   2 +-
 .../org/apache/s2graph/core/S2VertexLike.scala  |   2 +-
 .../apache/s2graph/core/S2VertexProperty.scala  |   2 +-
 .../s2graph/core/S2VertexPropertyHelper.scala   |   2 +-
 .../apache/s2graph/core/TraversalHelper.scala   |   2 +-
 .../s2graph/core/index/ESIndexProvider.scala    |  21 +-
 .../s2graph/core/index/IndexProvider.scala      |  19 +-
 .../core/index/LuceneIndexProvider.scala        |  13 +-
 .../apache/s2graph/core/io/Conversions.scala    |   2 +-
 .../org/apache/s2graph/core/mysqls/Bucket.scala | 101 ----
 .../apache/s2graph/core/mysqls/ColumnMeta.scala | 169 ------
 .../apache/s2graph/core/mysqls/Experiment.scala | 108 ----
 .../s2graph/core/mysqls/GlobalIndex.scala       | 100 ----
 .../org/apache/s2graph/core/mysqls/Label.scala  | 511 -------------------
 .../apache/s2graph/core/mysqls/LabelIndex.scala | 251 ---------
 .../apache/s2graph/core/mysqls/LabelMeta.scala  | 221 --------
 .../org/apache/s2graph/core/mysqls/Model.scala  | 231 ---------
 .../apache/s2graph/core/mysqls/Service.scala    | 128 -----
 .../s2graph/core/mysqls/ServiceColumn.scala     | 153 ------
 .../core/mysqls/ServiceColumnIndex.scala        | 174 -------
 .../s2graph/core/parsers/WhereParser.scala      |   2 +-
 .../s2graph/core/rest/RequestParser.scala       |   2 +-
 .../apache/s2graph/core/rest/RestHandler.scala  |   2 +-
 .../org/apache/s2graph/core/schema/Bucket.scala | 138 +++++
 .../apache/s2graph/core/schema/ColumnMeta.scala | 165 ++++++
 .../apache/s2graph/core/schema/Experiment.scala | 111 ++++
 .../org/apache/s2graph/core/schema/Label.scala  | 506 ++++++++++++++++++
 .../apache/s2graph/core/schema/LabelIndex.scala | 214 ++++++++
 .../apache/s2graph/core/schema/LabelMeta.scala  | 217 ++++++++
 .../org/apache/s2graph/core/schema/Schema.scala | 225 ++++++++
 .../apache/s2graph/core/schema/Service.scala    | 133 +++++
 .../s2graph/core/schema/ServiceColumn.scala     | 151 ++++++
 .../apache/s2graph/core/storage/StorageIO.scala |   2 +-
 .../hbase/AsynchbaseStorageReadable.scala       |   2 +-
 .../storage/rocks/RocksStorageReadable.scala    |   2 +-
 .../core/storage/serde/MutationHelper.scala     |   2 +-
 .../storage/serde/StorageDeserializable.scala   |   2 +-
 .../storage/serde/StorageSerializable.scala     |   2 +-
 .../tall/IndexEdgeDeserializable.scala          |   2 +-
 .../indexedge/tall/IndexEdgeSerializable.scala  |   2 +-
 .../wide/IndexEdgeDeserializable.scala          |   2 +-
 .../indexedge/wide/IndexEdgeSerializable.scala  |   2 +-
 .../tall/SnapshotEdgeDeserializable.scala       |   2 +-
 .../tall/SnapshotEdgeSerializable.scala         |   2 +-
 .../wide/SnapshotEdgeDeserializable.scala       |   2 +-
 .../wide/SnapshotEdgeSerializable.scala         |   2 +-
 .../vertex/tall/VertexDeserializable.scala      |   2 +-
 .../vertex/wide/VertexDeserializable.scala      |   2 +-
 .../apache/s2graph/core/types/HBaseType.scala   |   2 +-
 .../apache/s2graph/core/types/VertexId.scala    |   2 +-
 .../org/apache/s2graph/core/utils/Logger.scala  |  10 +-
 .../s2graph/core/utils/SafeUpdateCache.scala    | 134 ++++-
 .../org/apache/s2graph/core/utils/Sync.scala    | 181 +++++++
 s2core/src/test/resources/reference.conf        |  61 +++
 .../s2graph/core/Integrate/CrudTest.scala       |   2 +-
 .../core/Integrate/IntegrateCommon.scala        |   2 +-
 .../apache/s2graph/core/ManagementTest.scala    |   2 +-
 .../org/apache/s2graph/core/S2EdgeTest.scala    |   2 +-
 .../org/apache/s2graph/core/TestCommon.scala    |   2 +-
 .../s2graph/core/TestCommonWithModels.scala     |   2 +-
 .../s2graph/core/benchmark/GraphUtilSpec.scala  |   2 +-
 .../s2graph/core/index/IndexProviderTest.scala  |   2 +-
 .../apache/s2graph/core/io/ConversionTest.scala |   2 +-
 .../apache/s2graph/core/models/ModelTest.scala  |  59 ---
 .../s2graph/core/parsers/WhereParserTest.scala  |   2 +-
 .../apache/s2graph/core/schema/SchemaTest.scala |  77 +++
 .../s2graph/core/storage/StorageIOTest.scala    |   2 +-
 .../core/storage/hbase/IndexEdgeTest.scala      |   2 +-
 .../core/storage/rocks/RocksStorageTest.scala   |   2 +-
 .../core/tinkerpop/S2GraphProvider.scala        |   2 +-
 .../core/tinkerpop/structure/S2GraphTest.scala  |   2 +-
 .../counter/core/v2/ExactStorageGraph.scala     |   2 +-
 .../counter/core/v2/RankingStorageGraph.scala   |   2 +-
 .../s2graph/counter/helper/CounterAdmin.scala   |   2 +-
 .../counter/core/RankingCounterSpec.scala       |   2 +-
 .../counter/models/CounterModelSpec.scala       |  69 ---
 .../counter/models/CounterSchemaSpec.scala      |  69 +++
 .../counter/loader/core/DimensionProps.scala    |   2 +-
 .../loader/core/CounterEtlFunctionsSpec.scala   |   2 +-
 .../apache/s2graph/graphql/GraphQLServer.scala  |   6 +-
 .../s2graph/graphql/bind/Unmarshaller.scala     |   4 +-
 .../graphql/repository/GraphRepository.scala    |   2 +-
 .../s2graph/graphql/types/FieldResolver.scala   |   5 +-
 .../s2graph/graphql/types/ManagementType.scala  |   2 +-
 .../apache/s2graph/graphql/types/S2Type.scala   |   2 +-
 .../s2graph/graphql/types/StaticType.scala      |   2 +-
 .../org/apache/s2graph/graphql/TestGraph.scala  |   6 +-
 .../org/apache/s2graph/s2jobs/DegreeKey.scala   |   2 +-
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |   1 +
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |   4 +-
 .../org/apache/s2graph/rest/netty/Server.scala  |   2 +-
 .../rest/play/controllers/AdminController.scala |   2 +-
 .../play/controllers/CounterController.scala    |   2 +-
 .../rest/play/controllers/EdgeController.scala  |   2 +-
 119 files changed, 2806 insertions(+), 2712 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index bfb5a96..ab8aec8 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
 import org.apache.s2graph.loader.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
----------------------------------------------------------------------
diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
index 6918ce4..21ce920 100644
--- a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
+++ b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
 import org.apache.hadoop.util.ToolRunner
 import org.apache.s2graph.core.{Management, PostProcess}
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.types.HBaseType

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/project/Common.scala
----------------------------------------------------------------------
diff --git a/project/Common.scala b/project/Common.scala
index 02a64bf..96109d3 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -31,6 +31,8 @@ object Common {
 
   val elastic4sVersion = "6.1.1"
 
+  val KafkaVersion = "0.10.2.1"
+
   /** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging libraries to forward JCL and JUL logs to SLF4j */
   val loggingRuntime = Seq(
     "log4j" % "log4j" % "1.2.17",

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 110d5e5..cc70e97 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -34,7 +34,8 @@ libraryDependencies ++= Seq(
   "org.apache.hbase" % "hbase-server" % hbaseVersion excludeLogging() exclude("com.google.protobuf", "protobuf*"),
   "org.apache.hbase" % "hbase-hadoop-compat" % hbaseVersion excludeLogging(),
   "org.apache.hbase" % "hbase-hadoop2-compat" % hbaseVersion excludeLogging(),
-  "org.apache.kafka" % "kafka-clients" % "0.8.2.0" excludeLogging() exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
+  "org.apache.kafka" % "kafka-clients" % Common.KafkaVersion excludeLogging() exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
+  "org.apache.kafka" %% "kafka" % Common.KafkaVersion excludeLogging() exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
   "commons-pool" % "commons-pool" % "1.6",
   "org.scalikejdbc" %% "scalikejdbc" % "2.1.4",
   "com.h2database" % "h2" % "1.4.192",
@@ -53,7 +54,8 @@ libraryDependencies ++= Seq(
   "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0",
   "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion excludeLogging(),
   "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion excludeLogging(),
-  "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging()
+  "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging(),
+  "org.scala-lang.modules" %% "scala-pickling" % "0.10.1"
 )
 
 libraryDependencies := {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
----------------------------------------------------------------------
diff --git a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
index 00c277b..c8339c8 100644
--- a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
+++ b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
@@ -21,24 +21,15 @@ package org.apache.s2graph.core.io.tinkerpop.optimize;
 
 
 import org.apache.s2graph.core.EdgeId;
-import org.apache.s2graph.core.QueryParam;
 import org.apache.s2graph.core.S2Graph;
-import org.apache.s2graph.core.index.IndexProvider;
-import org.apache.s2graph.core.index.IndexProvider$;
-import org.apache.s2graph.core.mysqls.Label;
 import org.apache.s2graph.core.types.VertexId;
-import org.apache.s2graph.core.utils.logger;
-import org.apache.tinkerpop.gremlin.process.traversal.Order;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.HasContainerHolder;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Profiling;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.NoOpBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.IdentityStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
 import org.apache.tinkerpop.gremlin.structure.Element;
-import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
deleted file mode 100644
index 6b9b71e..0000000
--- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
+++ /dev/null
@@ -1,247 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---   http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing,
--- software distributed under the License is distributed on an
--- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
--- KIND, either express or implied.  See the License for the
--- specific language governing permissions and limitations
--- under the License.
---
-
-SET FOREIGN_KEY_CHECKS = 0;
-
--- ----------------------------
---  Table structure for `services`
--- ----------------------------
-DROP TABLE IF EXISTS `services`;
-CREATE TABLE `services` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `service_name` varchar(64) NOT NULL,
-  `access_token` varchar(64) NOT NULL,
-  `cluster` varchar(255) NOT NULL,
-  `hbase_table_name` varchar(255) NOT NULL,
-  `pre_split_size` integer NOT NULL default 0,
-  `hbase_table_ttl` integer,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_services_service_name` (`service_name`),
-  INDEX `idx_services_access_token` (`access_token`),
-  INDEX `idx_services_cluster` (`cluster`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-
-
--- ----------------------------
---  Table structure for `services_columns`
--- ----------------------------
-DROP TABLE IF EXISTS `service_columns`;
-CREATE TABLE `service_columns` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `service_id` integer NOT NULL,
-  `column_name` varchar(64) NOT NULL,
-  `column_type` varchar(8) NOT NULL,
-  `schema_version` varchar(8) NOT NULL default 'v2',
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_service_id_column_name` (`service_id`, `column_name`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-
-ALTER TABLE service_columns add FOREIGN KEY(service_id) REFERENCES services(id) ON DELETE CASCADE;
-
-
--- ----------------------------
---  Table structure for `column_metas`
--- ----------------------------
-DROP TABLE IF EXISTS `column_metas`;
-CREATE TABLE `column_metas` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `column_id` integer NOT NULL,
-  `name` varchar(64) NOT NULL,
-  `seq` tinyint        NOT NULL,
-  `data_type` varchar(8) NOT NULL DEFAULT 'string',
-  `default_value` varchar(64) NOT NULL DEFAULT '',
-  `store_in_global_index` tinyint NOT NULL DEFAULT 0,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_column_id_name` (`column_id`, `name`),
-  INDEX `idx_column_id_seq` (`column_id`, `seq`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-
-ALTER TABLE column_metas ADD FOREIGN KEY(column_id) REFERENCES service_columns(id) ON DELETE CASCADE;
-
--- ----------------------------
---  Table structure for `labels`
--- ----------------------------
-
-DROP TABLE IF EXISTS `labels`;
-CREATE TABLE `labels` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `label` varchar(64) NOT NULL,
-  `src_service_id` integer NOT NULL,
-  `src_column_name` varchar(64) NOT NULL,
-  `src_column_type` varchar(8) NOT NULL,
-  `tgt_service_id` integer NOT NULL,
-  `tgt_column_name` varchar(64) NOT NULL,
-  `tgt_column_type` varchar(8) NOT NULL,
-  `is_directed` tinyint        NOT NULL DEFAULT 1,
-  `service_name` varchar(64),
-  `service_id` integer NOT NULL,
-  `consistency_level` varchar(8) NOT NULL DEFAULT 'weak',
-  `hbase_table_name` varchar(255) NOT NULL DEFAULT 's2graph',
-  `hbase_table_ttl` integer,
-  `schema_version` varchar(8) NOT NULL default 'v2',
-  `is_async` tinyint(4) NOT NULL default '0',
-  `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4',
-  `options` text,
-  `deleted_at` datetime DEFAULT NULL,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_label` (`label`),
-  INDEX `idx_labels_src_column_name` (`src_column_name`),
-  INDEX        `idx_labels_tgt_column_name` (`tgt_column_name`),
-  INDEX `idx_labels_src_service_id` (`src_service_id`),
-  INDEX `idx_labels_tgt_service_id` (`tgt_service_id`),
-  INDEX `idx_labels_service_name` (`service_name`),
-  INDEX `idx_labels_service_id` (`service_id`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-
-ALTER TABLE labels add FOREIGN KEY(service_id) REFERENCES services(id);
-
--- ----------------------------
---  Table structure for `global_index`
--- ----------------------------
-DROP TABLE IF EXISTS `global_indices`;
-CREATE TABLE `global_indices` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `element_type` varchar(64) NOT NULL,
-  `prop_names` varchar(255) NOT NULL,
-  `index_name` varchar(64)     NOT NULL,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_global_index_element_type_index_name` (`element_type`, `index_name`),
-  UNIQUE KEY `ux_global_index_element_type_prop_names` (`element_type`, `prop_names`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-
-
--- ----------------------------
---  Table structure for `label_metas`
--- ----------------------------
-DROP TABLE IF EXISTS `label_metas`;
-CREATE TABLE `label_metas` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `label_id` integer NOT NULL,
-  `name` varchar(64) NOT NULL,
-  `seq` tinyint        NOT NULL,
-  `default_value` varchar(64) NOT NULL,
-  `data_type` varchar(8) NOT NULL DEFAULT 'long',
-  `used_in_index` tinyint      NOT NULL DEFAULT 0,
-  `store_in_global_index` tinyint NOT NULL DEFAULT 0,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_label_metas_label_id_name` (`label_id`, `name`),
-  INDEX `idx_label_metas_label_id_seq` (`label_id`, `seq`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-
-ALTER TABLE label_metas ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELETE CASCADE;
-
-
--- ----------------------------
---  Table structure for `label_indices`
--- ----------------------------
-DROP TABLE IF EXISTS `label_indices`;
-CREATE TABLE `label_indices` (
-       `id` int(11) NOT NULL AUTO_INCREMENT,
-       `label_id` int(11) NOT NULL,
-       `name` varchar(64) NOT NULL DEFAULT '_PK',
-       `seq` tinyint(4) NOT NULL,
-       `meta_seqs` varchar(64) NOT NULL,
-       `formulars` varchar(255) DEFAULT NULL,
-       `dir` int DEFAULT NULL,
-       `options` text,
-       PRIMARY KEY (`id`),
-       UNIQUE KEY `ux_label_id_seq` (`label_id`,`meta_seqs`),
-       UNIQUE KEY `ux_label_id_name` (`label_id`,`name`),
-       UNIQUE KEY `ux_label_id_meta_seqs_dir` (`label_id`,`meta_seqs`,`dir`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-
-ALTER TABLE label_indices ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON DELETE CASCADE;
-
-
--- ----------------------------
---  Table structure for `experiments`
--- ----------------------------
-DROP TABLE IF EXISTS `experiments`;
-CREATE TABLE `experiments` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `service_id` integer NOT NULL,
-  `service_name` varchar(128) NOT NULL,
-  `name` varchar(64) NOT NULL,
-  `description` varchar(255) NOT NULL,
-  `experiment_type` varchar(8) NOT NULL DEFAULT 'u',
-  `total_modular` int NOT NULL DEFAULT 100,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_experiments_service_id_name` (`service_id`, `name`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-
--- ALTER TABLE experiments ADD FOREIGN KEY(service_id) REFERENCES service(id) ON DELETE CASCADE;
-
-
--- ----------------------------
---  Table structure for `buckets`
--- ----------------------------
-DROP TABLE IF EXISTS `buckets`;
-CREATE TABLE `buckets` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `experiment_id` integer NOT NULL,
-  `modular` varchar(64) NOT NULL,
-  `http_verb` varchar(8) NOT NULL,
-  `api_path` text NOT NULL,
-  `uuid_key` varchar(128),
-  `uuid_placeholder` varchar(64),
-  `request_body` text NOT NULL,
-  `timeout` int NOT NULL DEFAULT 1000,
-  `impression_id` varchar(64) NOT NULL,
-  `is_graph_query` tinyint NOT NULL DEFAULT 1,
-  `is_empty` tinyint NOT NULL DEFAULT 0,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_buckets_impression_id` (`impression_id`),
-  INDEX `idx_buckets_experiment_id` (`experiment_id`),
-  INDEX `idx_buckets_impression_id` (`impression_id`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-
-SET FOREIGN_KEY_CHECKS = 1;
-
-
--- ----------------------------
---  Table structure for `counter`
--- ----------------------------
-DROP TABLE IF EXISTS `counter`;
-CREATE TABLE `counter` (
-  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
-  `use_flag` tinyint(1) NOT NULL DEFAULT '0',
-  `version` smallint(1) NOT NULL DEFAULT '1',
-  `service` varchar(64) NOT NULL DEFAULT '',
-  `action` varchar(64) NOT NULL DEFAULT '',
-  `item_type` int(11) NOT NULL DEFAULT '0',
-  `auto_comb` tinyint(1) NOT NULL DEFAULT '1',
-  `dimension` varchar(1024) NOT NULL,
-  `use_profile` tinyint(1) NOT NULL DEFAULT '0',
-  `bucket_imp_id` varchar(64) DEFAULT NULL,
-  `use_exact` tinyint(1) NOT NULL DEFAULT '1',
-  `use_rank` tinyint(1) NOT NULL DEFAULT '1',
-  `ttl` int(11) NOT NULL DEFAULT '172800',
-  `daily_ttl` int(11) DEFAULT NULL,
-  `hbase_table` varchar(1024) DEFAULT NULL,
-  `interval_unit` varchar(1024) DEFAULT NULL,
-  `rate_action_id` int(11) unsigned DEFAULT NULL,
-  `rate_base_id` int(11) unsigned DEFAULT NULL,
-  `rate_threshold` int(11) DEFAULT NULL,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `counter_svc` (`service`,`action`),
-  KEY `counter_rate_action_id` (`rate_action_id`),
-  KEY `counter_rate_base_id` (`rate_base_id`),
-  CONSTRAINT `counter_rate_action_id` FOREIGN KEY (`rate_action_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION,
-  CONSTRAINT `counter_rate_base_id` FOREIGN KEY (`rate_base_id`) REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/resources/org/apache/s2graph/core/mysqls/setup.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/setup.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/setup.sql
deleted file mode 100644
index 1bda27c..0000000
--- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/setup.sql
+++ /dev/null
@@ -1,28 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---   http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing,
--- software distributed under the License is distributed on an
--- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
--- KIND, either express or implied.  See the License for the
--- specific language governing permissions and limitations
--- under the License.
---
-
-CREATE DATABASE IF NOT EXISTS graph_dev;
-
-CREATE USER 'graph'@'%' IDENTIFIED BY 'graph';
-
-GRANT ALL PRIVILEGES ON graph_dev.* TO 'graph'@'%' identified by 'graph';
-
-flush privileges;
-
-use graph_dev;

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
new file mode 100644
index 0000000..6b9b71e
--- /dev/null
+++ b/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
@@ -0,0 +1,247 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+SET FOREIGN_KEY_CHECKS = 0;
+
+-- ----------------------------
+--  Table structure for `services`
+-- ----------------------------
+DROP TABLE IF EXISTS `services`;
+CREATE TABLE `services` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `service_name` varchar(64) NOT NULL,
+  `access_token` varchar(64) NOT NULL,
+  `cluster` varchar(255) NOT NULL,
+  `hbase_table_name` varchar(255) NOT NULL,
+  `pre_split_size` integer NOT NULL default 0,
+  `hbase_table_ttl` integer,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_services_service_name` (`service_name`),
+  INDEX `idx_services_access_token` (`access_token`),
+  INDEX `idx_services_cluster` (`cluster`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+
+-- ----------------------------
+--  Table structure for `services_columns`
+-- ----------------------------
+DROP TABLE IF EXISTS `service_columns`;
+CREATE TABLE `service_columns` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `service_id` integer NOT NULL,
+  `column_name` varchar(64) NOT NULL,
+  `column_type` varchar(8) NOT NULL,
+  `schema_version` varchar(8) NOT NULL default 'v2',
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_service_id_column_name` (`service_id`, `column_name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+ALTER TABLE service_columns add FOREIGN KEY(service_id) REFERENCES services(id) ON DELETE CASCADE;
+
+
+-- ----------------------------
+--  Table structure for `column_metas`
+-- ----------------------------
+DROP TABLE IF EXISTS `column_metas`;
+CREATE TABLE `column_metas` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `column_id` integer NOT NULL,
+  `name` varchar(64) NOT NULL,
+  `seq` tinyint        NOT NULL,
+  `data_type` varchar(8) NOT NULL DEFAULT 'string',
+  `default_value` varchar(64) NOT NULL DEFAULT '',
+  `store_in_global_index` tinyint NOT NULL DEFAULT 0,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_column_id_name` (`column_id`, `name`),
+  INDEX `idx_column_id_seq` (`column_id`, `seq`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+ALTER TABLE column_metas ADD FOREIGN KEY(column_id) REFERENCES service_columns(id) ON DELETE CASCADE;
+
+-- ----------------------------
+--  Table structure for `labels`
+-- ----------------------------
+
+DROP TABLE IF EXISTS `labels`;
+CREATE TABLE `labels` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `label` varchar(64) NOT NULL,
+  `src_service_id` integer NOT NULL,
+  `src_column_name` varchar(64) NOT NULL,
+  `src_column_type` varchar(8) NOT NULL,
+  `tgt_service_id` integer NOT NULL,
+  `tgt_column_name` varchar(64) NOT NULL,
+  `tgt_column_type` varchar(8) NOT NULL,
+  `is_directed` tinyint        NOT NULL DEFAULT 1,
+  `service_name` varchar(64),
+  `service_id` integer NOT NULL,
+  `consistency_level` varchar(8) NOT NULL DEFAULT 'weak',
+  `hbase_table_name` varchar(255) NOT NULL DEFAULT 's2graph',
+  `hbase_table_ttl` integer,
+  `schema_version` varchar(8) NOT NULL default 'v2',
+  `is_async` tinyint(4) NOT NULL default '0',
+  `compressionAlgorithm` varchar(64) NOT NULL DEFAULT 'lz4',
+  `options` text,
+  `deleted_at` datetime DEFAULT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_label` (`label`),
+  INDEX `idx_labels_src_column_name` (`src_column_name`),
+  INDEX        `idx_labels_tgt_column_name` (`tgt_column_name`),
+  INDEX `idx_labels_src_service_id` (`src_service_id`),
+  INDEX `idx_labels_tgt_service_id` (`tgt_service_id`),
+  INDEX `idx_labels_service_name` (`service_name`),
+  INDEX `idx_labels_service_id` (`service_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+ALTER TABLE labels add FOREIGN KEY(service_id) REFERENCES services(id);
+
+-- ----------------------------
+--  Table structure for `global_index`
+-- ----------------------------
+DROP TABLE IF EXISTS `global_indices`;
+CREATE TABLE `global_indices` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `element_type` varchar(64) NOT NULL,
+  `prop_names` varchar(255) NOT NULL,
+  `index_name` varchar(64)     NOT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_global_index_element_type_index_name` (`element_type`, `index_name`),
+  UNIQUE KEY `ux_global_index_element_type_prop_names` (`element_type`, `prop_names`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+
+-- ----------------------------
+--  Table structure for `label_metas`
+-- ----------------------------
+DROP TABLE IF EXISTS `label_metas`;
+CREATE TABLE `label_metas` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `label_id` integer NOT NULL,
+  `name` varchar(64) NOT NULL,
+  `seq` tinyint        NOT NULL,
+  `default_value` varchar(64) NOT NULL,
+  `data_type` varchar(8) NOT NULL DEFAULT 'long',
+  `used_in_index` tinyint      NOT NULL DEFAULT 0,
+  `store_in_global_index` tinyint NOT NULL DEFAULT 0,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_label_metas_label_id_name` (`label_id`, `name`),
+  INDEX `idx_label_metas_label_id_seq` (`label_id`, `seq`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+ALTER TABLE label_metas ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON 
DELETE CASCADE;
+
+
+-- ----------------------------
+--  Table structure for `label_indices`
+-- ----------------------------
+DROP TABLE IF EXISTS `label_indices`;
+CREATE TABLE `label_indices` (
+       `id` int(11) NOT NULL AUTO_INCREMENT,
+       `label_id` int(11) NOT NULL,
+       `name` varchar(64) NOT NULL DEFAULT '_PK',
+       `seq` tinyint(4) NOT NULL,
+       `meta_seqs` varchar(64) NOT NULL,
+       `formulars` varchar(255) DEFAULT NULL,
+       `dir` int DEFAULT NULL,
+       `options` text,
+       PRIMARY KEY (`id`),
+       UNIQUE KEY `ux_label_id_seq` (`label_id`,`meta_seqs`),
+       UNIQUE KEY `ux_label_id_name` (`label_id`,`name`),
+       UNIQUE KEY `ux_label_id_meta_seqs_dir` (`label_id`,`meta_seqs`,`dir`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+ALTER TABLE label_indices ADD FOREIGN KEY(label_id) REFERENCES labels(id) ON 
DELETE CASCADE;
+
+
+-- ----------------------------
+--  Table structure for `experiments`
+-- ----------------------------
+DROP TABLE IF EXISTS `experiments`;
+CREATE TABLE `experiments` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `service_id` integer NOT NULL,
+  `service_name` varchar(128) NOT NULL,
+  `name` varchar(64) NOT NULL,
+  `description` varchar(255) NOT NULL,
+  `experiment_type` varchar(8) NOT NULL DEFAULT 'u',
+  `total_modular` int NOT NULL DEFAULT 100,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_experiments_service_id_name` (`service_id`, `name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+-- ALTER TABLE experiments ADD FOREIGN KEY(service_id) REFERENCES services(id) 
ON DELETE CASCADE;
+
+
+-- ----------------------------
+--  Table structure for `buckets`
+-- ----------------------------
+DROP TABLE IF EXISTS `buckets`;
+CREATE TABLE `buckets` (
+  `id` integer NOT NULL AUTO_INCREMENT,
+  `experiment_id` integer NOT NULL,
+  `modular` varchar(64) NOT NULL,
+  `http_verb` varchar(8) NOT NULL,
+  `api_path` text NOT NULL,
+  `uuid_key` varchar(128),
+  `uuid_placeholder` varchar(64),
+  `request_body` text NOT NULL,
+  `timeout` int NOT NULL DEFAULT 1000,
+  `impression_id` varchar(64) NOT NULL,
+  `is_graph_query` tinyint NOT NULL DEFAULT 1,
+  `is_empty` tinyint NOT NULL DEFAULT 0,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `ux_buckets_impression_id` (`impression_id`),
+  INDEX `idx_buckets_experiment_id` (`experiment_id`),
+  INDEX `idx_buckets_impression_id` (`impression_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+SET FOREIGN_KEY_CHECKS = 1;
+
+
+-- ----------------------------
+--  Table structure for `counter`
+-- ----------------------------
+DROP TABLE IF EXISTS `counter`;
+CREATE TABLE `counter` (
+  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
+  `use_flag` tinyint(1) NOT NULL DEFAULT '0',
+  `version` smallint(1) NOT NULL DEFAULT '1',
+  `service` varchar(64) NOT NULL DEFAULT '',
+  `action` varchar(64) NOT NULL DEFAULT '',
+  `item_type` int(11) NOT NULL DEFAULT '0',
+  `auto_comb` tinyint(1) NOT NULL DEFAULT '1',
+  `dimension` varchar(1024) NOT NULL,
+  `use_profile` tinyint(1) NOT NULL DEFAULT '0',
+  `bucket_imp_id` varchar(64) DEFAULT NULL,
+  `use_exact` tinyint(1) NOT NULL DEFAULT '1',
+  `use_rank` tinyint(1) NOT NULL DEFAULT '1',
+  `ttl` int(11) NOT NULL DEFAULT '172800',
+  `daily_ttl` int(11) DEFAULT NULL,
+  `hbase_table` varchar(1024) DEFAULT NULL,
+  `interval_unit` varchar(1024) DEFAULT NULL,
+  `rate_action_id` int(11) unsigned DEFAULT NULL,
+  `rate_base_id` int(11) unsigned DEFAULT NULL,
+  `rate_threshold` int(11) DEFAULT NULL,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `counter_svc` (`service`,`action`),
+  KEY `counter_rate_action_id` (`rate_action_id`),
+  KEY `counter_rate_base_id` (`rate_base_id`),
+  CONSTRAINT `counter_rate_action_id` FOREIGN KEY (`rate_action_id`) 
REFERENCES `counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION,
+  CONSTRAINT `counter_rate_base_id` FOREIGN KEY (`rate_base_id`) REFERENCES 
`counter` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/resources/org/apache/s2graph/core/schema/setup.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/schema/setup.sql 
b/s2core/src/main/resources/org/apache/s2graph/core/schema/setup.sql
new file mode 100644
index 0000000..1bda27c
--- /dev/null
+++ b/s2core/src/main/resources/org/apache/s2graph/core/schema/setup.sql
@@ -0,0 +1,28 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+CREATE DATABASE IF NOT EXISTS graph_dev;
+
+CREATE USER 'graph'@'%' IDENTIFIED BY 'graph';
+
+GRANT ALL PRIVILEGES ON graph_dev.* TO 'graph'@'%' identified by 'graph';
+
+flush privileges;
+
+use graph_dev;

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
index eb5b1da..24a4eb9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -67,6 +67,9 @@ class ExceptionHandler(config: Config) {
 }
 
 object ExceptionHandler {
+  val mainBrokerKey = "kafka.metadata.broker.list"
+  val subBrokerKey = "kafka_sub.metadata.broker.list"
+
   type Key = String
   type Val = String
 
@@ -93,21 +96,39 @@ object ExceptionHandler {
 
   case class KafkaMessage(msg: ProducerRecord[Key, Val])
 
-  private def toKafkaProp(config: Config) = {
-    val props = new Properties()
+  def toKafkaProducer(config: Config): Option[KafkaProducer[String, String]] = 
{
+    val brokerKey = "kafka.producer.brokers"
+    if (config.hasPath(brokerKey)) {
+      val brokers = config.getString("kafka.producer.brokers")
+      Option(new KafkaProducer[String, String](toKafkaProp(brokers)))
+    } else {
+      None
+    }
+  }
 
+  def toKafkaProp(config: Config): Properties = {
     /* all default configuration for new producer */
     val brokers =
       if (config.hasPath("kafka.metadata.broker.list")) 
config.getString("kafka.metadata.broker.list")
       else "localhost"
 
+    toKafkaProp(brokers)
+  }
+
+  /*
+   * http://kafka.apache.org/082/documentation.html#producerconfigs
+   * If the Kafka version changes, make sure these producer settings are still valid.
+   */
+  def toKafkaProp(brokers: String): Properties = {
+    val props = new Properties()
+
     props.put("bootstrap.servers", brokers)
     props.put("acks", "1")
     props.put("buffer.memory", "33554432")
     props.put("compression.type", "snappy")
     props.put("retries", "0")
     props.put("batch.size", "16384")
-    props.put("linger.ms", "0")
+    props.put("linger.ms", "100")
     props.put("max.request.size", "1048576")
     props.put("receive.buffer.bytes", "32768")
     props.put("send.buffer.bytes", "131072")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
index 0478413..a73f5c2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core
 import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.JSONParser.{fromJsonToProperties, toInnerVal}
 import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.logger
 import org.apache.tinkerpop.gremlin.structure.T

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
index b92e47b..81b3c13 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 import org.apache.s2graph.core.GraphExceptions.IllegalDataTypeException
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.rest.TemplateHelper
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, 
InnerValLikeWithTs, VertexId}
 import org.apache.s2graph.core.utils.logger

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 6a12f0a..d026e5b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -24,7 +24,7 @@ import java.util
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, 
LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException}
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types.HBaseType._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.JSONParser._
@@ -95,7 +95,7 @@ object Management {
                           props: Seq[Prop],
                           schemaVersion: String = DEFAULT_VERSION) = {
 
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val serviceOpt = Service.findByName(serviceName, useCache = false)
       serviceOpt match {
         case None => throw new RuntimeException(s"create service $serviceName 
has not been created.")
@@ -113,7 +113,7 @@ object Management {
   }
 
   def deleteColumn(serviceName: String, columnName: String, schemaVersion: 
String = DEFAULT_VERSION) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val service = Service.findByName(serviceName, useCache = 
false).getOrElse(throw new RuntimeException("Service not Found"))
       val serviceColumns = ServiceColumn.find(service.id.get, columnName, 
useCache = false)
       val columnNames = serviceColumns.map { serviceColumn =>
@@ -130,7 +130,7 @@ object Management {
   }
 
   def deleteLabel(labelName: String): Try[Label] = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val label = Label.findByName(labelName, useCache = 
false).getOrElse(throw GraphExceptions.LabelNotExistException(labelName))
       Label.deleteAll(label)
       label
@@ -138,7 +138,7 @@ object Management {
   }
 
   def markDeletedLabel(labelName: String) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       Label.findByName(labelName, useCache = false).foreach { label =>
         // rename & delete_at column filled with current time
         Label.markDeleted(label)
@@ -148,7 +148,7 @@ object Management {
   }
 
   def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val label = Label.findByName(labelStr).getOrElse(throw 
LabelNotExistException(s"$labelStr not found"))
       val labelMetaMap = label.metaPropsInvMap
 
@@ -162,7 +162,7 @@ object Management {
   }
 
   def addProp(labelStr: String, prop: Prop) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val labelOpt = Label.findByName(labelStr)
       val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr 
not found"))
 
@@ -171,7 +171,7 @@ object Management {
   }
 
   def addProps(labelStr: String, props: Seq[Prop]) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val labelOpt = Label.findByName(labelStr)
       val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr 
not found"))
 
@@ -251,7 +251,7 @@ object Management {
    * update label name.
    */
   def updateLabelName(oldLabelName: String, newLabelName: String) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       for {
         old <- Label.findByName(oldLabelName, useCache = false)
       } {
@@ -269,7 +269,7 @@ object Management {
    * swap label names.
    */
   def swapLabelNames(leftLabel: String, rightLabel: String) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val tempLabel = "_" + leftLabel + "_"
       Label.updateName(leftLabel, tempLabel)
       Label.updateName(rightLabel, leftLabel)
@@ -338,7 +338,7 @@ class Management(graph: S2GraphLike) {
                     preSplitSize: Int, hTableTTL: Option[Int],
                     compressionAlgorithm: String = 
DefaultCompressionAlgorithm): Try[Service] = {
 
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val service = Service.findOrInsert(serviceName, cluster, hTableName, 
preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, 
useCache = false)
       val config = toConfig(Map(
         ZookeeperQuorum -> service.cluster,
@@ -366,7 +366,7 @@ class Management(graph: S2GraphLike) {
                           props: Seq[Prop],
                           schemaVersion: String = DEFAULT_VERSION): 
ServiceColumn = {
 
-    val serviceColumnTry = Model withTx { implicit session =>
+    val serviceColumnTry = Schema withTx { implicit session =>
       val serviceOpt = Service.findByName(serviceName, useCache = false)
       serviceOpt match {
         case None => throw new RuntimeException(s"create service $serviceName 
has not been created.")
@@ -432,7 +432,7 @@ class Management(graph: S2GraphLike) {
     if (hTableName.isEmpty && hTableTTL.isDefined) throw new 
RuntimeException("if want to specify ttl, give hbaseTableName also")
 
     val labelOpt = Label.findByName(label, useCache = false)
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label 
name ${label} already exist.")
 
       /* create all models */

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 8e4be5b..f7a54a9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -23,7 +23,7 @@ import java.util.Base64
 
 import com.google.protobuf.ByteString
 import org.apache.s2graph.core.GraphExceptions.{BadQueryException, 
LabelNotExistException}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 6d24c3f..21aca12 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -23,7 +23,7 @@ import com.google.common.hash.Hashing
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.DuplicatePolicy.DuplicatePolicy
 import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.parsers.{Where, WhereParser}
 import org.apache.s2graph.core.rest.TemplateHelper
 import org.apache.s2graph.core.storage.serde.StorageSerializable._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 37ddf06..be57017 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core
 
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
 import org.apache.s2graph.core.utils.logger
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 58b1ce1..10752da 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core
 
 import org.apache.s2graph.core.S2Edge.{Props, State}
 import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta, 
ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta, 
ServiceColumn}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.io.Conversions._
 import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph, 
Property, T, Vertex}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
index 85321d3..a8c92df 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 import org.apache.s2graph.core.S2Edge.State
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.types.{InnerValLike, TargetVertexId, VertexId}
 import org.apache.tinkerpop.gremlin.structure.Property
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
index 2321ac8..413c1e9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -22,7 +22,7 @@ import java.util
 import java.util.function.BiConsumer
 
 import org.apache.s2graph.core.S2Edge.{Props, State}
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta, 
ServiceColumn}
 import org.apache.s2graph.core.types._
 import org.apache.tinkerpop.gremlin.structure
 import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph, 
Property, T, Vertex}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
index 1e0a95b..8c609a0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core
 import java.util.function.BiConsumer
 
 import org.apache.s2graph.core.S2Edge.Props
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
 import org.apache.tinkerpop.gremlin.structure.Property
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 7f19cb4..7816a63 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -27,7 +27,7 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.commons.configuration.{BaseConfiguration, Configuration}
 import org.apache.s2graph.core.index.IndexProvider
 import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.storage.rocks.RocksStorage
 import org.apache.s2graph.core.storage.{MutateResponse, Storage}
@@ -181,8 +181,8 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
     config.getString("s2graph.storage.backend")
   }.getOrElse("hbase")
 
-  Model.apply(config)
-  Model.loadCache()
+  Schema.apply(config)
+  Schema.loadCache()
 
   override val management = new Management(this)
 
@@ -258,7 +258,7 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
   override def shutdown(modelDataDelete: Boolean = false): Unit =
     if (running.compareAndSet(true, false)) {
       flushStorage()
-      Model.shutdown(modelDataDelete)
+      Schema.shutdown(modelDataDelete)
       defaultStorage.shutdown()
       localLongId.set(0l)
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
index 64108db..cce05af 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core
 import org.apache.commons.configuration.BaseConfiguration
 import org.apache.s2graph.core.Management.JsonModel.Prop
 import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
-import org.apache.s2graph.core.mysqls.{ColumnMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, ServiceColumn}
 import org.apache.s2graph.core.types.HBaseType
 import org.apache.tinkerpop.gremlin.structure.T
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index f639e84..cbd31cc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -31,7 +31,7 @@ import 
org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
 import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
 import org.apache.s2graph.core.index.IndexProvider
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, Service, 
ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, 
ServiceColumn}
 import org.apache.s2graph.core.storage.{MutateResponse, Storage}
 import org.apache.s2graph.core.types.{InnerValLike, VertexId}
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
index 50b94de..874924c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.{CanInnerValLike, InnerValLikeWithTs, 
VertexId}
 import org.apache.tinkerpop.gremlin.structure.Graph.Features
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
index 954bab0..2de8e92 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core
 import java.util.function.BiConsumer
 
 import org.apache.s2graph.core.S2Vertex.Props
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types._
 import org.apache.tinkerpop.gremlin.structure.Vertex
 import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala
index 50d6526..ffd16e9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala
@@ -23,7 +23,7 @@ import java.util
 import java.util.function.BiConsumer
 
 import org.apache.s2graph.core.S2Vertex.Props
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
 import org.apache.s2graph.core.types.VertexId
 
 class S2VertexBuilder(vertex: S2VertexLike) {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index 4608ce7..7ece8c2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -22,7 +22,7 @@ import java.util
 import java.util.function.{BiConsumer, Consumer}
 
 import org.apache.s2graph.core.S2Vertex.Props
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, Service, 
ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, Label, Service, 
ServiceColumn}
 import org.apache.s2graph.core.types.{InnerValLike, VertexId}
 import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
 import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, T, Vertex, 
VertexProperty}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
index e0abfba..01842d7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.core
 
 import java.util
 
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
 import org.apache.s2graph.core.types.{CanInnerValLike, InnerValLike}
 import org.apache.tinkerpop.gremlin.structure.{Property, VertexProperty}
 import play.api.libs.json.Json

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
index bdb3c00..c46dc9c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core
 
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
 import org.apache.s2graph.core.types.InnerValLike
 
 object S2VertexPropertyHelper {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index c40078e..0dc2aa2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core
 import java.util
 
 import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.{HBaseType, InnerVal, LabelWithDirection, VertexId}
 import org.apache.s2graph.core.utils.{Extensions, logger}
 import org.apache.tinkerpop.gremlin.structure.Edge

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
index e67d529..be1ad6e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
@@ -25,13 +25,10 @@ import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType}
 import com.sksamuel.elastic4s.http.ElasticDsl._
 import com.sksamuel.elastic4s.http.HttpClient
 import com.typesafe.config.Config
-import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait.Predicate
 import org.apache.s2graph.core.io.Conversions
-import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core._
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import org.apache.tinkerpop.gremlin.structure.Property
 import play.api.libs.json.{Json, Reads}
 
 import scala.collection.JavaConverters._
@@ -40,14 +37,10 @@ import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.Try
 
 class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends IndexProvider {
-
-  import GlobalIndex._
   import IndexProvider._
 
   import scala.collection.mutable
 
-  implicit val executor = ec
-
   val esClientUri = Try(config.getString("es.index.provider.client.uri")).getOrElse("localhost")
   val client = HttpClient(ElasticsearchClientUri(esClientUri, 9200))
 
@@ -106,10 +99,10 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
 
   override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = {
     val bulkRequests = vertices.flatMap { vertex =>
-      toFields(vertex, forceToIndex).toSeq.map { fields =>
-        update(vertex.id.toString()).in(new IndexAndType(GlobalIndex.VertexIndexName, GlobalIndex.TypeName)).docAsUpsert(fields)
+        toFields(vertex, forceToIndex).toSeq.map { fields =>
+          update(vertex.id.toString()).in(new IndexAndType(VertexIndexName, TypeName)).docAsUpsert(fields)
+        }
       }
-    }
 
     if (bulkRequests.isEmpty) Future.successful(vertices.map(_ => true))
     else {
@@ -135,7 +128,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
   override def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = {
     val bulkRequests = edges.flatMap { edge =>
       toFields(edge, forceToIndex).toSeq.map { fields =>
-        update(edge.edgeId.toString()).in(new IndexAndType(GlobalIndex.EdgeIndexName, GlobalIndex.TypeName)).docAsUpsert(fields)
+        update(edge.edgeId.toString()).in(new IndexAndType(EdgeIndexName, TypeName)).docAsUpsert(fields)
       }
     }
 
@@ -185,7 +178,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
       queryString,
       0,
       1000,
-      GlobalIndex.EdgeIndexName,
+      EdgeIndexName,
       field,
       Conversions.s2EdgeIdReads)(e => EdgeId.isValid(e).isDefined)
   }
@@ -200,7 +193,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
     fetchInner[VertexId](queryString,
       0,
       1000,
-      GlobalIndex.VertexIndexName,
+      VertexIndexName,
       field,
       Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined)
   }
@@ -215,7 +208,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
           queryString,
           vertexQueryParam.offset,
           vertexQueryParam.limit,
-          GlobalIndex.VertexIndexName,
+          VertexIndexName,
           field,
           Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined)
       case None => Future.successful(empty)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index f573ff6..0220ff8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -23,7 +23,7 @@ import java.util
 
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types.VertexId
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
 import org.apache.tinkerpop.gremlin.process.traversal.util.{AndP, OrP}
@@ -34,8 +34,23 @@ import scala.concurrent.{ExecutionContext, Future}
 import scala.util.Try
 
 object IndexProvider {
-  import GlobalIndex._
+
+  //TODO: Fix Me
+  val hitsPerPage = 100000
   val IdField = "id"
+  val vidField = "_vid_"
+  val eidField = "_eid_"
+  val labelField = "_label_"
+  val serviceField = "_service_"
+  val serviceColumnField = "_serviceColumn_"
+  val EdgeType = "edge"
+  val VertexType = "vertex"
+  val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField)
+
+  //  val IndexName = "global_indices"
+  val VertexIndexName = "global_vertex_index"
+  val EdgeIndexName = "global_edge_index"
+  val TypeName = "test"
 
   def apply(config: Config)(implicit ec: ExecutionContext): IndexProvider = {
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
index c417022..4a3d044 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
@@ -31,7 +31,6 @@ import org.apache.lucene.search.{IndexSearcher, Query}
 import org.apache.lucene.store.{BaseDirectory, RAMDirectory, SimpleFSDirectory}
 import org.apache.lucene.search.TopScoreDocCollector
 import org.apache.s2graph.core.io.Conversions
-import org.apache.s2graph.core.mysqls.GlobalIndex
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.utils.logger
 import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike, VertexQueryParam}
@@ -42,8 +41,6 @@ import scala.concurrent.Future
 
 
 class LuceneIndexProvider(config: Config) extends IndexProvider {
-
-  import GlobalIndex._
   import IndexProvider._
 
   import scala.collection.JavaConverters._
@@ -140,7 +137,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     Future.successful(mutateEdges(edges, forceToIndex))
 
   override def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean] = {
-    val writer = getOrElseCreateIndexWriter(GlobalIndex.VertexIndexName)
+    val writer = getOrElseCreateIndexWriter(VertexIndexName)
 
     vertices.foreach { vertex =>
       toDocument(vertex, forceToIndex).foreach { doc =>
@@ -160,7 +157,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     Future.successful(mutateVertices(vertices, forceToIndex))
 
   override def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Seq[Boolean] = {
-    val writer = getOrElseCreateIndexWriter(GlobalIndex.EdgeIndexName)
+    val writer = getOrElseCreateIndexWriter(EdgeIndexName)
 
     edges.foreach { edge =>
       toDocument(edge, forceToIndex).foreach { doc =>
@@ -208,7 +205,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
 
     try {
       val q = new QueryParser(field, analyzer).parse(queryString)
-      fetchInner[VertexId](q, 0, 100, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads)
+      fetchInner[VertexId](q, 0, 100, VertexIndexName, vidField, Conversions.s2VertexIdReads)
     } catch {
       case ex: ParseException =>
         logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
@@ -222,7 +219,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
 
     try {
       val q = new QueryParser(field, analyzer).parse(queryString)
-      fetchInner[EdgeId](q, 0, 100, GlobalIndex.EdgeIndexName, field, Conversions.s2EdgeIdReads)
+      fetchInner[EdgeId](q, 0, 100, EdgeIndexName, field, Conversions.s2EdgeIdReads)
     } catch {
       case ex: ParseException =>
         logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
@@ -239,7 +236,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
       val field = vidField
       try {
         val q = new QueryParser(field, analyzer).parse(queryString)
-        fetchInner[VertexId](q, vertexQueryParam.offset, vertexQueryParam.limit, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads)
+        fetchInner[VertexId](q, vertexQueryParam.offset, vertexQueryParam.limit, VertexIndexName, vidField, Conversions.s2VertexIdReads)
       } catch {
         case ex: ParseException =>
           logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
index 83159e2..15f1231 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.io
 
 import org.apache.s2graph.core.{EdgeId, JSONParser, S2VertexPropertyId}
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, Service, ServiceColumn}
 import org.apache.s2graph.core.types.{HBaseType, InnerValLike, VertexId}
 import play.api.libs.json._
 import play.api.libs.json.Reads._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
deleted file mode 100644
index e71bbce..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import scalikejdbc._
-
-import scala.util.Try
-
-object Bucket extends Model[Bucket] {
-
-  val rangeDelimiter = "~"
-  val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.")
-
-  def apply(rs: WrappedResultSet): Bucket = {
-    Bucket(rs.intOpt("id"),
-      rs.int("experiment_id"),
-      rs.string("modular"),
-      rs.string("http_verb"),
-      rs.string("api_path"),
-      rs.string("request_body"),
-      rs.int("timeout"),
-      rs.string("impression_id"),
-      rs.boolean("is_graph_query"),
-      rs.boolean("is_empty"))
-  }
-
-  def finds(experimentId: Int)(implicit session: DBSession = AutoSession): List[Bucket] = {
-    val cacheKey = "experimentId=" + experimentId
-    withCaches(cacheKey) {
-      sql"""select * from buckets where experiment_id = $experimentId"""
-        .map { rs => Bucket(rs) }.list().apply()
-    }
-  }
-
-  def toRange(str: String): Option[(Int, Int)] = {
-    val range = str.split(rangeDelimiter)
-    if (range.length == 2) Option((range.head.toInt, range.last.toInt))
-    else None
-  }
-
-  def findByImpressionId(impressionId: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Bucket] = {
-    val cacheKey = "impressionId=" + impressionId
-    val sql = sql"""select * from buckets where impression_id=$impressionId"""
-      .map { rs => Bucket(rs)}
-    if (useCache) {
-      withCache(cacheKey) {
-        sql.single().apply()
-      }
-    } else {
-      sql.single().apply()
-    }
-  }
-
-  def insert(experiment: Experiment, modular: String, httpVerb: String, apiPath: String,
-             requestBody: String, timeout: Int, impressionId: String,
-             isGraphQuery: Boolean, isEmpty: Boolean)
-            (implicit session: DBSession = AutoSession): Try[Bucket] = {
-    Try {
-      sql"""
-            INSERT INTO buckets(experiment_id, modular, http_verb, api_path, request_body, timeout, impression_id,
-             is_graph_query, is_empty)
-            VALUES (${experiment.id.get}, $modular, $httpVerb, $apiPath, $requestBody, $timeout, $impressionId,
-             $isGraphQuery, $isEmpty)
-        """
-        .updateAndReturnGeneratedKey().apply()
-    }.map { newId =>
-      Bucket(Some(newId.toInt), experiment.id.get, modular, httpVerb, apiPath, requestBody, timeout, impressionId,
-        isGraphQuery, isEmpty)
-    }
-  }
-}
-
-case class Bucket(id: Option[Int],
-                  experimentId: Int,
-                  modular: String,
-                  httpVerb: String, apiPath: String,
-                  requestBody: String, timeout: Int, impressionId: String,
-                  isGraphQuery: Boolean = true,
-                  isEmpty: Boolean = false) {
-
-  import Bucket._
-
-  lazy val rangeOpt = toRange(modular)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
deleted file mode 100644
index 51f4a93..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import play.api.libs.json.Json
-import scalikejdbc._
-
-import scala.util.Try
-
-object ColumnMeta extends Model[ColumnMeta] {
-
-  val timeStampSeq = -1.toByte
-  val lastModifiedAtColumnSeq = 0.toByte
-  val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", lastModifiedAtColumnSeq, "long", "-1")
-  val maxValue = Byte.MaxValue
-
-  val timestamp = ColumnMeta(None, -1, "_timestamp", timeStampSeq.toByte, "long", "-1")
-  val reservedMetas = Seq(timestamp, lastModifiedAtColumn)
-  val reservedMetaNamesSet = reservedMetas.map(_.name).toSet
-
-  def isValid(columnMeta: ColumnMeta): Boolean =
-    columnMeta.id.isDefined && columnMeta.id.get > 0 && columnMeta.seq >= 0
-
-  def valueOf(rs: WrappedResultSet): ColumnMeta = {
-    ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"),
-      rs.byte("seq"), rs.string("data_type").toLowerCase(), rs.string("default_value"), rs.boolean("store_in_global_index"))
-  }
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
-    //    val cacheKey = s"id=$id"
-    val cacheKey = "id=" + id
-    withCache(cacheKey) {
-      sql"""select * from column_metas where id = ${id}""".map { rs => ColumnMeta.valueOf(rs) }.single.apply
-    }.get
-  }
-
-  def findAllByColumn(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    //    val cacheKey = s"columnId=$columnId"
-    val cacheKey = "columnId=" + columnId
-    if (useCache) {
-      withCaches(cacheKey)( sql"""select *from column_metas where column_id = ${columnId} order by seq ASC"""
-        .map { rs => ColumnMeta.valueOf(rs) }.list.apply())
-    } else {
-      sql"""select * from column_metas where column_id = ${columnId} order by seq ASC"""
-        .map { rs => ColumnMeta.valueOf(rs) }.list.apply()
-    }
-  }
-
-  def findByName(columnId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    //    val cacheKey = s"columnId=$columnId:name=$name"
-    val cacheKey = "columnId=" + columnId + ":name=" + name
-    if (useCache) {
-      withCache(cacheKey)( sql"""select * from column_metas where column_id = ${columnId} and name = ${name}"""
-          .map { rs => ColumnMeta.valueOf(rs) }.single.apply())
-    } else {
-      sql"""select * from column_metas where column_id = ${columnId} and name = ${name}"""
-          .map { rs => ColumnMeta.valueOf(rs) }.single.apply()
-    }
-  }
-
-  def insert(columnId: Int, name: String, dataType: String, defaultValue: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = {
-    val ls = findAllByColumn(columnId, false)
-    val seq = ls.size + 1
-    if (seq <= maxValue) {
-      sql"""insert into column_metas(column_id, name, seq, data_type, default_value, store_in_global_index)
-    select ${columnId}, ${name}, ${seq}, ${dataType}, ${defaultValue}, ${storeInGlobalIndex}"""
-        .updateAndReturnGeneratedKey.apply()
-    }
-  }
-
-  def findOrInsert(columnId: Int,
-                   name: String,
-                   dataType: String,
-                   defaultValue: String,
-                   storeInGlobalIndex: Boolean = false,
-                   useCache: Boolean = true)(implicit session: DBSession = AutoSession): ColumnMeta = {
-    findByName(columnId, name, useCache) match {
-      case Some(c) => c
-      case None =>
-        insert(columnId, name, dataType, defaultValue, storeInGlobalIndex)
-        expireCache(s"columnId=$columnId:name=$name")
-        findByName(columnId, name).get
-    }
-  }
-
-  def findByIdAndSeq(columnId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "columnId=" + columnId + ":seq=" + seq
-    lazy val columnMetaOpt = sql"""
-        select * from column_metas where column_id = ${columnId} and seq = ${seq}
-    """.map { rs => ColumnMeta.valueOf(rs) }.single.apply()
-
-    if (useCache) withCache(cacheKey)(columnMetaOpt)
-    else columnMetaOpt
-  }
-
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val columnMeta = findById(id)
-    val (columnId, name) = (columnMeta.columnId, columnMeta.name)
-    sql"""delete from column_metas where id = ${id}""".execute.apply()
-    val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", s"columnId=$columnId")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from column_metas""".map { rs => ColumnMeta.valueOf(rs) }.list().apply()
-
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"columnId=${x.columnId}:name=${x.name}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"columnId=${x.columnId}:seq=${x.seq}"
-      (cacheKey -> x)
-    })
-    putsToCaches(ls.groupBy(x => x.columnId).map { case (columnId, ls) =>
-      val cacheKey = s"columnId=${columnId}"
-      (cacheKey -> ls)
-    }.toList)
-  }
-
-  def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try {
-    sql"""
-          update column_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id}
-       """.updateAndReturnGeneratedKey.apply()
-  }
-}
-
-case class ColumnMeta(id: Option[Int],
-                      columnId: Int,
-                      name: String,
-                      seq: Byte,
-                      dataType: String,
-                      defaultValue: String,
-                      storeInGlobalIndex: Boolean = false) {
-  lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType)
-  override def equals(other: Any): Boolean = {
-    if (!other.isInstanceOf[ColumnMeta]) false
-    else {
-      val o = other.asInstanceOf[ColumnMeta]
-      //      labelId == o.labelId &&
-      seq == o.seq
-    }
-  }
-  override def hashCode(): Int = seq.toInt
-}

Reply via email to