http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
new file mode 100644
index 0000000..e850541
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+import play.api.libs.json.Json
+import scalikejdbc._
+
+import scala.util.Try
+
+object ColumnMeta extends SQLSyntaxSupport[ColumnMeta] {
+  import Schema._
+  val className = ColumnMeta.getClass.getSimpleName
+
+  val timeStampSeq = 0.toByte
+  val countSeq = -1.toByte
+  val lastModifiedAtColumnSeq = 0.toByte
+  val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", lastModifiedAtColumnSeq, "long", "-1")
+  val maxValue = Byte.MaxValue
+
+  val timestamp = ColumnMeta(None, -1, "_timestamp", timeStampSeq.toByte, "long", "-1")
+  val reservedMetas = Seq(timestamp, lastModifiedAtColumn)
+  val reservedMetaNamesSet = reservedMetas.map(_.name).toSet
+
+  def isValid(columnMeta: ColumnMeta): Boolean =
+    columnMeta.id.isDefined && columnMeta.id.get > 0 && columnMeta.seq >= 0
+
+  def valueOf(rs: WrappedResultSet): ColumnMeta = {
+    ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"),
+      rs.byte("seq"), rs.string("data_type").toLowerCase(), rs.string("default_value"), rs.boolean("store_in_global_index"))
+  }
+
+  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
+    val cacheKey = className + "id=" + id
+    withCache(cacheKey) {
+      sql"""select * from column_metas where id = ${id}""".map { rs => 
ColumnMeta.valueOf(rs) }.single.apply
+    }.get
+  }
+
+  def findAllByColumn(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
+    //    val cacheKey = s"columnId=$columnId"
+    val cacheKey = className + "columnId=" + columnId
+    if (useCache) {
+      withCaches(cacheKey)( sql"""select * from column_metas where column_id = ${columnId} order by seq ASC"""
+        .map { rs => ColumnMeta.valueOf(rs) }.list.apply())
+    } else {
+      sql"""select * from column_metas where column_id = ${columnId} order by 
seq ASC"""
+        .map { rs => ColumnMeta.valueOf(rs) }.list.apply()
+    }
+  }
+
+  def findByName(columnId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
+    //    val cacheKey = s"columnId=$columnId:name=$name"
+    val cacheKey = className + "columnId=" + columnId + ":name=" + name
+    lazy val sql =  sql"""select * from column_metas where column_id = ${columnId} and name = ${name}"""
+      .map { rs => ColumnMeta.valueOf(rs) }.single.apply()
+
+    if (useCache) withCache(cacheKey)(sql)
+    else sql
+  }
+
+  def insert(columnId: Int, name: String, dataType: String, defaultValue: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = {
+    val ls = findAllByColumn(columnId, false)
+    val seq = ls.size + 1
+    if (seq <= maxValue) {
+      sql"""insert into column_metas(column_id, name, seq, data_type, 
default_value, store_in_global_index)
+    select ${columnId}, ${name}, ${seq}, ${dataType}, ${defaultValue}, 
${storeInGlobalIndex}"""
+        .updateAndReturnGeneratedKey.apply()
+    }
+  }
+
+  def findOrInsert(columnId: Int,
+                   name: String,
+                   dataType: String,
+                   defaultValue: String,
+                   storeInGlobalIndex: Boolean = false,
+                   useCache: Boolean = true)(implicit session: DBSession = AutoSession): ColumnMeta = {
+    findByName(columnId, name, useCache) match {
+      case Some(c) => c
+      case None =>
+        insert(columnId, name, dataType, defaultValue, storeInGlobalIndex)
+        expireCache(className + s"columnId=$columnId:name=$name")
+        findByName(columnId, name).get
+    }
+  }
+
+  def findByIdAndSeq(columnId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
+    val cacheKey = className + "columnId=" + columnId + ":seq=" + seq
+    lazy val columnMetaOpt = sql"""
+        select * from column_metas where column_id = ${columnId} and seq = ${seq}
+    """.map { rs => ColumnMeta.valueOf(rs) }.single.apply()
+
+    if (useCache) withCache(cacheKey)(columnMetaOpt)
+    else columnMetaOpt
+  }
+
+  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
+    val columnMeta = findById(id)
+    val (columnId, name) = (columnMeta.columnId, columnMeta.name)
+    sql"""delete from column_metas where id = ${id}""".execute.apply()
+    val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", s"columnId=$columnId")
+    cacheKeys.foreach { key =>
+      expireCache(className + key)
+      expireCaches(className + key)
+    }
+  }
+
+  def findAll()(implicit session: DBSession = AutoSession) = {
+    val ls = sql"""select * from column_metas""".map { rs => 
ColumnMeta.valueOf(rs) }.list().apply()
+
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"columnId=${x.columnId}:name=${x.name}",
+        s"columnId=${x.columnId}:seq=${x.seq}"
+      ).map(cacheKey => (className + cacheKey, x))
+    })
+
+    putsToCaches(ls.groupBy(x => x.columnId).map { case (columnId, ls) =>
+      val cacheKey = className + s"columnId=${columnId}"
+      (cacheKey -> ls)
+    }.toList)
+  }
+
+  def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try {
+    sql"""
+          update column_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id}
+       """.updateAndReturnGeneratedKey.apply()
+  }
+}
+
+case class ColumnMeta(id: Option[Int],
+                      columnId: Int,
+                      name: String,
+                      seq: Byte,
+                      dataType: String,
+                      defaultValue: String,
+                      storeInGlobalIndex: Boolean = false) {
+  lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType)
+  override def equals(other: Any): Boolean = {
+    if (!other.isInstanceOf[ColumnMeta]) false
+    else {
+      val o = other.asInstanceOf[ColumnMeta]
+      //      labelId == o.labelId &&
+      seq == o.seq
+    }
+  }
+  override def hashCode(): Int = seq.toInt
+}
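
A rough usage sketch of the API above (assuming Schema(config) has already initialized the
scalikejdbc connection pool; the column id and prop name are illustrative, not from this commit):

    import org.apache.s2graph.core.schema.ColumnMeta

    object ColumnMetaUsage extends App {
      val columnId = 1 // hypothetical service column id

      // findOrInsert is idempotent: a second call returns the row the first call created,
      // with seq allocated as (number of existing metas for the column) + 1, capped at Byte.MaxValue.
      val meta = ColumnMeta.findOrInsert(columnId, "age", "integer", "-1")
      assert(ColumnMeta.findByName(columnId, "age", useCache = false).contains(meta))
    }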

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
new file mode 100644
index 0000000..860bf70
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+import org.apache.s2graph.core.GraphUtil
+import scalikejdbc._
+
+import scala.util.{Try, Random}
+
+object Experiment extends SQLSyntaxSupport[Experiment] {
+  import Schema._
+  val className = Experiment.getClass.getSimpleName
+
+  val ImpressionKey = "S2-Impression-Id"
+  val ImpressionId = "Impression-Id"
+
+  def apply(rs: WrappedResultSet): Experiment = {
+    Experiment(rs.intOpt("id"),
+      rs.int("service_id"),
+      rs.string("name"),
+      rs.string("description"),
+      rs.string("experiment_type"),
+      rs.int("total_modular"))
+  }
+
+  def finds(serviceId: Int)(implicit session: DBSession = AutoSession): List[Experiment] = {
+    val cacheKey = className + "serviceId=" + serviceId
+    withCaches(cacheKey, false) {
+      sql"""select * from experiments where service_id = ${serviceId}"""
+        .map { rs => Experiment(rs) }.list().apply()
+    }
+  }
+
+  def findBy(serviceId: Int, name: String)(implicit session: DBSession = AutoSession): Option[Experiment] = {
+    val cacheKey = className + "serviceId=" + serviceId + ":name=" + name
+    withCache(cacheKey, false) {
+      sql"""select * from experiments where service_id = ${serviceId} and name 
= ${name}"""
+        .map { rs => Experiment(rs) }.single.apply
+    }
+  }
+
+  def findById(id: Int)(implicit session: DBSession = AutoSession): Option[Experiment] = {
+    val cacheKey = className + "id=" + id
+    withCache(cacheKey, false)(
+      sql"""select * from experiments where id = ${id}"""
+        .map { rs => Experiment(rs) }.single.apply
+    )
+  }
+
+  def insert(service: Service, name: String, description: String, experimentType: String = "t", totalModular: Int = 100)
+            (implicit session: DBSession = AutoSession): Try[Experiment] = {
+    Try {
+      sql"""INSERT INTO experiments(service_id, service_name, `name`, 
description, experiment_type, total_modular)
+         VALUES(${service.id.get}, ${service.serviceName}, $name, 
$description, $experimentType, $totalModular)"""
+        .updateAndReturnGeneratedKey().apply()
+    }.map { newId =>
+      Experiment(Some(newId.toInt), service.id.get, name, description, experimentType, totalModular)
+    }
+  }
+}
+
+case class Experiment(id: Option[Int],
+                      serviceId: Int,
+                      name: String,
+                      description: String,
+                      experimentType: String,
+                      totalModular: Int) {
+
+  def buckets = Bucket.finds(id.get)
+
+  def rangeBuckets = for {
+    bucket <- buckets
+    range <- bucket.rangeOpt
+  } yield range -> bucket
+
+
+  def findBucket(uuid: String, impIdOpt: Option[String] = None): Option[Bucket] = {
+    impIdOpt match {
+      case Some(impId) => Bucket.findByImpressionId(impId)
+      case None =>
+        val seed = experimentType match {
+          case "u" => (GraphUtil.murmur3(uuid) % totalModular) + 1
+          case _ => Random.nextInt(totalModular) + 1
+        }
+        findBucket(seed)
+    }
+  }
+
+  def findBucket(uuidMod: Int): Option[Bucket] = {
+    rangeBuckets.find { case ((from, to), bucket) =>
+      from <= uuidMod && uuidMod <= to
+    }.map(_._2)
+  }
+}
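
findBucket above routes a request in one of two ways: a user-type ("u") experiment hashes the
uuid, so the same user always lands in the same bucket, while any other experiment type draws a
fresh random modular value per request. A minimal standalone sketch of that seed computation,
using scala.util.hashing.MurmurHash3 as a stand-in for GraphUtil.murmur3 (assumed non-negative):

    import scala.util.Random
    import scala.util.hashing.MurmurHash3

    object BucketSeed extends App {
      // Stand-in for GraphUtil.murmur3 in s2core.
      def murmur3(s: String): Int = math.abs(MurmurHash3.stringHash(s))

      def seedFor(experimentType: String, uuid: String, totalModular: Int): Int =
        experimentType match {
          case "u" => (murmur3(uuid) % totalModular) + 1 // deterministic per user
          case _   => Random.nextInt(totalModular) + 1   // fresh draw per request
        }

      // With totalModular = 100, every uuid maps to a stable seed in 1..100, which
      // findBucket(seed) then matches against each bucket's inclusive (from, to) range.
      assert(seedFor("u", "user-42", 100) == seedFor("u", "user-42", 100))
    }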

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
new file mode 100644
index 0000000..7fb1183
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+import java.util.Calendar
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
+import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
+import scalikejdbc._
+
+object Label extends SQLSyntaxSupport[Label] {
+  import Schema._
+  val className = Label.getClass.getSimpleName
+
+  val maxHBaseTableNames = 2
+
+  def apply(rs: WrappedResultSet): Label = {
+    Label(Option(rs.int("id")), rs.string("label"),
+      rs.int("src_service_id"), rs.string("src_column_name"), 
rs.string("src_column_type"),
+      rs.int("tgt_service_id"), rs.string("tgt_column_name"), 
rs.string("tgt_column_type"),
+      rs.boolean("is_directed"), rs.string("service_name"), 
rs.int("service_id"), rs.string("consistency_level"),
+      rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), 
rs.string("schema_version"), rs.boolean("is_async"),
+      rs.string("compressionAlgorithm"), rs.stringOpt("options"))
+  }
+
+  def deleteAll(label: Label)(implicit session: DBSession) = {
+    val id = label.id
+    LabelMeta.findAllByLabelId(id.get, false).foreach { x => LabelMeta.delete(x.id.get) }
+    LabelIndex.findByLabelIdAll(id.get, false).foreach { x => LabelIndex.delete(x.id.get) }
+    Label.delete(id.get)
+  }
+
+  def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = {
+    val cacheKey = className + "label=" + labelName
+    lazy val labelOpt =
+      sql"""
+        select *
+        from labels
+        where label = ${labelName}
+        and deleted_at is null """.map { rs => Label(rs) }.single.apply()
+
+    if (useCache) withCache(cacheKey)(labelOpt)
+    else labelOpt
+  }
+
+  def insert(label: String,
+             srcServiceId: Int,
+             srcColumnName: String,
+             srcColumnType: String,
+             tgtServiceId: Int,
+             tgtColumnName: String,
+             tgtColumnType: String,
+             isDirected: Boolean,
+             serviceName: String,
+             serviceId: Int,
+             consistencyLevel: String,
+             hTableName: String,
+             hTableTTL: Option[Int],
+             schemaVersion: String,
+             isAsync: Boolean,
+             compressionAlgorithm: String,
+             options: Option[String])(implicit session: DBSession = AutoSession) = {
+    sql"""
+       insert into labels(label,
+    src_service_id, src_column_name, src_column_type,
+    tgt_service_id, tgt_column_name, tgt_column_type,
+    is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl, schema_version, is_async,
+    compressionAlgorithm, options)
+       values (${label},
+    ${srcServiceId}, ${srcColumnName}, ${srcColumnType},
+    ${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType},
+    ${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, ${hTableName}, ${hTableTTL},
+    ${schemaVersion}, ${isAsync}, ${compressionAlgorithm}, ${options})
+    """
+      .updateAndReturnGeneratedKey.apply()
+  }
+
+  def findByIdOpt(id: Int)(implicit session: DBSession = AutoSession): Option[Label] = {
+    val cacheKey = className + "id=" + id
+    withCache(cacheKey)(
+      sql"""
+        select         *
+        from   labels
+        where  id = ${id}
+        and deleted_at is null"""
+        .map { rs => Label(rs) }.single.apply())
+  }
+
+  def findById(id: Int)(implicit session: DBSession = AutoSession): Label = {
+    val cacheKey = className + "id=" + id
+    withCache(cacheKey)(
+      sql"""
+        select         *
+        from   labels
+        where  id = ${id}
+        and deleted_at is null"""
+        .map { rs => Label(rs) }.single.apply()).get
+  }
+
+  def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+    val cacheKey = className + "tgtColumnId=" + columnId
+    val col = ServiceColumn.findById(columnId)
+    withCaches(cacheKey)(
+      sql"""
+          select       *
+          from labels
+          where        tgt_column_name = ${col.columnName}
+          and service_id = ${col.serviceId}
+          and deleted_at is null
+        """.map { rs => Label(rs) }.list().apply())
+  }
+
+  def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+    val cacheKey = className + "srcColumnId=" + columnId
+    val col = ServiceColumn.findById(columnId)
+    withCaches(cacheKey)(
+      sql"""
+          select       *
+          from labels
+          where        src_column_name = ${col.columnName}
+          and service_id = ${col.serviceId}
+          and deleted_at is null
+        """.map { rs => Label(rs) }.list().apply())
+  }
+
+  def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+    val cacheKey = className + "srcServiceId=" + serviceId
+    withCaches(cacheKey)(
+      sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
+    )
+  }
+
+  def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+    val cacheKey = className + "tgtServiceId=" + serviceId
+    withCaches(cacheKey)(
+      sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
+    )
+  }
+
+  def insertAll(labelName: String, srcServiceName: String, srcColumnName: String, srcColumnType: String,
+                tgtServiceName: String, tgtColumnName: String, tgtColumnType: String,
+                isDirected: Boolean = true,
+                serviceName: String,
+                indices: Seq[Index],
+                metaProps: Seq[Prop],
+                consistencyLevel: String,
+                hTableName: Option[String],
+                hTableTTL: Option[Int],
+                schemaVersion: String,
+                isAsync: Boolean,
+                compressionAlgorithm: String,
+                options: Option[String])(implicit session: DBSession = AutoSession): Label = {
+
+    val srcServiceOpt = Service.findByName(srcServiceName, useCache = false)
+    val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false)
+    val serviceOpt = Service.findByName(serviceName, useCache = false)
+    if (srcServiceOpt.isEmpty) throw new RuntimeException(s"source service $srcServiceName is not created.")
+    if (tgtServiceOpt.isEmpty) throw new RuntimeException(s"target service $tgtServiceName is not created.")
+    if (serviceOpt.isEmpty) throw new RuntimeException(s"service $serviceName is not created.")
+
+    val newLabel = for {
+      srcService <- srcServiceOpt
+      tgtService <- tgtServiceOpt
+      service <- serviceOpt
+    } yield {
+        val srcServiceId = srcService.id.get
+        val tgtServiceId = tgtService.id.get
+        val serviceId = service.id.get
+
+        /** insert serviceColumn */
+        val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion)
+        val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion)
+
+        if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
+        if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")
+
+        /* create label */
+        Label.findByName(labelName, useCache = false).getOrElse {
+
+          val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType,
+            tgtServiceId, tgtColumnName, tgtColumnType, isDirected, serviceName, serviceId, consistencyLevel,
+            hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync,
+            compressionAlgorithm, options).toInt
+
+          val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType, storeInGlobalIndex) =>
+            val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType, storeInGlobalIndex)
+            (propName -> labelMeta.seq)
+          }.toMap ++ LabelMeta.reservedMetas.map (labelMeta => labelMeta.name -> labelMeta.seq).toMap
+
+          if (indices.isEmpty) {
+            // make default index with _PK, _timestamp, 0
+            LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none", None, None)
+          } else {
+            indices.foreach { index =>
+              val metaSeq = index.propNames.map { name => labelMetaMap(name) }
+              LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none", index.direction, index.options)
+            }
+          }
+
+          val cacheKeys = List(s"id=$createdId", s"label=$labelName")
+          val ret = findByName(labelName, useCache = false).get
+          putsToCacheOption(cacheKeys.map(k => className + k -> ret))
+          ret
+        }
+      }
+
+    newLabel.getOrElse(throw new RuntimeException("failed to create label"))
+  }
+
+  def findAll()(implicit session: DBSession = AutoSession) = {
+    val ls = sql"""select * from labels where deleted_at is null""".map { rs 
=> Label(rs) }.list().apply()
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"label=${x.label}"
+      ).map(cacheKey => (className + cacheKey, x))
+    })
+    ls
+  }
+
+  def updateName(oldName: String, newName: String)(implicit session: DBSession = AutoSession) = {
+    logger.info(s"rename label: $oldName -> $newName")
+    sql"""update labels set label = ${newName} where label = ${oldName}""".update.apply()
+  }
+
+  def updateHTableName(labelName: String, newHTableName: String)(implicit session: DBSession = AutoSession) = {
+    logger.info(s"update HTable of label $labelName to $newHTableName")
+    val cnt = sql"""update labels set hbase_table_name = $newHTableName where label = $labelName""".update().apply()
+    val label = Label.findByName(labelName, useCache = false).get
+
+    val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}")
+    cacheKeys.foreach { key =>
+      expireCache(className + key)
+      expireCaches(className + key)
+    }
+    cnt
+  }
+
+  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
+    val label = findById(id)
+    logger.info(s"delete label: $label")
+    val cnt = sql"""delete from labels where id = 
${label.id.get}""".update().apply()
+    val cacheKeys = List(s"id=$id", s"label=${label.label}")
+    cacheKeys.foreach { key =>
+      expireCache(className + key)
+      expireCaches(className + key)
+    }
+    cnt
+  }
+
+  def markDeleted(label: Label)(implicit session: DBSession = AutoSession) = {
+
+    logger.info(s"mark deleted label: $label")
+    val oldName = label.label
+    val now = Calendar.getInstance().getTime
+    val newName = s"deleted_${now.getTime}_"+ label.label
+    val cnt = sql"""update labels set label = ${newName}, deleted_at = ${now} 
where id = ${label.id.get}""".update.apply()
+    val cacheKeys = List(s"id=${label.id.get}", s"label=${oldName}")
+    cacheKeys.foreach { key =>
+      expireCache(className + key)
+      expireCaches(className + key)
+    }
+    cnt
+  }
+}
+
+case class Label(id: Option[Int], label: String,
+                 srcServiceId: Int, srcColumnName: String, srcColumnType: String,
+                 tgtServiceId: Int, tgtColumnName: String, tgtColumnType: String,
+                 isDirected: Boolean = true, serviceName: String, serviceId: Int, consistencyLevel: String = "strong",
+                 hTableName: String, hTableTTL: Option[Int],
+                 schemaVersion: String, isAsync: Boolean = false,
+                 compressionAlgorithm: String,
+                 options: Option[String]) {
+  def metas(useCache: Boolean = true) = LabelMeta.findAllByLabelId(id.get, useCache = useCache)
+
+  def indices(useCache: Boolean = true) = LabelIndex.findByLabelIdAll(id.get, useCache = useCache)
+
+  //  lazy val firstHBaseTableName = hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)
+  lazy val srcService = Service.findById(srcServiceId)
+  lazy val tgtService = Service.findById(tgtServiceId)
+  lazy val service = Service.findById(serviceId)
+  /**
+   * TODO
+   * change this to apply hbase table from target serviceName
+   */
+  //  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.tableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
+  //  lazy val (hbaseZkAddr, hbaseTableName) = (Config.HBASE_ZOOKEEPER_QUORUM, hTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
+  //  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").headOption.getOrElse(GraphConnection.getConfVal("hbase.table.name")))
+  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").head)
+
+  lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found"))
+  lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found"))
+
+  lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq)
+
+  //TODO: Make sure this is correct
+
+//  lazy val metas = metas(useCache = true)
+  lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
+  lazy val labelMetas = LabelMeta.findAllByLabelId(id.get, useCache = true)
+  lazy val labelMetaSet = labelMetas.toSet
+  lazy val labelMetaMap = (labelMetas ++ LabelMeta.reservedMetas).map(m => m.seq -> m).toMap
+
+  lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap
+  lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap
+  lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap
+  lazy val extraIndices = indices.filter(idx => defaultIndex.isDefined && idx.id.get != defaultIndex.get.id.get)
+  //      indices filterNot (_.id.get == defaultIndex.get.id.get)
+  lazy val extraIndicesMap = extraIndices.map(idx => (idx.seq, idx)) toMap
+
+  lazy val metaProps = LabelMeta.reservedMetas.map { m =>
+    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
+    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
+    else m
+  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)
+
+  lazy val metaPropsInner = LabelMeta.reservedMetasInner.map { m =>
+    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
+    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
+    else m
+  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)
+
+  lazy val metaPropsMap = metaProps.map(x => (x.seq, x)).toMap
+  lazy val metaPropsInvMap = metaProps.map(x => (x.name, x)).toMap
+  lazy val metaPropNames = metaProps.map(x => x.name)
+  lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)) toMap
+
+  /** this is used only by edgeToProps */
+  lazy val metaPropsDefaultMap = (for {
+    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
+    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
+  } yield prop.name -> jsValue).toMap
+
+  lazy val metaPropsDefaultMapInnerString = (for {
+    prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
+    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
+  } yield prop.name -> innerVal).toMap
+
+  lazy val metaPropsDefaultMapInner = (for {
+    prop <- metaPropsInner
+    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
+  } yield prop -> innerVal).toMap
+  lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq
+  lazy val metaPropsJsValueWithDefault = (for {
+    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
+    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
+  } yield prop -> jsValue).toMap
+//  lazy val extraOptions = Model.extraOptions(Option("""{
+//    "storage": {
+//      "s2graph.storage.backend": "rocks",
+//      "rocks.db.path": "/tmp/db"
+//    }
+//  }"""))
+
+  lazy val tokens: Set[String] = extraOptions.get("tokens").fold(Set.empty[String]) {
+    case JsArray(tokens) => tokens.map(_.as[String]).toSet
+    case _ =>
+      logger.error("Invalid token JSON")
+      Set.empty[String]
+  }
+
+  lazy val extraOptions = Schema.extraOptions(options)
+
+  lazy val durability = extraOptions.get("durability").map(_.as[Boolean]).getOrElse(true)
+
+  lazy val storageConfigOpt: Option[Config] = toStorageConfig
+
+  def toStorageConfig: Option[Config] = {
+    Schema.toStorageConfig(extraOptions)
+  }
+
+  def srcColumnWithDir(dir: Int) = {
+    // GraphUtil.directions("out"
+    if (dir == 0) srcColumn else tgtColumn
+  }
+
+  def tgtColumnWithDir(dir: Int) = {
+    // GraphUtil.directions("out"
+    if (dir == 0) tgtColumn else srcColumn
+  }
+
+  lazy val tgtSrc = (tgtColumn, srcColumn)
+  lazy val srcTgt = (srcColumn, tgtColumn)
+
+  def srcTgtColumn(dir: Int) = if (dir == 1) tgtSrc else srcTgt
+
+  lazy val EmptyPropsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 0))
+//  def init() = {
+//    metas()
+//    metaSeqsToNames()
+//    service
+//    srcColumn
+//    tgtColumn
+//    defaultIndex
+//    indices
+//    metaProps
+//  }
+
+  //  def srcColumnInnerVal(jsValue: JsValue) = {
+  //    jsValueToInnerVal(jsValue, srcColumnType, version)
+  //  }
+  //  def tgtColumnInnerVal(jsValue: JsValue) = {
+  //    jsValueToInnerVal(jsValue, tgtColumnType, version)
+  //  }
+
+  override def toString(): String = {
+    val orderByKeys = LabelMeta.findAllByLabelId(id.get)
+    super.toString() + orderByKeys.toString()
+  }
+
+  //  def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = {
+  //    if (scoring.isEmpty) LabelIndex.defaultSeq
+  //    else {
+  //      LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
+  //
+  ////      LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
+  //    }
+  //  }
+  lazy val toJson = {
+    val allIdxs = LabelIndex.findByLabelIdAll(id.get, useCache = false)
+    val defaultIdxOpt = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq, useCache = false)
+    val extraIdxs = allIdxs.filter(idx => defaultIdxOpt.isDefined && idx.id.get != defaultIdxOpt.get.id.get)
+    val metaProps = LabelMeta.reservedMetas.map { m =>
+      if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
+      else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
+      else m
+    } ::: LabelMeta.findAllByLabelId(id.get, useCache = false)
+
+    val defaultIdx = defaultIdxOpt.map(x => x.toJson).getOrElse(Json.obj())
+    val optionsJs = try {
+      val obj = options.map(Json.parse).getOrElse(Json.obj()).as[JsObject]
+      if (!obj.value.contains("tokens")) obj
+      else obj ++ Json.obj("tokens" -> obj.value("tokens").as[Seq[String]].map("*" * _.length))
+
+    } catch { case e: Exception => Json.obj() }
+
+    Json.obj("labelName" -> label,
+      "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
+      "isDirected" -> isDirected,
+      "serviceName" -> serviceName,
+      "consistencyLevel" -> consistencyLevel,
+      "schemaVersion" -> schemaVersion,
+      "isAsync" -> isAsync,
+      "compressionAlgorithm" -> compressionAlgorithm,
+      "defaultIndex" -> defaultIdx,
+      "extraIndex" -> extraIdxs.map(exIdx => exIdx.toJson),
+      "metaProps" -> metaProps.filter { labelMeta => 
LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson),
+      "options" -> optionsJs
+    )
+  }
+
+  def propsToInnerValsWithTs(props: Map[String, Any],
+                             ts: Long = System.currentTimeMillis()): Map[LabelMeta, InnerValLikeWithTs] = {
+    for {
+      (k, v) <- props
+      labelMeta <- metaPropsInvMap.get(k)
+      innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
+    } yield labelMeta -> InnerValLikeWithTs(innerVal, ts)
+
+  }
+
+  def innerValsWithTsToProps(props: Map[LabelMeta, InnerValLikeWithTs],
+                             selectColumns: Map[Byte, Boolean]): Map[String, Any] = {
+    if (selectColumns.isEmpty) {
+      for {
+        (meta, v) <- metaPropsDefaultMapInner ++ props
+      } yield {
+        meta.name -> innerValToAny(v.innerVal, meta.dataType)
+      }
+    } else {
+      for {
+        (k, _) <- selectColumns
+        if k != LabelMeta.toSeq && k != LabelMeta.fromSeq
+        labelMeta <- metaPropsMap.get(k)
+      } yield {
+        val v = props.get(labelMeta).orElse(metaPropsDefaultMapInner.get(labelMeta)).get
+        labelMeta.name -> innerValToAny(v.innerVal, labelMeta.dataType)
+      }
+    }
+  }
+}
+
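
insertAll above is the one entry point that stitches the schema pieces together: it resolves the
three services, findOrInserts the source and target columns, creates the label row and its metas,
and builds either the default _PK index or the requested ones. A hedged sketch of a direct call
(label creation normally goes through the Management API; every name and value here is illustrative):

    import org.apache.s2graph.core.Management.JsonModel.Prop
    import org.apache.s2graph.core.schema.Label

    // Assumes the "s1" service and a Schema connection already exist.
    val friend = Label.insertAll(
      labelName = "friend",
      srcServiceName = "s1", srcColumnName = "user_id", srcColumnType = "string",
      tgtServiceName = "s1", tgtColumnName = "user_id", tgtColumnType = "string",
      serviceName = "s1",
      indices = Seq.empty,                                // falls back to the default _PK index
      metaProps = Seq(Prop("since", "0", "long", false)), // (name, defaultValue, dataType, storeInGlobalIndex)
      consistencyLevel = "strong",
      hTableName = None, hTableTTL = None,
      schemaVersion = "v3", isAsync = false,
      compressionAlgorithm = "gz",
      options = None)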

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
new file mode 100644
index 0000000..bb8425f
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.schema.LabelIndex.LabelIndexMutateOption
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.{JsObject, JsString, Json}
+import scalikejdbc._
+
+object LabelIndex extends SQLSyntaxSupport[LabelIndex] {
+  import Schema._
+  val className = LabelIndex.getClass.getSimpleName
+
+  val DefaultName = "_PK"
+  val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq)
+  val DefaultSeq = 1.toByte
+  val MaxOrderSeq = 7
+
+  def apply(rs: WrappedResultSet): LabelIndex = {
+    LabelIndex(rs.intOpt("id"), rs.int("label_id"), rs.string("name"), rs.byte("seq"),
+      rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match {
+        case metaSeqsList => metaSeqsList
+      },
+      rs.string("formulars"),
+      rs.intOpt("dir"),
+      rs.stringOpt("options")
+    )
+  }
+
+  case class LabelIndexMutateOption(dir: Byte,
+                                    method: String,
+                                    rate: Double,
+                                    totalModular: Long,
+                                    storeDegree: Boolean) {
+
+    val isBufferIncrement = method == "drop" || method == "sample" || method == "hash_sample"
+
+    def sample[T](a: T, hashOpt: Option[Long]): Boolean = {
+      if (method == "drop") false
+      else if (method == "sample") {
+        if (scala.util.Random.nextDouble() < rate) true
+        else false
+      } else if (method == "hash_sample") {
+        val hash = hashOpt.getOrElse(throw new RuntimeException("hash_sample need _from_hash value"))
+        if ((hash.abs % totalModular) / totalModular.toDouble < rate) true
+        else false
+      } else true
+    }
+  }
+
+  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
+    val cacheKey = className + "id=" + id
+    withCache(cacheKey) {
+      sql"""select * from label_indices where id = ${id}""".map { rs => 
LabelIndex(rs) }.single.apply
+    }.get
+  }
+
+  def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
+    val cacheKey = className + "labelId=" + labelId
+    if (useCache) {
+      withCaches(cacheKey)( sql"""
+        select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
+      """.map { rs => LabelIndex(rs) }.list.apply)
+    } else {
+      sql"""
+        select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
+      """.map { rs => LabelIndex(rs) }.list.apply
+    }
+  }
+
+  def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String,
+             direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
+    sql"""
+       insert into label_indices(label_id, name, seq, meta_seqs, formulars, dir, options)
+       values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}, ${direction}, ${options})
+    """
+      .updateAndReturnGeneratedKey.apply()
+  }
+
+  def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String,
+                   direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): LabelIndex = {
+    findByLabelIdAndSeqs(labelId, metaSeqs, direction) match {
+      case Some(s) => s
+      case None =>
+        val orders = findByLabelIdAll(labelId, false)
+        val seq = (orders.size + 1).toByte
+        assert(seq <= MaxOrderSeq)
+        val createdId = insert(labelId, indexName, seq, metaSeqs, formulars, direction, options)
+        val cacheKeys = List(s"labelId=$labelId:seq=$seq",
+          s"labelId=$labelId:seqs=$metaSeqs:dir=$direction", s"labelId=$labelId:seq=$seq", s"id=$createdId")
+
+        cacheKeys.foreach { key =>
+          expireCache(className + key)
+          expireCaches(className + key)
+        }
+
+        findByLabelIdAndSeq(labelId, seq).get
+    }
+  }
+
+  def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
+    val cacheKey = className + "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction
+    withCache(cacheKey) {
+      sql"""
+      select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} and dir = ${direction}
+      """.map { rs => LabelIndex(rs) }.single.apply
+    }
+  }
+
+  def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
+    //    val cacheKey = s"labelId=$labelId:seq=$seq"
+    val cacheKey = className + "labelId=" + labelId + ":seq=" + seq
+    if (useCache) {
+      withCache(cacheKey)( sql"""
+      select * from label_indices where label_id = ${labelId} and seq = ${seq}
+      """.map { rs => LabelIndex(rs) }.single.apply)
+    } else {
+      sql"""
+      select * from label_indices where label_id = ${labelId} and seq = ${seq}
+      """.map { rs => LabelIndex(rs) }.single.apply
+    }
+  }
+
+  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
+    val labelIndex = findById(id)
+    val seqs = labelIndex.metaSeqs.mkString(",")
+    val (labelId, seq) = (labelIndex.labelId, labelIndex.seq)
+    sql"""delete from label_indices where id = ${id}""".execute.apply()
+
+    val cacheKeys = List(s"id=$id", s"labelId=$labelId", 
s"labelId=$labelId:seq=$seq", 
s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}")
+    cacheKeys.foreach { key =>
+      expireCache(className + key)
+      expireCaches(className + key)
+    }
+  }
+
+  def findAll()(implicit session: DBSession = AutoSession) = {
+    val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) 
}.list.apply
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"labelId=${x.labelId}:seq=${x.seq}",
+        s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}"
+      ).map(cacheKey => (className + cacheKey, x))
+    })
+
+    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
+      val cacheKey = s"labelId=${labelId}"
+      (className + cacheKey -> ls)
+    }.toList)
+  }
+}
+
+
+case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
+                      dir: Option[Int], options: Option[String]) {
+  // both
+  lazy val label = Label.findById(labelId)
+  lazy val metas = label.metaPropsMap
+  lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq))
+  lazy val sortKeyTypesArray = sortKeyTypes.toArray
+  lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name }
+
+  lazy val toJson = {
+    val dirJs = dir.map(GraphUtil.fromDirection).getOrElse("both")
+    val optionsJs = try { options.map(Json.parse).getOrElse(Json.obj()) } catch { case e: Exception => Json.obj() }
+
+    Json.obj(
+      "name" -> name,
+      "propNames" -> sortKeyTypes.map(x => x.name),
+      "dir" -> dirJs,
+      "options" -> optionsJs
+    )
+  }
+
+  def parseOption(dir: String): Option[LabelIndexMutateOption] = try {
+    options.map { string =>
+      val jsObj = Json.parse(string) \ dir
+
+      val method = (jsObj \ "method").asOpt[String].getOrElse("default")
+      val rate = (jsObj \ "rate").asOpt[Double].getOrElse(1.0)
+      val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L)
+      val storeDegree = (jsObj \ "storeDegree").asOpt[Boolean].getOrElse(true)
+
+      LabelIndexMutateOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree)
+    }
+  } catch {
+    case e: Exception =>
+      logger.error(s"Parse failed labelOption: ${this.label}", e)
+      None
+  }
+
+  lazy val inDirOption = parseOption("in")
+
+  lazy val outDirOption = parseOption("out")
+}
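
parseOption above reads per-direction mutate settings out of the options column. A sketch of the
JSON shape it expects, with field names taken from the accessors in parseOption (values illustrative):

    import org.apache.s2graph.core.schema.LabelIndex

    val optionsJson =
      """{
        |  "in":  { "method": "drop" },
        |  "out": { "method": "hash_sample", "rate": 0.1, "totalModular": 100, "storeDegree": true }
        |}""".stripMargin

    val idx = LabelIndex(id = None, labelId = 1, name = "_PK", seq = 1.toByte,
      metaSeqs = Seq(0.toByte), formulars = "none", dir = None, options = Some(optionsJson))

    // "drop" suppresses writes on the in direction; hash_sample keeps roughly 10% of out edges.
    assert(idx.parseOption("in").exists(_.method == "drop"))
    assert(idx.parseOption("out").exists(_.isBufferIncrement))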

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
new file mode 100644
index 0000000..f25be5f
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+/**
+ * Created by shon on 6/3/15.
+ */
+
+import org.apache.s2graph.core.GraphExceptions.MaxPropSizeReachedException
+import org.apache.s2graph.core.{GraphExceptions, JSONParser}
+import play.api.libs.json.Json
+import scalikejdbc._
+
+import scala.util.Try
+
+object LabelMeta extends SQLSyntaxSupport[LabelMeta] {
+  import Schema._
+  val className = LabelMeta.getClass.getSimpleName
+  /** dummy sequences */
+
+  val fromSeq = (-4).toByte
+  val toSeq = (-5).toByte
+  val lastOpSeq = (-3).toByte
+  val lastDeletedAtSeq = (-2).toByte
+  val timestampSeq = (0).toByte
+  val labelSeq = (-6).toByte
+  val directionSeq = -7.toByte
+  val fromHashSeq = -8.toByte
+
+  val countSeq = (Byte.MaxValue - 2).toByte
+  val degreeSeq = (Byte.MaxValue - 1).toByte
+  val maxValue = Byte.MaxValue
+  val emptySeq = Byte.MaxValue
+
+  /** reserved sequences */
+  //  val deleted = LabelMeta(id = Some(lastDeletedAt), labelId = lastDeletedAt, name = "lastDeletedAt",
+  //    seq = lastDeletedAt, defaultValue = "", dataType = "long")
+  val fromHash = LabelMeta(id = None, labelId = fromHashSeq, name = "_from_hash",
+    seq = fromHashSeq, defaultValue = fromHashSeq.toString, dataType = "long")
+  val from = LabelMeta(id = Some(fromSeq), labelId = fromSeq, name = "_from",
+    seq = fromSeq, defaultValue = fromSeq.toString, dataType = "string")
+  val to = LabelMeta(id = Some(toSeq), labelId = toSeq, name = "_to",
+    seq = toSeq, defaultValue = toSeq.toString, dataType = "string")
+  val timestamp = LabelMeta(id = Some(-1), labelId = -1, name = "_timestamp",
+    seq = timestampSeq, defaultValue = "0", dataType = "long")
+  val degree = LabelMeta(id = Some(-1), labelId = -1, name = "_degree",
+    seq = degreeSeq, defaultValue = "0", dataType = "long")
+  val count = LabelMeta(id = Some(-1), labelId = -1, name = "_count",
+    seq = countSeq, defaultValue = "-1", dataType = "long")
+  val lastDeletedAt = LabelMeta(id = Some(-1), labelId = -1, name = "_lastDeletedAt",
+    seq = lastDeletedAtSeq, defaultValue = "-1", dataType = "long")
+  val label = LabelMeta(id = Some(-1), labelId = -1, name = "label",
+    seq = labelSeq, defaultValue = "", dataType = "string")
+  val direction = LabelMeta(id = Some(-1), labelId = -1, name = "direction",
+    seq = directionSeq, defaultValue = "out", dataType = "string")
+  val empty = LabelMeta(id = Some(-1), labelId = -1, name = "_empty",
+    seq = emptySeq, defaultValue = "-1", dataType = "long")
+
+  // Each reserved column (_timestamp vs. timestamp) shares the same seq number; the name starting with '_' has higher priority.
+  val reservedMetas = List(empty, label, direction, lastDeletedAt, from, fromHash, to, degree, timestamp, count).flatMap { lm => List(lm, lm.copy(name = lm.name.drop(1))) }.reverse
+  val reservedMetasInner = List(empty, label, direction, lastDeletedAt, from, fromHash, to, degree, timestamp, count)
+  val reservedMetaNamesSet = reservedMetasInner.map(_.name).toSet
+
+  val defaultRequiredMetaNames = Set("from", "_from", "to", "_to", "_from_hash", "label", "direction", "timestamp", "_timestamp")
+
+  def apply(rs: WrappedResultSet): LabelMeta = {
+    LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"),
+      rs.string("default_value"), rs.string("data_type").toLowerCase, rs.boolean("store_in_global_index"))
+  }
+
+  /** Note: degreeSeq should not be included in the serializer/deserializer.
+    * Only 0 <= seq <= countSeq (Byte.MaxValue - 2), not degreeSeq (Byte.MaxValue - 1), should be
+    * included in the actual bytes written to storage.
+    * */
+  def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq // || seq == fromHashSeq
+
+  def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq // || seq == fromHashSeq
+
+  def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = {
+    val cacheKey = className + "id=" + id
+
+    withCache(cacheKey) {
+      sql"""select * from label_metas where id = ${id}""".map { rs => 
LabelMeta(rs) }.single.apply
+    }.get
+  }
+
+  def findAllByLabelId(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[LabelMeta] = {
+    val cacheKey = className + "labelId=" + labelId
+    lazy val labelMetas = sql"""select *
+                                from label_metas
+                                where label_id = ${labelId} order by seq ASC""".map(LabelMeta(_)).list.apply()
+
+    if (useCache) withCaches(cacheKey)(labelMetas)
+    else labelMetas
+  }
+
+  def findByName(labelId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelMeta] = {
+    name match {
+      case timestamp.name => Some(timestamp)
+      case from.name => Some(from)
+      case to.name => Some(to)
+      case _ =>
+        val cacheKey = className + "labelId=" + labelId + ":name=" + name
+        lazy val labelMeta = sql"""
+            select *
+            from label_metas where label_id = ${labelId} and name = ${name}"""
+          .map { rs => LabelMeta(rs) }.single.apply()
+
+        if (useCache) withCache(cacheKey)(labelMeta)
+        else labelMeta
+    }
+  }
+
+  def insert(labelId: Int, name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = {
+    val ls = findAllByLabelId(labelId, useCache = false)
+    val seq = ls.size + 1
+
+    if (seq < maxValue) {
+      sql"""insert into label_metas(label_id, name, seq, default_value, 
data_type)
+    select ${labelId}, ${name}, ${seq}, ${defaultValue}, 
${dataType}""".updateAndReturnGeneratedKey.apply()
+    } else {
+      throw MaxPropSizeReachedException("max property size reached")
+    }
+  }
+
+  def findOrInsert(labelId: Int,
+                   name: String,
+                   defaultValue: String,
+                   dataType: String,
+                   storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession): LabelMeta = {
+
+    findByName(labelId, name) match {
+      case Some(c) => c
+      case None =>
+        insert(labelId, name, defaultValue, dataType, storeInGlobalIndex)
+        val cacheKey = "labelId=" + labelId + ":name=" + name
+        val cacheKeys = "labelId=" + labelId
+        expireCache(className + cacheKey)
+        expireCaches(className + cacheKeys)
+        findByName(labelId, name, useCache = false).get
+    }
+  }
+
+  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
+    val labelMeta = findById(id)
+    val (labelId, name) = (labelMeta.labelId, labelMeta.name)
+    sql"""delete from label_metas where id = ${id}""".execute.apply()
+    val cacheKeys = List(s"id=$id", s"labelId=$labelId", 
s"labelId=$labelId:name=$name")
+    cacheKeys.foreach { key =>
+      expireCache(className + key)
+      expireCaches(className + key)
+    }
+  }
+
+  def findAll()(implicit session: DBSession = AutoSession) = {
+    val ls = sql"""select * from label_metas""".map { rs => LabelMeta(rs) 
}.list.apply
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"labelId=${x.labelId}:name=${x.name}",
+        s"labelId=${x.labelId}:seq=${x.seq}"
+      ).map(cacheKey => (className + cacheKey, x))
+    })
+
+    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
+      val cacheKey = s"labelId=${labelId}"
+      cacheKey -> ls
+    }.toList)
+
+    ls
+  }
+
+  def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try {
+    sql"""
+          update label_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id}
+       """.updateAndReturnGeneratedKey.apply()
+  }
+}
+
+case class LabelMeta(id: Option[Int],
+                     labelId: Int,
+                     name: String,
+                     seq: Byte,
+                     defaultValue: String,
+                     dataType: String,
+                     storeInGlobalIndex: Boolean = false) {
+
+  lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType, "storeInGlobalIndex" -> storeInGlobalIndex)
+
+  override def equals(other: Any): Boolean = {
+    if (!other.isInstanceOf[LabelMeta]) false
+    else {
+      val o = other.asInstanceOf[LabelMeta]
+//      labelId == o.labelId &&
+        seq == o.seq
+    }
+  }
+  override def hashCode(): Int = seq.toInt
+//    (labelId, seq).hashCode()
+}
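
To make the seq layout concrete: user-defined props are assigned 1, 2, 3, ... on insert,
_timestamp sits at 0, the dummy metas use negative seqs, and count/degree sit just below
Byte.MaxValue. A small illustration of what the validity checks admit (no new behavior):

    import org.apache.s2graph.core.schema.LabelMeta

    object SeqLayout extends App {
      assert(LabelMeta.isValidSeq(LabelMeta.timestampSeq))          // 0 is written to storage
      assert(LabelMeta.isValidSeq(LabelMeta.countSeq))              // Byte.MaxValue - 2 still valid
      assert(!LabelMeta.isValidSeq(LabelMeta.degreeSeq))            // Byte.MaxValue - 1 never hits storage
      assert(!LabelMeta.isValidSeq(LabelMeta.fromSeq))              // negative dummy seqs excluded
      assert(!LabelMeta.isValidSeqForAdmin(LabelMeta.timestampSeq)) // admin listings hide _timestamp
    }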

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
new file mode 100644
index 0000000..5b9e0ae
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+import java.util.concurrent.Executors
+import java.util.concurrent.atomic.AtomicLong
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.core.utils.{SafeUpdateCache, logger}
+import play.api.libs.json.{JsObject, JsValue, Json}
+import scalikejdbc._
+
+import scala.concurrent.ExecutionContext
+import scala.io.Source
+import scala.language.{higherKinds, implicitConversions}
+import scala.util.{Failure, Success, Try}
+
+object Schema {
+  var maxSize = 10000
+  var ttl = 60
+  var safeUpdateCache: SafeUpdateCache = _
+
+  val numOfThread = Runtime.getRuntime.availableProcessors()
+  val threadPool = Executors.newFixedThreadPool(numOfThread)
+  val ec = ExecutionContext.fromExecutor(threadPool)
+
+  private val ModelReferenceCount = new AtomicLong(0L)
+
+  def apply(config: Config) = {
+    maxSize = config.getInt("cache.max.size")
+    ttl = config.getInt("cache.ttl.seconds")
+
+    Class.forName(config.getString("db.default.driver"))
+
+    val settings = ConnectionPoolSettings(
+      initialSize = 10,
+      maxSize = 10,
+      connectionTimeoutMillis = 30000L,
+      validationQuery = "select 1;")
+
+    ConnectionPool.singleton(
+      config.getString("db.default.url"),
+      config.getString("db.default.user"),
+      config.getString("db.default.password"),
+      settings)
+
+    checkSchema()
+    safeUpdateCache = new SafeUpdateCache(maxSize, ttl)(ec)
+    ModelReferenceCount.incrementAndGet()
+  }
+
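
For reference, apply above resolves two cache settings plus the JDBC coordinates from the given Config. A minimal sketch of the expected keys (all values here are placeholders, not project defaults):

    import com.typesafe.config.ConfigFactory

    // hypothetical values; only the key names are taken from Schema.apply
    val config = ConfigFactory.parseString(
      """
        |cache.max.size = 10000
        |cache.ttl.seconds = 60
        |db.default.driver = "org.h2.Driver"
        |db.default.url = "jdbc:h2:file:./var/metastore;MODE=MYSQL"
        |db.default.user = "sa"
        |db.default.password = "sa"
      """.stripMargin)

    Schema(config)
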
+  def checkSchema(): Unit = {
+    withTx { implicit session =>
+      sql"""show tables""".map(rs => rs.string(1)).list.apply()
+    } match {
+      case Success(tables) =>
+        if (tables.isEmpty) {
+          // this is a very simple migration tool that only supports creating
+          // appropriate tables when there are no tables in the database at all.
+          // Ideally, it should be improved to a sophisticated migration tool
+          // that supports versioning, etc.
+          logger.info("Creating tables ...")
+          val schema = getClass.getResourceAsStream("schema.sql")
+          val lines = Source.fromInputStream(schema, "UTF-8").getLines
+          val sources = lines.map(_.split("-- ").head.trim).mkString("\n")
+          val statements = sources.split(";\n")
+          withTx { implicit session =>
+            statements.foreach(sql => session.execute(sql))
+          } match {
+            case Success(_) =>
+              logger.info("Successfully imported schema")
+            case Failure(e) =>
+              throw new RuntimeException("Error while importing schema", e)
+          }
+        }
+      case Failure(e) =>
+        throw new RuntimeException("Could not list tables in the database", e)
+    }
+  }
+
+  def withTx[T](block: DBSession => T): Try[T] = {
+    using(DB(ConnectionPool.borrow())) { conn =>
+      Try {
+        conn.begin()
+        val session = conn.withinTxSession()
+        val result = block(session)
+
+        conn.commit()
+
+        result
+      } recoverWith {
+        case e: Exception =>
+          conn.rollbackIfActive()
+          Failure(e)
+      }
+    }
+  }
+
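
withTx above wraps a block in begin/commit, rolls back on any exception, and surfaces the outcome as a Try. A small usage sketch (table and values are illustrative; imports mirror the ones already in this file):

    import scalikejdbc._
    import scala.util.{Failure, Success}

    val result = Schema.withTx { implicit session =>
      sql"""update services set cluster = 'localhost' where id = 1""".update.apply()
    }

    result match {
      case Success(rows) => logger.info(s"committed, $rows row(s) updated")
      case Failure(e)    => logger.error("rolled back", e)
    }
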
+  def shutdown(modelDataDelete: Boolean = false) =
+    if (ModelReferenceCount.decrementAndGet() <= 0) {
+      // FIXME: When the schema is served by an embedded database and modelDataDelete
+      // is set, shutdown truncates the underlying tables. Its purpose is clearing
+      // the runtime footprint when running tests.
+      if (modelDataDelete) {
+        withTx { implicit session =>
+          sql"SHOW TABLES"
+              .map(rs => rs.string(1))
+              .list
+              .apply()
+              .map { table => s"TRUNCATE TABLE $table" }
+        } match {
+          case Success(stmts) =>
+            val newStmts = List("SET FOREIGN_KEY_CHECKS = 0") ++ stmts ++ List("SET FOREIGN_KEY_CHECKS = 1")
+            withTx { implicit session =>
+              newStmts.foreach { stmt =>
+                session.execute(stmt)
+              }
+            } match {
+              case Success(_) =>
+                logger.info(s"Successfully truncated tables: $stmts")
+              case Failure(e) =>
+                throw new IllegalStateException("Failed to truncate tables", e)
+            }
+          case Failure(e) =>
+            throw new IllegalStateException("Failed to list tables", e)
+        }
+      }
+//      clearCache()
+      safeUpdateCache.shutdown()
+      ConnectionPool.closeAll()
+    }
+
+  def loadCache() = {
+    Service.findAll()
+    ServiceColumn.findAll()
+    Label.findAll()
+    LabelMeta.findAll()
+    LabelIndex.findAll()
+    ColumnMeta.findAll()
+  }
+
+//  def clearCache() = {
+//    Service.expireAll()
+//    ServiceColumn.expireAll()
+//    Label.expireAll()
+//    LabelMeta.expireAll()
+//    LabelIndex.expireAll()
+//    ColumnMeta.expireAll()
+//  }
+
+  def extraOptions(options: Option[String]): Map[String, JsValue] = options match {
+    case None => Map.empty
+    case Some(v) =>
+      try {
+        Json.parse(v).asOpt[JsObject].map(_.fields.toMap).getOrElse(Map.empty)
+      } catch {
+        case e: Exception =>
+          logger.error("An error occurred while parsing the extra label options", e)
+          Map.empty
+      }
+  }
+
+  def toStorageConfig(options: Map[String, JsValue]): Option[Config] = {
+    try {
+      options.get("storage").map { jsValue =>
+        import scala.collection.JavaConverters._
+        val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, value) =>
+          key -> JSONParser.jsValueToAny(value).getOrElse(throw new RuntimeException(s"failed to parse storage option: $key"))
+        }
+
+        ConfigFactory.parseMap(configMap.asJava)
+      }
+    } catch {
+      case e: Exception =>
+        logger.error("toStorageConfig error; falling back to the default storage", e)
+        None
+    }
+  }
+
+  private def toMultiKey(key: String): String = key + ".__m__"
+
+  def withCache[T <: AnyRef](key: String, broadcast: Boolean = true)(op: => T) =
+    safeUpdateCache.withCache(key, broadcast)(op)
+
+  def withCaches[T <: AnyRef](key: String, broadcast: Boolean = true)(op: => T) =
+    safeUpdateCache.withCache(toMultiKey(key), broadcast)(op)
+
+  def expireCache(key: String) = safeUpdateCache.invalidate(key)
+
+  def expireCaches(key: String) = safeUpdateCache.invalidate(toMultiKey(key))
+
+  def putsToCacheOption[T <: AnyRef](kvs: List[(String, T)]) = kvs.foreach {
+    case (key, value) => safeUpdateCache.put(key, Option(value))
+  }
+
+  def putsToCaches[T <: AnyRef](kvs: List[(String, T)]) = kvs.foreach {
+    case (key, values) => safeUpdateCache.put(toMultiKey(key), values)
+  }
+
+  def getCacheSize(): Int = safeUpdateCache.asMap().size()
+
+  def getAllCacheData[T <: AnyRef](): (List[(String, T)], List[(String, List[T])]) = {
+    (Nil, Nil) // not implemented yet; always returns empty single- and multi-key lists
+  }
+
+  def toBytes(): Array[Byte] = safeUpdateCache.toBytes()
+
+  def fromBytes(safeUpdateCache: SafeUpdateCache, bytes: Array[Byte]): Unit =
+    SafeUpdateCache.fromBytes(safeUpdateCache, bytes)
+}
+
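
The cache helpers above share one SafeUpdateCache: single-value lookups are stored under the raw key, list lookups under the toMultiKey variant of the same key, which is why the schema classes expire both on mutation. A sketch of the calling convention used throughout this package (key shapes are illustrative; an implicit DBSession such as AutoSession is assumed):

    val oneKey = "Service$" + "id=1"
    val one: Option[Service] = Schema.withCache(oneKey) {
      sql"""select * from services where id = 1""".map(Service.valueOf).single.apply()
    }

    val allKey = "Service$" + "all"
    val all: List[Service] = Schema.withCaches(allKey) {
      sql"""select * from services""".map(Service.valueOf).list.apply()
    }

    // both variants must be invalidated when the underlying rows change
    Schema.expireCache(oneKey)
    Schema.expireCaches(allKey)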

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
new file mode 100644
index 0000000..611a746
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+import java.util.UUID
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.Json
+import scalikejdbc._
+
+object Service extends SQLSyntaxSupport[Service] {
+  import Schema._
+  val className = Service.getClass.getSimpleName
+
+  def valueOf(rs: WrappedResultSet): Service = {
+    Service(rs.intOpt("id"),
+      rs.string("service_name"),
+      rs.string("access_token"),
+      rs.string("cluster"),
+      rs.string("hbase_table_name"),
+      rs.int("pre_split_size"),
+      rs.intOpt("hbase_table_ttl"))
+  }
+
+  def findByAccessToken(accessToken: String)(implicit session: DBSession = AutoSession): Option[Service] = {
+    val cacheKey = className + s"accessToken=$accessToken"
+    withCache(cacheKey)(sql"""select * from services where access_token = ${accessToken}""".map { rs => Service.valueOf(rs) }.single.apply())
+  }
+
+  def findById(id: Int)(implicit session: DBSession = AutoSession): Service = {
+    val cacheKey = className + "id=" + id
+    withCache(cacheKey)(sql"""select * from services where id = ${id}""".map { rs => Service.valueOf(rs) }.single.apply()).get
+  }
+
+  def findByName(serviceName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Service] = {
+    val cacheKey = className + "serviceName=" + serviceName
+    lazy val serviceOpt = sql"""
+        select * from services where service_name = ${serviceName}
+      """.map { rs => Service.valueOf(rs) }.single.apply()
+
+    if (useCache) withCache(cacheKey)(serviceOpt)
+    else serviceOpt
+  }
+
+  def insert(serviceName: String, cluster: String,
+             hTableName: String, preSplitSize: Int, hTableTTL: Option[Int],
+             compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = {
+    logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm")
+    val accessToken = UUID.randomUUID().toString
+    sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl)
+    values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply()
+  }
+
+  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
+    val service = findById(id)
+    val serviceName = service.serviceName
+    sql"""delete from services where id = ${id}""".execute.apply()
+    val cacheKeys = List(s"id=$id", s"serviceName=$serviceName")
+    cacheKeys.foreach { key =>
+      expireCache(className + key)
+      expireCaches(className + key)
+    }
+  }
+
+  def findOrInsert(serviceName: String, cluster: String, hTableName: String,
+                   preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: String,
+                   useCache: Boolean = true)(implicit session: DBSession = AutoSession): Service = {
+    findByName(serviceName, useCache) match {
+      case Some(s) => s
+      case None =>
+        insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
+        val cacheKey = "serviceName=" + serviceName
+        expireCache(className + cacheKey)
+        findByName(serviceName).get
+    }
+  }
+
+  def findAll()(implicit session: DBSession = AutoSession) = {
+    val ls = sql"""select * from services""".map { rs => Service.valueOf(rs) }.list.apply()
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"serviceName=${x.serviceName}"
+      ).map(cacheKey => (className + cacheKey, x))
+    })
+
+    ls
+  }
+
+  def findAllConn()(implicit session: DBSession = AutoSession): List[String] = {
+    sql"""select distinct(cluster) from services""".map { rs => rs.string("cluster") }.list.apply()
+  }
+}
+
+case class Service(id: Option[Int],
+                   serviceName: String,
+                   accessToken: String,
+                   cluster: String,
+                   hTableName: String,
+                   preSplitSize: Int,
+                   hTableTTL: Option[Int],
+                   options: Option[String] = None) {
+  lazy val toJson =
+    id match {
+      case Some(_id) =>
+        Json.obj("id" -> _id, "name" -> serviceName, "accessToken" -> accessToken, "cluster" -> cluster,
+          "hTableName" -> hTableName, "preSplitSize" -> preSplitSize, "hTableTTL" -> hTableTTL)
+      case None =>
+        Json.parse("{}")
+    }
+
+  lazy val extraOptions = Schema.extraOptions(options)
+  lazy val storageConfigOpt: Option[Config] = toStorageConfig
+  def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = useCache)
+  def toStorageConfig: Option[Config] = Schema.toStorageConfig(extraOptions)
+}
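
Taken together, the object above provides the usual get-or-create flow for a service. A usage sketch (all argument values are placeholders; note that compressionAlgorithm is accepted but not written by the insert shown above):

    val service: Service = Service.findOrInsert(
      serviceName = "s2graph",
      cluster = "localhost",
      hTableName = "s2graph-dev",
      preSplitSize = 1,
      hTableTTL = None,
      compressionAlgorithm = "gz") // logged only; not persisted by insert

    logger.info(service.toJson.toString)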

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
new file mode 100644
index 0000000..cc1698a
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.types.{HBaseType, InnerValLike, InnerValLikeWithTs}
+import play.api.libs.json.Json
+import scalikejdbc._
+
+object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] {
+  import Schema._
+  val className = ServiceColumn.getClass.getSimpleName
+
+  val Default = ServiceColumn(Option(0), -1, "default", "string", "v4")
+
+  def valueOf(rs: WrappedResultSet): ServiceColumn = {
+    ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version"))
+  }
+
+  def findByServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[ServiceColumn] = {
+    val cacheKey = className + "serviceId=" + serviceId
+
+    lazy val sql = sql"""select * from service_columns where service_id = ${serviceId}""".map { x => ServiceColumn.valueOf(x) }.list().apply()
+
+    if (useCache) withCaches(cacheKey)(sql)
+    else sql
+  }
+
+  def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
+    val cacheKey = className + "id=" + id
+    lazy val sql = sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply()
+    if (useCache) withCache(cacheKey)(sql).get
+    else sql.get
+  }
+
+  def find(serviceId: Int, columnName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[ServiceColumn] = {
+    val cacheKey = className + "serviceId=" + serviceId + ":columnName=" + columnName
+    lazy val serviceColumnOpt = sql"""
+        select * from service_columns where service_id = ${serviceId} and column_name = ${columnName}
+      """.map { rs => ServiceColumn.valueOf(rs) }.single.apply()
+
+    if (useCache) withCache(cacheKey)(serviceColumnOpt)
+    else serviceColumnOpt
+  }
+
+  def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession) = {
+    sql"""insert into service_columns(service_id, column_name, column_type, schema_version)
+         values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply()
+  }
+
+  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
+    val serviceColumn = findById(id, useCache = false)
+    val (serviceId, columnName) = (serviceColumn.serviceId, serviceColumn.columnName)
+    sql"""delete from service_columns where id = ${id}""".execute.apply()
+    val cacheKeys = List(s"id=$id", s"serviceId=$serviceId:columnName=$columnName")
+    cacheKeys.foreach { key =>
+      expireCache(className + key)
+      expireCaches(className + key)
+    }
+  }
+
+  def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String],
+                   schemaVersion: String = HBaseType.DEFAULT_VERSION,
+                   useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
+    find(serviceId, columnName, useCache) match {
+      case Some(sc) => sc
+      case None =>
+        insert(serviceId, columnName, columnType, schemaVersion)
+        val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
+        expireCache(className + cacheKey)
+        find(serviceId, columnName).get
+    }
+  }
+
+  def findAll()(implicit session: DBSession = AutoSession) = {
+    val ls = sql"""select * from service_columns""".map { rs => ServiceColumn.valueOf(rs) }.list.apply()
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"serviceId=${x.serviceId}:columnName=${x.columnName}"
+      ).map(cacheKey => (className + cacheKey, x))
+    })
+
+    ls
+  }
+}
+case class ServiceColumn(id: Option[Int],
+                         serviceId: Int,
+                         columnName: String,
+                         columnType: String,
+                         schemaVersion: String)  {
+
+  lazy val service = Service.findById(serviceId)
+  lazy val metasWithoutCache = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get, useCache = false) :+ ColumnMeta.lastModifiedAtColumn
+  lazy val metas = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get) :+ ColumnMeta.lastModifiedAtColumn
+  lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta }.toMap
+  lazy val metasInvMap = metas.map { meta => meta.name -> meta }.toMap
+  lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)).toMap
+  lazy val metaPropsDefaultMap = metas.map { meta =>
+    meta -> JSONParser.toInnerVal(meta.defaultValue, meta.dataType, schemaVersion)
+  }.toMap
+  lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)
+
+  def propsToInnerVals(props: Map[String, Any]): Map[ColumnMeta, InnerValLike] = {
+    for {
+      (k, v) <- props
+      columnMeta <- metasInvMap.get(k)
+      innerVal = toInnerVal(v, columnMeta.dataType, schemaVersion)
+    } yield columnMeta -> innerVal
+  }
+
+  def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = {
+    for {
+      (k, v) <- props
+      columnMeta <- metasMap.get(k)
+    } yield {
+      columnMeta.name -> v.value
+    }
+  }
+
+  def innerValsWithTsToProps(props: Map[Int, InnerValLikeWithTs]): Map[String, Any] = {
+    for {
+      (k, v) <- props
+      columnMeta <- metasMap.get(k)
+    } yield {
+      columnMeta.name -> v.innerVal.value
+    }
+  }
+}
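
The converters above round-trip user-facing properties through the internal value representation: names resolve to ColumnMeta via metasInvMap on the way in, and seq numbers resolve back to names via metasMap on the way out. A sketch, assuming a service column whose metas include an int property "age" (ids and names are hypothetical):

    val column = ServiceColumn.find(serviceId = 1, columnName = "user").get

    val innerVals: Map[ColumnMeta, InnerValLike] =
      column.propsToInnerVals(Map("age" -> 10))

    val bySeq: Map[Int, InnerValLike] =
      innerVals.map { case (meta, v) => meta.seq.toInt -> v }

    val props: Map[String, Any] = column.innerValsToProps(bySeq)
    // props == Map("age" -> 10)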

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 2d74a7c..2b02bdd 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.storage
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.TraversalHelper._
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.parsers.WhereParser
 import org.apache.s2graph.core.utils.logger
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index b1fdd7c..0526042 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -26,7 +26,7 @@ import com.stumbleupon.async.Deferred
 import com.typesafe.config.Config
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.storage.serde._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
index e575c5b..5db02cc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
@@ -24,7 +24,7 @@ import java.util.Base64
 import com.typesafe.config.Config
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
 import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange}
 import org.apache.s2graph.core.storage.serde.StorageSerializable
 import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageReadable, StorageSerDe}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
index 9871b36..0748efb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core.storage.serde
 
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.utils.logger

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala
index dc7690b..884ca11 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs}
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala
index 219d097..20e676a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{ColumnMeta, LabelMeta}
+import org.apache.s2graph.core.schema.{ColumnMeta, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 945f246..44ae52f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.storage.serde.Deserializable
 import org.apache.s2graph.core.storage.serde.StorageDeserializable._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
index 28982dc..6e467b6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.indexedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.{GraphUtil, IndexEdge}
 import org.apache.s2graph.core.storage.serde.StorageSerializable._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index e533b4b..1051c6e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.core.storage.serde.indexedge.wide
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage.serde.StorageDeserializable._
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.storage.serde.Deserializable

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index 34e9a6e..91127f9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.indexedge.wide
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.{GraphUtil, IndexEdge}
 import org.apache.s2graph.core.storage.serde.StorageSerializable._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index f5c10a7..580acd7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.snapshotedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage.serde.StorageDeserializable._
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.types._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index 02b2977..7937de8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.{S2Edge, SnapshotEdge}
-import org.apache.s2graph.core.mysqls.LabelIndex
+import org.apache.s2graph.core.schema.LabelIndex
 import org.apache.s2graph.core.storage.serde._
 import org.apache.s2graph.core.storage.serde.StorageSerializable._
 import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index 7dec6d9..e0243cc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.snapshotedge.wide
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.serde.StorageDeserializable._
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceVertexId, TargetVertexId}
