// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
// ----------------------------------------------------------------------
// diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
// new file mode 100644
// index 0000000..e850541
// --- /dev/null
// +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
// @@ -0,0 +1,165 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.s2graph.core.schema

import play.api.libs.json.Json
import scalikejdbc._

import scala.util.Try

/**
 * Data-access helpers for rows of the `column_metas` table.
 *
 * Cache keys are always `className` followed by one of:
 *  - `id=<id>`
 *  - `columnId=<columnId>:name=<name>`
 *  - `columnId=<columnId>:seq=<seq>`
 *  - `columnId=<columnId>` (list cache, see [[findAllByColumn]])
 */
object ColumnMeta extends SQLSyntaxSupport[ColumnMeta] {
  import Schema._

  val className = ColumnMeta.getClass.getSimpleName

  val timeStampSeq = 0.toByte
  val countSeq = -1.toByte
  val lastModifiedAtColumnSeq = 0.toByte
  val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", lastModifiedAtColumnSeq, "long", "-1")
  val maxValue = Byte.MaxValue

  val timestamp = ColumnMeta(None, -1, "_timestamp", timeStampSeq.toByte, "long", "-1")
  val reservedMetas = Seq(timestamp, lastModifiedAtColumn)
  val reservedMetaNamesSet = reservedMetas.map(_.name).toSet

  /** True when the meta has a positive id and a non-negative seq. */
  def isValid(columnMeta: ColumnMeta): Boolean =
    columnMeta.id.exists(_ > 0) && columnMeta.seq >= 0

  /** Builds a [[ColumnMeta]] from the current row; `data_type` is lower-cased. */
  def valueOf(rs: WrappedResultSet): ColumnMeta = {
    ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"),
      rs.byte("seq"), rs.string("data_type").toLowerCase(), rs.string("default_value"), rs.boolean("store_in_global_index"))
  }

  /**
   * Looks a row up by primary key through the single-value cache.
   *
   * @throws NoSuchElementException when no such row exists (the lookup result is unwrapped with `.get`)
   */
  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
    val cacheKey = className + "id=" + id
    withCache(cacheKey) {
      sql"""select * from column_metas where id = ${id}""".map { rs => ColumnMeta.valueOf(rs) }.single.apply
    }.get
  }

  /** All metas of one column ordered by `seq`, optionally served from the list cache. */
  def findAllByColumn(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
    val cacheKey = className + "columnId=" + columnId
    // One query definition for both paths; lazy so it is only run when actually needed.
    lazy val columnMetas = sql"""select * from column_metas where column_id = ${columnId} order by seq ASC"""
      .map { rs => ColumnMeta.valueOf(rs) }.list.apply()

    if (useCache) withCaches(cacheKey)(columnMetas)
    else columnMetas
  }

  /** Finds the meta called `name` on the given column, optionally through the cache. */
  def findByName(columnId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
    val cacheKey = className + "columnId=" + columnId + ":name=" + name
    lazy val columnMetaOpt = sql"""select * from column_metas where column_id = ${columnId} and name = ${name}"""
      .map { rs => ColumnMeta.valueOf(rs) }.single.apply()

    if (useCache) withCache(cacheKey)(columnMetaOpt)
    else columnMetaOpt
  }

  /**
   * Inserts a new meta whose `seq` is (current number of metas on the column) + 1.
   *
   * Nothing is inserted when that seq would exceed [[maxValue]]; in that case the
   * result is `()` instead of the generated key, which is why the inferred result
   * type is not `Long`.
   */
  def insert(columnId: Int, name: String, dataType: String, defaultValue: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = {
    val ls = findAllByColumn(columnId, false)
    val seq = ls.size + 1
    if (seq <= maxValue) {
      sql"""insert into column_metas(column_id, name, seq, data_type, default_value, store_in_global_index)
            select ${columnId}, ${name}, ${seq}, ${dataType}, ${defaultValue}, ${storeInGlobalIndex}"""
        .updateAndReturnGeneratedKey.apply()
    }
  }

  /**
   * Returns the existing meta for (`columnId`, `name`) or inserts one and returns it.
   *
   * @throws NoSuchElementException when the row cannot be read back after the insert,
   *                                e.g. because [[insert]] skipped it (seq limit reached)
   */
  def findOrInsert(columnId: Int,
                   name: String,
                   dataType: String,
                   defaultValue: String,
                   storeInGlobalIndex: Boolean = false,
                   useCache: Boolean = true)(implicit session: DBSession = AutoSession): ColumnMeta = {
    findByName(columnId, name, useCache) match {
      case Some(c) => c
      case None =>
        insert(columnId, name, dataType, defaultValue, storeInGlobalIndex)
        expireCache(className + s"columnId=$columnId:name=$name")
        // The per-column list cache would otherwise keep missing the new row.
        expireCaches(className + s"columnId=$columnId")
        findByName(columnId, name).getOrElse(
          throw new NoSuchElementException(s"column meta not found after insert: columnId=$columnId, name=$name"))
    }
  }

  /** Finds the meta with the given `seq` on a column, optionally through the cache. */
  def findByIdAndSeq(columnId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
    val cacheKey = className + "columnId=" + columnId + ":seq=" + seq
    lazy val columnMetaOpt = sql"""
        select * from column_metas where column_id = ${columnId} and seq = ${seq}
    """.map { rs => ColumnMeta.valueOf(rs) }.single.apply()

    if (useCache) withCache(cacheKey)(columnMetaOpt)
    else columnMetaOpt
  }

  /**
   * Deletes the row and expires every cache entry that can hold it.
   *
   * @throws NoSuchElementException when the id does not exist (propagated from [[findById]])
   */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val columnMeta = findById(id)
    val (columnId, name) = (columnMeta.columnId, columnMeta.name)
    sql"""delete from column_metas where id = ${id}""".execute.apply()
    // Keys must match the ones used by the finders above; the list key was
    // previously misspelled and the seq key was missing, leaving stale entries.
    val cacheKeys = List(
      s"id=$id",
      s"columnId=$columnId:name=$name",
      s"columnId=$columnId:seq=${columnMeta.seq}",
      s"columnId=$columnId")
    cacheKeys.foreach { key =>
      expireCache(className + key)
      expireCaches(className + key)
    }
  }

  /** Loads every row and pre-populates both the single-value and the per-column list caches. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from column_metas""".map { rs => ColumnMeta.valueOf(rs) }.list().apply()

    putsToCacheOption(ls.flatMap { x =>
      Seq(
        s"id=${x.id.get}",
        s"columnId=${x.columnId}:name=${x.name}",
        s"columnId=${x.columnId}:seq=${x.seq}"
      ).map(cacheKey => (className + cacheKey, x))
    })

    putsToCaches(ls.groupBy(x => x.columnId).map { case (columnId, ls) =>
      val cacheKey = className + s"columnId=${columnId}"
      (cacheKey -> ls)
    }.toList)
  }

  /**
   * Sets `store_in_global_index` for one row; any exception is captured in the `Try`.
   *
   * NOTE(review): this asks the driver for a generated key on an UPDATE statement and
   * does not expire any cache entry — confirm both are intended.
   */
  def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try {
    sql"""
          update column_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id}
       """.updateAndReturnGeneratedKey.apply()
  }
}

/**
 * One row of `column_metas`.
 *
 * Equality and hashing deliberately look at `seq` only, so two metas with the same
 * seq are equal even when they belong to different columns.
 */
case class ColumnMeta(id: Option[Int],
                      columnId: Int,
                      name: String,
                      seq: Byte,
                      dataType: String,
                      defaultValue: String,
                      storeInGlobalIndex: Boolean = false) {
  lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType)

  override def equals(other: Any): Boolean = other match {
    case o: ColumnMeta => seq == o.seq
    case _ => false
  }

  override def hashCode(): Int = seq.toInt
}
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
// ----------------------------------------------------------------------
// diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
// new file mode 100644
// index 0000000..860bf70
// --- /dev/null
// +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
// @@ -0,0 +1,111 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.s2graph.core.schema

import org.apache.s2graph.core.GraphUtil
import scalikejdbc._

import scala.util.{Try, Random}

/** Queries and inserts for the `experiments` table; every finder goes through the Schema cache helpers. */
object Experiment extends SQLSyntaxSupport[Experiment] {
  import Schema._

  val className = Experiment.getClass.getSimpleName

  val ImpressionKey = "S2-Impression-Id"
  val ImpressionId = "Impression-Id"

  /** Maps the current result-set row onto an [[Experiment]]. */
  def apply(rs: WrappedResultSet): Experiment =
    Experiment(
      rs.intOpt("id"),
      rs.int("service_id"),
      rs.string("name"),
      rs.string("description"),
      rs.string("experiment_type"),
      rs.int("total_modular"))

  /** Every experiment registered for the service. */
  def finds(serviceId: Int)(implicit session: DBSession = AutoSession): List[Experiment] =
    withCaches(s"${className}serviceId=$serviceId", false) {
      sql"""select * from experiments where service_id = ${serviceId}"""
        .map { row => Experiment(row) }.list().apply()
    }

  /** The experiment called `name` within the service, if any. */
  def findBy(serviceId: Int, name: String)(implicit session: DBSession = AutoSession): Option[Experiment] =
    withCache(s"${className}serviceId=$serviceId:name=$name", false) {
      sql"""select * from experiments where service_id = ${serviceId} and name = ${name}"""
        .map { row => Experiment(row) }.single.apply
    }

  /** The experiment with the given primary key, if any. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): Option[Experiment] =
    withCache(s"${className}id=$id", false) {
      sql"""select * from experiments where id = ${id}"""
        .map { row => Experiment(row) }.single.apply
    }

  /**
   * Inserts a new experiment row and returns it with the generated id.
   * Any failure (including a service without an id) is captured in the returned `Try`.
   */
  def insert(service: Service, name: String, description: String, experimentType: String = "t", totalModular: Int = 100)
            (implicit session: DBSession = AutoSession): Try[Experiment] = {
    val generatedKey = Try {
      sql"""INSERT INTO experiments(service_id, service_name, `name`, description, experiment_type, total_modular)
         VALUES(${service.id.get}, ${service.serviceName}, $name, $description, $experimentType, $totalModular)"""
        .updateAndReturnGeneratedKey().apply()
    }
    for (newId <- generatedKey)
      yield Experiment(Some(newId.toInt), service.id.get, name, description, experimentType, totalModular)
  }
}

/** One row of `experiments`, plus bucket-selection helpers. */
case class Experiment(id: Option[Int],
                      serviceId: Int,
                      name: String,
                      description: String,
                      experimentType: String,
                      totalModular: Int) {

  /** Buckets attached to this experiment; requires `id` to be defined. */
  def buckets = Bucket.finds(id.get)

  /** Pairs every bucket that has a range with that range. */
  def rangeBuckets =
    buckets.flatMap { bucket =>
      bucket.rangeOpt.map(range => range -> bucket)
    }

  /**
   * Picks a bucket: by impression id when one is supplied, otherwise by a seed in
   * `1..totalModular` — hashed from `uuid` for experiment type "u", random for any other type.
   */
  def findBucket(uuid: String, impIdOpt: Option[String] = None): Option[Bucket] = {
    def bySeed: Option[Bucket] = {
      val seed =
        if (experimentType == "u") (GraphUtil.murmur3(uuid) % totalModular) + 1
        else Random.nextInt(totalModular) + 1
      findBucket(seed)
    }

    impIdOpt.fold(bySeed)(impId => Bucket.findByImpressionId(impId))
  }

  /** First bucket whose inclusive range contains `uuidMod`. */
  def findBucket(uuidMod: Int): Option[Bucket] =
    rangeBuckets.collectFirst {
      case ((from, to), bucket) if from <= uuidMod && uuidMod <= to => bucket
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
// ----------------------------------------------------------------------
// diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
// new file mode 100644
// index 0000000..7fb1183
// --- /dev/null
// +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
// @@ -0,0 +1,506 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.s2graph.core.schema

import java.util.Calendar

import com.typesafe.config.Config
import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
import org.apache.s2graph.core.GraphUtil
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs}
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
import scalikejdbc._

/**
 * Data-access helpers for the `labels` table.
 *
 * Single-row cache keys are `className` + `id=<id>` or `label=<name>`; list caches use
 * `srcColumnId=`, `tgtColumnId=`, `srcServiceId=` and `tgtServiceId=` prefixes.
 * All finders ignore rows whose `deleted_at` is set.
 */
object Label extends SQLSyntaxSupport[Label] {
  import Schema._

  val className = Label.getClass.getSimpleName

  val maxHBaseTableNames = 2

  /** Maps the current result-set row onto a [[Label]]. */
  def apply(rs: WrappedResultSet): Label = {
    Label(Option(rs.int("id")), rs.string("label"),
      rs.int("src_service_id"), rs.string("src_column_name"), rs.string("src_column_type"),
      rs.int("tgt_service_id"), rs.string("tgt_column_name"), rs.string("tgt_column_type"),
      rs.boolean("is_directed"), rs.string("service_name"), rs.int("service_id"), rs.string("consistency_level"),
      rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), rs.string("schema_version"), rs.boolean("is_async"),
      rs.string("compressionAlgorithm"), rs.stringOpt("options"))
  }

  /** Deletes the label's metas and indices (looked up without cache), then the label row itself. */
  def deleteAll(label: Label)(implicit session: DBSession) = {
    val id = label.id
    LabelMeta.findAllByLabelId(id.get, false).foreach { x => LabelMeta.delete(x.id.get) }
    LabelIndex.findByLabelIdAll(id.get, false).foreach { x => LabelIndex.delete(x.id.get) }
    Label.delete(id.get)
  }

  /** Finds a non-deleted label by name, optionally through the cache. */
  def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = {
    val cacheKey = className + "label=" + labelName
    lazy val labelOpt =
      sql"""
        select *
        from labels
        where label = ${labelName}
        and deleted_at is null """.map { rs => Label(rs) }.single.apply()

    if (useCache) withCache(cacheKey)(labelOpt)
    else labelOpt
  }

  /** Inserts one `labels` row and returns the generated key. No cache is touched. */
  def insert(label: String,
             srcServiceId: Int,
             srcColumnName: String,
             srcColumnType: String,
             tgtServiceId: Int,
             tgtColumnName: String,
             tgtColumnType: String,
             isDirected: Boolean,
             serviceName: String,
             serviceId: Int,
             consistencyLevel: String,
             hTableName: String,
             hTableTTL: Option[Int],
             schemaVersion: String,
             isAsync: Boolean,
             compressionAlgorithm: String,
             options: Option[String])(implicit session: DBSession = AutoSession) = {
    sql"""
        insert into labels(label,
          src_service_id, src_column_name, src_column_type,
          tgt_service_id, tgt_column_name, tgt_column_type,
          is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl, schema_version, is_async,
          compressionAlgorithm, options)
        values (${label},
          ${srcServiceId}, ${srcColumnName}, ${srcColumnType},
          ${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType},
          ${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, ${hTableName}, ${hTableTTL},
          ${schemaVersion}, ${isAsync}, ${compressionAlgorithm}, ${options})
      """
      .updateAndReturnGeneratedKey.apply()
  }

  /** Finds a non-deleted label by primary key through the cache. */
  def findByIdOpt(id: Int)(implicit session: DBSession = AutoSession): Option[Label] = {
    val cacheKey = className + "id=" + id
    withCache(cacheKey)(
      sql"""
        select *
        from labels
        where id = ${id}
        and deleted_at is null"""
        .map { rs => Label(rs) }.single.apply())
  }

  /**
   * Same lookup (and same cache key) as [[findByIdOpt]], unwrapped.
   *
   * @throws NoSuchElementException when no non-deleted label has this id
   */
  def findById(id: Int)(implicit session: DBSession = AutoSession): Label =
    findByIdOpt(id).get

  /**
   * Labels whose target column name matches the given service column.
   *
   * NOTE(review): the filter uses `service_id`, not `tgt_service_id` — confirm this is intended.
   */
  def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
    val cacheKey = className + "tgtColumnId=" + columnId
    val col = ServiceColumn.findById(columnId)
    withCaches(cacheKey)(
      sql"""
          select *
          from labels
          where tgt_column_name = ${col.columnName}
          and service_id = ${col.serviceId}
          and deleted_at is null
        """.map { rs => Label(rs) }.list().apply())
  }

  /**
   * Labels whose source column name matches the given service column.
   *
   * NOTE(review): the filter uses `service_id`, not `src_service_id` — confirm this is intended.
   */
  def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
    val cacheKey = className + "srcColumnId=" + columnId
    val col = ServiceColumn.findById(columnId)
    withCaches(cacheKey)(
      sql"""
          select *
          from labels
          where src_column_name = ${col.columnName}
          and service_id = ${col.serviceId}
          and deleted_at is null
        """.map { rs => Label(rs) }.list().apply())
  }

  /** Non-deleted labels whose source service is `serviceId`. */
  def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
    val cacheKey = className + "srcServiceId=" + serviceId
    withCaches(cacheKey)(
      sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
    )
  }

  /** Non-deleted labels whose target service is `serviceId`. */
  def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
    val cacheKey = className + "tgtServiceId=" + serviceId
    withCaches(cacheKey)(
      sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
    )
  }

  /**
   * Creates a label together with its service columns, metas and indices, or returns the
   * existing label when one with `labelName` is already present.
   *
   * @throws RuntimeException when one of the three services does not exist, when an existing
   *                          service column has a different type, or when creation fails
   */
  def insertAll(labelName: String, srcServiceName: String, srcColumnName: String, srcColumnType: String,
                tgtServiceName: String, tgtColumnName: String, tgtColumnType: String,
                isDirected: Boolean = true,
                serviceName: String,
                indices: Seq[Index],
                metaProps: Seq[Prop],
                consistencyLevel: String,
                hTableName: Option[String],
                hTableTTL: Option[Int],
                schemaVersion: String,
                isAsync: Boolean,
                compressionAlgorithm: String,
                options: Option[String])(implicit session: DBSession = AutoSession): Label = {

    val srcServiceOpt = Service.findByName(srcServiceName, useCache = false)
    val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false)
    val serviceOpt = Service.findByName(serviceName, useCache = false)
    if (srcServiceOpt.isEmpty) throw new RuntimeException(s"source service $srcServiceName is not created.")
    if (tgtServiceOpt.isEmpty) throw new RuntimeException(s"target service $tgtServiceName is not created.")
    if (serviceOpt.isEmpty) throw new RuntimeException(s"service $serviceName is not created.")

    val newLabel = for {
      srcService <- srcServiceOpt
      tgtService <- tgtServiceOpt
      service <- serviceOpt
    } yield {
      val srcServiceId = srcService.id.get
      val tgtServiceId = tgtService.id.get
      val serviceId = service.id.get

      /** insert serviceColumn */
      val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion)
      val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion)

      if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
      if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")

      /* create label */
      Label.findByName(labelName, useCache = false).getOrElse {

        val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType,
          tgtServiceId, tgtColumnName, tgtColumnType, isDirected, serviceName, serviceId, consistencyLevel,
          hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync,
          compressionAlgorithm, options).toInt

        // name -> seq for the requested props plus the reserved metas; used to resolve index prop names
        val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType, storeInGlobalIndex) =>
          val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType, storeInGlobalIndex)
          (propName -> labelMeta.seq)
        }.toMap ++ LabelMeta.reservedMetas.map(labelMeta => labelMeta.name -> labelMeta.seq).toMap

        if (indices.isEmpty) {
          // make default index with _PK, _timestamp, 0
          LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none", None, None)
        } else {
          indices.foreach { index =>
            val metaSeq = index.propNames.map { name => labelMetaMap(name) }
            LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none", index.direction, index.options)
          }
        }

        val cacheKeys = List(s"id=$createdId", s"label=$labelName")
        val ret = findByName(labelName, useCache = false).get
        putsToCacheOption(cacheKeys.map(k => className + k -> ret))
        ret
      }
    }

    newLabel.getOrElse(throw new RuntimeException("failed to create label"))
  }

  /** Loads every non-deleted label, seeds the `id=` and `label=` caches, and returns the list. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from labels where deleted_at is null""".map { rs => Label(rs) }.list().apply()
    putsToCacheOption(ls.flatMap { x =>
      Seq(
        s"id=${x.id.get}",
        s"label=${x.label}"
      ).map(cacheKey => (className + cacheKey, x))
    })
    ls
  }

  /**
   * Renames a label and expires the cache entries that could still serve the old row.
   *
   * @return the number of updated rows
   */
  def updateName(oldName: String, newName: String)(implicit session: DBSession = AutoSession) = {
    logger.info(s"rename label: $oldName -> $newName")
    // Resolve the id before the rename so the id-keyed entry can be expired as well.
    val idKeys = findByName(oldName, useCache = false).flatMap(_.id).map(id => s"id=$id").toList
    val cnt = sql"""update labels set label = ${newName} where label = ${oldName}""".update.apply()
    (idKeys ++ List(s"label=$oldName", s"label=$newName")).foreach { key =>
      expireCache(className + key)
      expireCaches(className + key)
    }
    cnt
  }

  /**
   * Points the label at another HBase table and expires its cache entries.
   *
   * @return the number of updated rows
   * @throws NoSuchElementException when no non-deleted label is called `labelName`
   */
  def updateHTableName(labelName: String, newHTableName: String)(implicit session: DBSession = AutoSession) = {
    logger.info(s"update HTable of label $labelName to $newHTableName")
    val cnt = sql"""update labels set hbase_table_name = $newHTableName where label = $labelName""".update().apply()
    val label = Label.findByName(labelName, useCache = false).get

    val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}")
    cacheKeys.foreach { key =>
      expireCache(className + key)
      expireCaches(className + key)
    }
    cnt
  }

  /**
   * Physically deletes the label row and expires its cache entries.
   *
   * @return the number of deleted rows
   */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val label = findById(id)
    logger.info(s"delete label: $label")
    val cnt = sql"""delete from labels where id = ${label.id.get}""".update().apply()
    val cacheKeys = List(s"id=$id", s"label=${label.label}")
    cacheKeys.foreach { key =>
      expireCache(className + key)
      expireCaches(className + key)
    }
    cnt
  }

  /**
   * Soft-deletes the label: renames it to `deleted_<epochMillis>_<name>`, sets `deleted_at`,
   * and expires the cache entries of the old name and id.
   *
   * @return the number of updated rows
   */
  def markDeleted(label: Label)(implicit session: DBSession = AutoSession) = {

    logger.info(s"mark deleted label: $label")
    val oldName = label.label
    val now = Calendar.getInstance().getTime
    val newName = s"deleted_${now.getTime}_" + label.label
    val cnt = sql"""update labels set label = ${newName}, deleted_at = ${now} where id = ${label.id.get}""".update.apply()
    val cacheKeys = List(s"id=${label.id.get}", s"label=${oldName}")
    cacheKeys.foreach { key =>
      expireCache(className + key)
      expireCaches(className + key)
    }
    cnt
  }
}

/**
 * One row of `labels`, with lazily resolved services, columns, indices and meta props.
 * Most derived members call `id.get` and therefore require a persisted label.
 */
case class Label(id: Option[Int], label: String,
                 srcServiceId: Int, srcColumnName: String, srcColumnType: String,
                 tgtServiceId: Int, tgtColumnName: String, tgtColumnType: String,
                 isDirected: Boolean = true, serviceName: String, serviceId: Int, consistencyLevel: String = "strong",
                 hTableName: String, hTableTTL: Option[Int],
                 schemaVersion: String, isAsync: Boolean = false,
                 compressionAlgorithm: String,
                 options: Option[String]) {
  def metas(useCache: Boolean = true) = LabelMeta.findAllByLabelId(id.get, useCache = useCache)

  def indices(useCache: Boolean = true) = LabelIndex.findByLabelIdAll(id.get, useCache = useCache)

  lazy val srcService = Service.findById(srcServiceId)
  lazy val tgtService = Service.findById(tgtServiceId)
  lazy val service = Service.findById(serviceId)

  /**
   * TODO
   * change this to apply hbase table from target serviceName
   */
  // Only the first comma-separated entry of hTableName is used.
  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").head)

  lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found"))
  lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found"))

  lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq)

  //TODO: Make sure this is correct

  lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
  lazy val labelMetas = LabelMeta.findAllByLabelId(id.get, useCache = true)
  lazy val labelMetaSet = labelMetas.toSet
  lazy val labelMetaMap = (labelMetas ++ LabelMeta.reservedMetas).map(m => m.seq -> m).toMap

  lazy val indicesMap = indices.map(idx => (idx.seq, idx)).toMap
  lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)).toMap
  lazy val indexNameMap = indices.map(idx => (idx.name, idx)).toMap
  // Empty when there is no default index; otherwise every index except the default one.
  lazy val extraIndices = indices.filter(idx => defaultIndex.isDefined && idx.id.get != defaultIndex.get.id.get)
  lazy val extraIndicesMap = extraIndices.map(idx => (idx.seq, idx)).toMap

  // Reserved metas (with `to`/`from` typed after the target/source column) followed by the label's own metas.
  lazy val metaProps = LabelMeta.reservedMetas.map { m =>
    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
    else m
  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)

  // Same as metaProps but starting from LabelMeta.reservedMetasInner.
  lazy val metaPropsInner = LabelMeta.reservedMetasInner.map { m =>
    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
    else m
  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)

  lazy val metaPropsMap = metaProps.map(x => (x.seq, x)).toMap
  lazy val metaPropsInvMap = metaProps.map(x => (x.name, x)).toMap
  lazy val metaPropNames = metaProps.map(x => x.name)
  lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)).toMap

  /** this is used only by edgeToProps */
  lazy val metaPropsDefaultMap = (for {
    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
  } yield prop.name -> jsValue).toMap

  lazy val metaPropsDefaultMapInnerString = (for {
    prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
  } yield prop.name -> innerVal).toMap

  lazy val metaPropsDefaultMapInner = (for {
    prop <- metaPropsInner
    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
  } yield prop -> innerVal).toMap
  lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq
  lazy val metaPropsJsValueWithDefault = (for {
    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
  } yield prop -> jsValue).toMap

  // Value of the "tokens" option as a set; empty when absent, or (with an error log) when it is not a JSON array.
  lazy val tokens: Set[String] = extraOptions.get("tokens").fold(Set.empty[String]) {
    case JsArray(tokens) => tokens.map(_.as[String]).toSet
    case _ =>
      logger.error("Invalid token JSON")
      Set.empty[String]
  }

  lazy val extraOptions = Schema.extraOptions(options)

  // Defaults to true when the "durability" option is absent.
  lazy val durability = extraOptions.get("durability").map(_.as[Boolean]).getOrElse(true)

  lazy val storageConfigOpt: Option[Config] = toStorageConfig

  def toStorageConfig: Option[Config] = {
    Schema.toStorageConfig(extraOptions)
  }

  /** Source column for direction 0, target column for any other direction. */
  def srcColumnWithDir(dir: Int) = {
    if (dir == 0) srcColumn else tgtColumn
  }

  /** Target column for direction 0, source column for any other direction. */
  def tgtColumnWithDir(dir: Int) = {
    if (dir == 0) tgtColumn else srcColumn
  }

  lazy val tgtSrc = (tgtColumn, srcColumn)
  lazy val srcTgt = (srcColumn, tgtColumn)

  /** (target, source) for direction 1, (source, target) otherwise. */
  def srcTgtColumn(dir: Int) = if (dir == 1) tgtSrc else srcTgt

  lazy val EmptyPropsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 0))

  // Appends the label's metas (looked up via LabelMeta) to the generated case-class string.
  override def toString(): String = {
    val orderByKeys = LabelMeta.findAllByLabelId(id.get)
    super.toString() + orderByKeys.toString()
  }

  /**
   * JSON description of the label, built from uncached index/meta lookups.
   * Values under the "tokens" option are masked with `*` of the same length;
   * options that fail to parse are rendered as an empty object.
   */
  lazy val toJson = {
    val allIdxs = LabelIndex.findByLabelIdAll(id.get, useCache = false)
    val defaultIdxOpt = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq, useCache = false)
    val extraIdxs = allIdxs.filter(idx => defaultIdxOpt.isDefined && idx.id.get != defaultIdxOpt.get.id.get)
    val metaProps = LabelMeta.reservedMetas.map { m =>
      if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
      else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
      else m
    } ::: LabelMeta.findAllByLabelId(id.get, useCache = false)

    val defaultIdx = defaultIdxOpt.map(x => x.toJson).getOrElse(Json.obj())
    val optionsJs = try {
      val obj = options.map(Json.parse).getOrElse(Json.obj()).as[JsObject]
      if (!obj.value.contains("tokens")) obj
      else obj ++ Json.obj("tokens" -> obj.value("tokens").as[Seq[String]].map("*" * _.length))

    } catch { case e: Exception => Json.obj() }

    Json.obj("labelName" -> label,
      "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
      "isDirected" -> isDirected,
      "serviceName" -> serviceName,
      "consistencyLevel" -> consistencyLevel,
      "schemaVersion" -> schemaVersion,
      "isAsync" -> isAsync,
      "compressionAlgorithm" -> compressionAlgorithm,
      "defaultIndex" -> defaultIdx,
      "extraIndex" -> extraIdxs.map(exIdx => exIdx.toJson),
      "metaProps" -> metaProps.filter { labelMeta => LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson),
      "options" -> optionsJs
    )
  }

  /** Converts named property values to inner values stamped with `ts`; names without a known meta are dropped. */
  def propsToInnerValsWithTs(props: Map[String, Any],
                             ts: Long = System.currentTimeMillis()): Map[LabelMeta, InnerValLikeWithTs] = {
    for {
      (k, v) <- props
      labelMeta <- metaPropsInvMap.get(k)
      innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
    } yield labelMeta -> InnerValLikeWithTs(innerVal, ts)

  }

  /**
   * Converts inner values back to a name -> value map.
   *
   * With an empty `selectColumns` every default is included and overridden by `props`.
   * Otherwise only the selected seqs are returned (the `to`/`from` seqs are skipped),
   * falling back to the default value when `props` has no entry.
   */
  def innerValsWithTsToProps(props: Map[LabelMeta, InnerValLikeWithTs],
                             selectColumns: Map[Byte, Boolean]): Map[String, Any] = {
    if (selectColumns.isEmpty) {
      for {
        (meta, v) <- metaPropsDefaultMapInner ++ props
      } yield {
        meta.name -> innerValToAny(v.innerVal, meta.dataType)
      }
    } else {
      for {
        (k, _) <- selectColumns
        if k != LabelMeta.toSeq && k != LabelMeta.fromSeq
        labelMeta <- metaPropsMap.get(k)
      } yield {
        val v = props.get(labelMeta).orElse(metaPropsDefaultMapInner.get(labelMeta)).get
        labelMeta.name -> innerValToAny(v.innerVal, labelMeta.dataType)
      }
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
// ----------------------------------------------------------------------
// diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
// new file mode 100644
// index 0000000..bb8425f
// --- /dev/null
// +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
// @@ -0,0 +1,214 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.s2graph.core.schema

import org.apache.s2graph.core.GraphUtil
import org.apache.s2graph.core.schema.LabelIndex.LabelIndexMutateOption
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsObject, JsString, Json}
import scalikejdbc._

/**
 * Persistence helpers and cache management for rows of the `label_indices` table.
 *
 * Every cache key is prefixed with `className` and must be spelled exactly the
 * same way when it is written and when it is expired.
 */
object LabelIndex extends SQLSyntaxSupport[LabelIndex] {
  import Schema._
  val className = LabelIndex.getClass.getSimpleName

  val DefaultName = "_PK"
  val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq)
  val DefaultSeq = 1.toByte
  val MaxOrderSeq = 7

  /** Builds a [[LabelIndex]] from the current row; `meta_seqs` is a comma separated list of bytes. */
  def apply(rs: WrappedResultSet): LabelIndex = {
    val metaSeqs = rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList

    LabelIndex(rs.intOpt("id"), rs.int("label_id"), rs.string("name"), rs.byte("seq"),
      metaSeqs,
      rs.string("formulars"),
      rs.intOpt("dir"),
      rs.stringOpt("options")
    )
  }

  /**
   * Per-direction mutation option parsed from the `options` json of an index.
   *
   * @param method       one of "drop", "sample", "hash_sample"; any other value means "always keep"
   * @param rate         threshold compared against a random or hash based ratio
   * @param totalModular modulus used by the "hash_sample" method
   */
  case class LabelIndexMutateOption(dir: Byte,
                                    method: String,
                                    rate: Double,
                                    totalModular: Long,
                                    storeDegree: Boolean) {

    val isBufferIncrement = method == "drop" || method == "sample" || method == "hash_sample"

    /**
     * Decides whether an element is kept.
     *
     * @throws RuntimeException when the method is "hash_sample" and `hashOpt` is empty
     */
    def sample[T](a: T, hashOpt: Option[Long]): Boolean = method match {
      case "drop" => false
      case "sample" => scala.util.Random.nextDouble() < rate
      case "hash_sample" =>
        val hash = hashOpt.getOrElse(throw new RuntimeException("hash_sample need _from_hash value"))
        (hash.abs % totalModular) / totalModular.toDouble < rate
      case _ => true
    }
  }

  /** Looks up an index by primary key; the trailing `.get` throws when no such row exists. */
  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
    val cacheKey = className + "id=" + id
    withCache(cacheKey) {
      sql"""select * from label_indices where id = ${id}""".map { rs => LabelIndex(rs) }.single.apply
    }.get
  }

  /** Returns every index of the label with `seq > 0`, ordered by `seq`. */
  def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
    val cacheKey = className + "labelId=" + labelId
    if (useCache) {
      withCaches(cacheKey)( sql"""
        select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
      """.map { rs => LabelIndex(rs) }.list.apply)
    } else {
      sql"""
        select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
      """.map { rs => LabelIndex(rs) }.list.apply
    }
  }

  /** Inserts a new row and returns the generated primary key. */
  def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String,
             direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
    sql"""
      insert into label_indices(label_id, name, seq, meta_seqs, formulars, dir, options)
      values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}, ${direction}, ${options})
    """
      .updateAndReturnGeneratedKey.apply()
  }

  /**
   * Returns the index matching (labelId, metaSeqs, direction), creating it with the
   * next free `seq` when it does not exist yet.
   */
  def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String,
                   direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): LabelIndex = {
    findByLabelIdAndSeqs(labelId, metaSeqs, direction) match {
      case Some(s) => s
      case None =>
        val orders = findByLabelIdAll(labelId, false)
        val seq = (orders.size + 1).toByte
        assert(seq <= MaxOrderSeq)
        val createdId = insert(labelId, indexName, seq, metaSeqs, formulars, direction, options)

        // The keys must be spelled exactly like the ones the finders use: the lookup
        // above cached its miss under `seqs=<comma separated>`, so interpolating the
        // List itself ("List(1, 2)") would never expire that entry. The plain
        // `labelId=` key drops the per-label list cached by findByLabelIdAll.
        val seqsKey = metaSeqs.mkString(",")
        val cacheKeys = List(
          s"labelId=$labelId",
          s"labelId=$labelId:seq=$seq",
          s"labelId=$labelId:seqs=$seqsKey:dir=$direction",
          s"id=$createdId")

        cacheKeys.foreach { key =>
          expireCache(className + key)
          expireCaches(className + key)
        }

        findByLabelIdAndSeq(labelId, seq).get
    }
  }

  /**
   * Finds the index of a label by its exact `meta_seqs` string and direction.
   *
   * NOTE(review): when `direction` is None the query binds `dir = NULL`, which never
   * matches under standard SQL comparison rules - confirm whether a null-safe
   * comparison is intended here.
   */
  def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
    val cacheKey = className + "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction
    withCache(cacheKey) {
      sql"""
        select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} and dir = ${direction}
      """.map { rs => LabelIndex(rs) }.single.apply
    }
  }

  /** Finds the index of a label by its `seq`. */
  def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
    //    val cacheKey = s"labelId=$labelId:seq=$seq"
    val cacheKey = className + "labelId=" + labelId + ":seq=" + seq
    if (useCache) {
      withCache(cacheKey)( sql"""
        select * from label_indices where label_id = ${labelId} and seq = ${seq}
      """.map { rs => LabelIndex(rs) }.single.apply)
    } else {
      sql"""
        select * from label_indices where label_id = ${labelId} and seq = ${seq}
      """.map { rs => LabelIndex(rs) }.single.apply
    }
  }

  /** Deletes the row and expires every cache entry that may still reference it. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val labelIndex = findById(id)
    val seqs = labelIndex.metaSeqs.mkString(",")
    val (labelId, seq) = (labelIndex.labelId, labelIndex.seq)
    sql"""delete from label_indices where id = ${id}""".execute.apply()

    val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}")
    cacheKeys.foreach { key =>
      expireCache(className + key)
      expireCaches(className + key)
    }
  }

  /** Loads every index, pre-populates the caches and returns the loaded rows. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) }.list.apply
    putsToCacheOption(ls.flatMap { x =>
      Seq(
        s"id=${x.id.get}",
        s"labelId=${x.labelId}:seq=${x.seq}",
        s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}"
      ).map(cacheKey => (className + cacheKey, x))
    })

    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
      val cacheKey = s"labelId=${labelId}"
      (className + cacheKey -> ls)
    }.toList)

    // returned for consistency with the other models' findAll
    ls
  }
}


/**
 * One row of `label_indices`.
 *
 * @param metaSeqs seqs of the label metas that form this index, in order
 * @param dir      optional direction; rendered as "both" in json when empty
 * @param options  optional json string, keyed by direction name (see `parseOption`)
 */
case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
                      dir: Option[Int], options: Option[String]) {
  // both
  lazy val label = Label.findById(labelId)
  lazy val metas = label.metaPropsMap
  lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq))
  lazy val sortKeyTypesArray = sortKeyTypes.toArray
  lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name }

  lazy val toJson = {
    val dirJs = dir.map(GraphUtil.fromDirection).getOrElse("both")
    // unparsable options are rendered as an empty object instead of failing
    val optionsJs = try { options.map(Json.parse).getOrElse(Json.obj()) } catch { case e: Exception => Json.obj() }

    Json.obj(
      "name" -> name,
      "propNames" -> sortKeyTypes.map(x => x.name),
      "dir" -> dirJs,
      "options" -> optionsJs
    )
  }

  /**
   * Reads the mutate option stored under key `dir` of the options json.
   * Missing fields fall back to: method "default", rate 1.0, totalModular 100, storeDegree true.
   * Any failure is logged and yields None.
   */
  def parseOption(dir: String): Option[LabelIndexMutateOption] = try {
    options.map { string =>
      val jsObj = Json.parse(string) \ dir

      val method = (jsObj \ "method").asOpt[String].getOrElse("default")
      val rate = (jsObj \ "rate").asOpt[Double].getOrElse(1.0)
      val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L)
      val storeDegree = (jsObj \ "storeDegree").asOpt[Boolean].getOrElse(true)

      LabelIndexMutateOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree)
    }
  } catch {
    case e: Exception =>
      logger.error(s"Parse failed labelOption: ${this.label}", e)
      None
  }

  lazy val inDirOption = parseOption("in")

  lazy val outDirOption = parseOption("out")
}

// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
// ----------------------------------------------------------------------
// diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
// new file mode 100644
// index 0000000..f25be5f
// --- /dev/null
// +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
// @@ -0,0 +1,217 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.s2graph.core.schema

/**
 * Created by shon on 6/3/15.
 */

import org.apache.s2graph.core.GraphExceptions.MaxPropSizeReachedException
import org.apache.s2graph.core.{GraphExceptions, JSONParser}
import play.api.libs.json.Json
import scalikejdbc._

import scala.util.Try

/**
 * Reserved (built-in) label metas plus persistence and cache helpers for the
 * `label_metas` table. Cache keys are always prefixed with `className`.
 */
object LabelMeta extends SQLSyntaxSupport[LabelMeta] {
  import Schema._
  val className = LabelMeta.getClass.getSimpleName
  /** dummy sequences */

  val fromSeq = (-4).toByte
  val toSeq = (-5).toByte
  val lastOpSeq = (-3).toByte
  val lastDeletedAtSeq = (-2).toByte
  val timestampSeq = (0).toByte
  val labelSeq = (-6).toByte
  val directionSeq = -7.toByte
  val fromHashSeq = -8.toByte

  val countSeq = (Byte.MaxValue - 2).toByte
  val degreeSeq = (Byte.MaxValue - 1).toByte
  val maxValue = Byte.MaxValue
  val emptySeq = Byte.MaxValue

  /** reserved sequences */
  //  val deleted = LabelMeta(id = Some(lastDeletedAt), labelId = lastDeletedAt, name = "lastDeletedAt",
  //    seq = lastDeletedAt, defaultValue = "", dataType = "long")
  val fromHash = LabelMeta(id = None, labelId = fromHashSeq, name = "_from_hash",
    seq = fromHashSeq, defaultValue = fromHashSeq.toString, dataType = "long")
  val from = LabelMeta(id = Some(fromSeq), labelId = fromSeq, name = "_from",
    seq = fromSeq, defaultValue = fromSeq.toString, dataType = "string")
  val to = LabelMeta(id = Some(toSeq), labelId = toSeq, name = "_to",
    seq = toSeq, defaultValue = toSeq.toString, dataType = "string")
  val timestamp = LabelMeta(id = Some(-1), labelId = -1, name = "_timestamp",
    seq = timestampSeq, defaultValue = "0", dataType = "long")
  val degree = LabelMeta(id = Some(-1), labelId = -1, name = "_degree",
    seq = degreeSeq, defaultValue = "0", dataType = "long")
  val count = LabelMeta(id = Some(-1), labelId = -1, name = "_count",
    seq = countSeq, defaultValue = "-1", dataType = "long")
  val lastDeletedAt = LabelMeta(id = Some(-1), labelId = -1, name = "_lastDeletedAt",
    seq = lastDeletedAtSeq, defaultValue = "-1", dataType = "long")
  val label = LabelMeta(id = Some(-1), labelId = -1, name = "label",
    seq = labelSeq, defaultValue = "", dataType = "string")
  val direction = LabelMeta(id = Some(-1), labelId = -1, name = "direction",
    seq = directionSeq, defaultValue = "out", dataType = "string")
  val empty = LabelMeta(id = Some(-1), labelId = -1, name = "_empty",
    seq = emptySeq, defaultValue = "-1", dataType = "long")

  // Each reserved column(_timestamp, timestamp) has same seq number, starts with '_' has high priority
  val reservedMetasInner = List(empty, label, direction, lastDeletedAt, from, fromHash, to, degree, timestamp, count)
  // Only a leading '_' is stripped for the alias: blindly dropping the first character
  // turned "label"/"direction" into the bogus aliases "abel"/"irection".
  val reservedMetas = reservedMetasInner.flatMap { lm => List(lm, lm.copy(name = lm.name.stripPrefix("_"))) }.reverse
  val reservedMetaNamesSet = reservedMetasInner.map(_.name).toSet

  val defaultRequiredMetaNames = Set("from", "_from", "to", "_to", "_from_hash", "label", "direction", "timestamp", "_timestamp")

  /** Builds a [[LabelMeta]] from the current row; `data_type` is lower-cased. */
  def apply(rs: WrappedResultSet): LabelMeta = {
    LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"),
      rs.string("default_value"), rs.string("data_type").toLowerCase, rs.boolean("store_in_global_index"))
  }

  /** Note: DegreeSeq should not be included in serializer/deserializer.
   * only 0 <= seq <= CountSeq(Byte.MaxValue - 2), not DegreeSeq(Byte.MaxValue - 1) should be
   * included in actual bytes in storage.
   * */
  def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq // || seq == fromHashSeq

  /** Stricter variant of `isValidSeq`: excludes the timestamp seq (0) and `countSeq`. */
  def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq // || seq == fromHashSeq

  /** Looks up a meta by primary key; the trailing `.get` throws when no such row exists. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = {
    val cacheKey = className + "id=" + id

    withCache(cacheKey) {
      sql"""select * from label_metas where id = ${id}""".map { rs => LabelMeta(rs) }.single.apply
    }.get
  }

  /** Returns every meta of the label ordered by `seq`. */
  def findAllByLabelId(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[LabelMeta] = {
    val cacheKey = className + "labelId=" + labelId
    // lazy: the query only runs when the cache asks for the value (or the cache is bypassed)
    lazy val labelMetas = sql"""select *
      from label_metas
      where label_id = ${labelId} order by seq ASC""".map(LabelMeta(_)).list.apply()

    if (useCache) withCaches(cacheKey)(labelMetas)
    else labelMetas
  }

  /**
   * Finds a meta by name. The reserved names `_timestamp`, `_from` and `_to` are
   * answered from the reserved metas without touching the database.
   */
  def findByName(labelId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelMeta] = {
    name match {
      case timestamp.name => Some(timestamp)
      case from.name => Some(from)
      case to.name => Some(to)
      case _ =>
        val cacheKey = className + "labelId=" + labelId + ":name=" + name
        lazy val labelMeta = sql"""
            select *
            from label_metas where label_id = ${labelId} and name = ${name}"""
          .map { rs => LabelMeta(rs) }.single.apply()

        if (useCache) withCache(cacheKey)(labelMeta)
        else labelMeta
    }
  }

  /**
   * Inserts a new meta using the next free `seq` (current meta count of the label + 1).
   *
   * @throws MaxPropSizeReachedException when the next seq would reach `maxValue`
   */
  def insert(labelId: Int, name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = {
    val ls = findAllByLabelId(labelId, useCache = false)
    val seq = ls.size + 1

    if (seq < maxValue) {
      // store_in_global_index is persisted too; it used to be accepted and silently dropped
      sql"""insert into label_metas(label_id, name, seq, default_value, data_type, store_in_global_index)
    select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}, ${storeInGlobalIndex}""".updateAndReturnGeneratedKey.apply()
    } else {
      throw MaxPropSizeReachedException("max property size reached")
    }
  }

  /** Returns the meta named `name`, inserting it first when it does not exist. */
  def findOrInsert(labelId: Int,
                   name: String,
                   defaultValue: String,
                   dataType: String,
                   storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession): LabelMeta = {

    findByName(labelId, name) match {
      case Some(c) => c
      case None =>
        insert(labelId, name, defaultValue, dataType, storeInGlobalIndex)
        val cacheKey = "labelId=" + labelId + ":name=" + name
        val cacheKeys = "labelId=" + labelId
        expireCache(className + cacheKey)
        expireCaches(className + cacheKeys)
        findByName(labelId, name, useCache = false).get
    }
  }

  /** Deletes the row and expires every cache entry that may still reference it. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val labelMeta = findById(id)
    val (labelId, name) = (labelMeta.labelId, labelMeta.name)
    sql"""delete from label_metas where id = ${id}""".execute.apply()
    val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:name=$name")
    cacheKeys.foreach { key =>
      expireCache(className + key)
      expireCaches(className + key)
    }
  }

  /** Loads every meta, pre-populates the caches and returns the loaded rows. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from label_metas""".map { rs => LabelMeta(rs) }.list.apply
    putsToCacheOption(ls.flatMap { x =>
      Seq(
        s"id=${x.id.get}",
        s"labelId=${x.labelId}:name=${x.name}",
        s"labelId=${x.labelId}:seq=${x.seq}"
      ).map(cacheKey => (className + cacheKey, x))
    })

    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
      val cacheKey = s"labelId=${labelId}"
      // must carry the className prefix, otherwise findAllByLabelId never hits this entry
      (className + cacheKey) -> ls
    }.toList)

    ls
  }

  /**
   * Updates the `store_in_global_index` flag of one meta.
   *
   * NOTE(review): this asks for a generated key on an UPDATE statement; verify that the
   * JDBC driver in use actually reports one, otherwise the Try may always be a Failure.
   */
  def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try {
    sql"""
          update label_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id}
       """.updateAndReturnGeneratedKey.apply()
  }
}

/**
 * One row of `label_metas`.
 *
 * Equality and hashing deliberately look at `seq` only, so a reserved meta and its
 * renamed copy compare equal.
 */
case class LabelMeta(id: Option[Int],
                     labelId: Int,
                     name: String,
                     seq: Byte,
                     defaultValue: String,
                     dataType: String,
                     storeInGlobalIndex: Boolean = false) {

  lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType, "storeInGlobalIndex" -> storeInGlobalIndex)

  override def equals(other: Any): Boolean = other match {
    //        labelId == o.labelId &&
    case o: LabelMeta => seq == o.seq
    case _ => false
  }

  override def hashCode(): Int = seq.toInt
  //    (labelId, seq).hashCode()
}

// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
// ----------------------------------------------------------------------
// diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
// new file mode 100644
// index 0000000..5b9e0ae
// --- /dev/null
// +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
// @@ -0,0 +1,225 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
*/

package org.apache.s2graph.core.schema

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.s2graph.core.JSONParser
import org.apache.s2graph.core.utils.{SafeUpdateCache, logger}
import play.api.libs.json.{JsObject, JsValue, Json}
import scalikejdbc._

import scala.concurrent.ExecutionContext
import scala.io.Source
import scala.language.{higherKinds, implicitConversions}
import scala.util.{Failure, Success, Try}

/**
 * Entry point of the schema (meta) store: initialises the JDBC connection pool,
 * creates the tables on an empty database and owns the cache shared by all models.
 */
object Schema {
  var maxSize = 10000
  var ttl = 60
  var safeUpdateCache: SafeUpdateCache = _

  val numOfThread = Runtime.getRuntime.availableProcessors()
  val threadPool = Executors.newFixedThreadPool(numOfThread)
  val ec = ExecutionContext.fromExecutor(threadPool)

  private val ModelReferenceCount = new AtomicLong(0L)

  /**
   * Initialises the connection pool and the cache from `config` and bumps the
   * reference count that `shutdown` decrements.
   *
   * Reads `cache.max.size`, `cache.ttl.seconds` and `db.default.{driver,url,user,password}`.
   */
  def apply(config: Config) = {
    maxSize = config.getInt("cache.max.size")
    ttl = config.getInt("cache.ttl.seconds")

    Class.forName(config.getString("db.default.driver"))

    val settings = ConnectionPoolSettings(
      initialSize = 10,
      maxSize = 10,
      connectionTimeoutMillis = 30000L,
      validationQuery = "select 1;")

    ConnectionPool.singleton(
      config.getString("db.default.url"),
      config.getString("db.default.user"),
      config.getString("db.default.password"),
      settings)

    checkSchema()
    safeUpdateCache = new SafeUpdateCache(maxSize, ttl)(ec)
    ModelReferenceCount.incrementAndGet()
  }

  /**
   * Creates the tables from the bundled `schema.sql` when the database has no tables at all.
   *
   * @throws RuntimeException when the tables cannot be listed, `schema.sql` is missing
   *                          or the import fails
   */
  def checkSchema(): Unit = {
    withTx { implicit session =>
      sql"""show tables""".map(rs => rs.string(1)).list.apply()
    } match {
      case Success(tables) =>
        if (tables.isEmpty) {
          // this is a very simple migration tool that only supports creating
          // appropriate tables when there are no tables in the database at all.
          // Ideally, it should be improved to a sophisticated migration tool
          // that supports versioning, etc.
          logger.info("Creating tables ...")
          val statements = readSchemaStatements()
          withTx { implicit session =>
            statements.foreach(sql => session.execute(sql))
          } match {
            case Success(_) =>
              logger.info("Successfully imported schema")
            case Failure(e) =>
              throw new RuntimeException("Error while importing schema", e)
          }
        }
      case Failure(e) =>
        throw new RuntimeException("Could not list tables in the database", e)
    }
  }

  /** Reads `schema.sql`, strips `-- ` comments and splits it into single statements. */
  private def readSchemaStatements(): Array[String] = {
    val schema = getClass.getResourceAsStream("schema.sql")
    if (schema == null) {
      throw new RuntimeException("Error while importing schema: schema.sql was not found on the classpath")
    }

    val source = Source.fromInputStream(schema, "UTF-8")
    try {
      // headOption: String.split drops trailing empty strings, so a line that is
      // exactly "-- " yields an empty array and `.head` would throw on it.
      val sources = source.getLines.map(_.split("-- ").headOption.getOrElse("").trim).mkString("\n")
      sources.split(";\n")
    } finally {
      // closing the Source also closes the underlying resource stream
      source.close()
    }
  }

  /**
   * Runs `block` inside a transaction on a freshly borrowed connection.
   * Commits on success; on an Exception rolls back (if still active) and returns the Failure.
   */
  def withTx[T](block: DBSession => T): Try[T] = {
    using(DB(ConnectionPool.borrow())) { conn =>
      Try {
        conn.begin()
        val session = conn.withinTxSession()
        val result = block(session)

        conn.commit()

        result
      } recoverWith {
        case e: Exception =>
          conn.rollbackIfActive()
          Failure(e)
      }
    }
  }

  /**
   * Releases one reference; when the last one is released the cache is shut down and
   * every connection pool is closed. With `modelDataDelete` all tables are truncated first.
   *
   * @throws IllegalStateException when listing or truncating the tables fails
   */
  def shutdown(modelDataDelete: Boolean = false) =
    if (ModelReferenceCount.decrementAndGet() <= 0) {
      // FIXME: When Model is served by embedded database and deleteData is set, Model deletes
      // the underlying database. Its purpose is clearing runtime footprint when running tests.
      if (modelDataDelete) {
        withTx { implicit session =>
          sql"SHOW TABLES"
            .map(rs => rs.string(1))
            .list
            .apply()
            .map { table => s"TRUNCATE TABLE $table" }
        } match {
          case Success(stmts) =>
            val newStmts = List("SET FOREIGN_KEY_CHECKS = 0") ++ stmts ++ List("SET FOREIGN_KEY_CHECKS = 1")
            withTx { implicit session =>
              newStmts.foreach { stmt =>
                session.execute(stmt)
              }
            } match {
              case Success(_) =>
                logger.info(s"Success to truncate models: $stmts")
              case Failure(e) =>
                throw new IllegalStateException(s"Failed to truncate models", e)
            }
          case Failure(e) =>
            throw new IllegalStateException(s"Failed to list models", e)
        }
      }
//      clearCache()
      // the cache is still null when shutdown is called without a prior apply(config)
      Option(safeUpdateCache).foreach(_.shutdown())
      ConnectionPool.closeAll()
    }

  /** Warms the caches by loading every model table. */
  def loadCache() = {
    Service.findAll()
    ServiceColumn.findAll()
    Label.findAll()
    LabelMeta.findAll()
    LabelIndex.findAll()
    ColumnMeta.findAll()
  }

//  def clearCache() = {
//    Service.expireAll()
//    ServiceColumn.expireAll()
//    Label.expireAll()
//    LabelMeta.expireAll()
//    LabelIndex.expireAll()
//    ColumnMeta.expireAll()
//  }

  /** Parses an options json string into its top-level fields; anything unparsable yields an empty map. */
  def extraOptions(options: Option[String]): Map[String, JsValue] = options match {
    case None => Map.empty
    case Some(v) =>
      try {
        Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
      } catch {
        case e: Exception =>
          logger.error(s"An error occurs while parsing the extra label option", e)
          Map.empty
      }
  }

  /** Builds a Config from the "storage" entry of `options`; any failure is logged and yields None. */
  def toStorageConfig(options: Map[String, JsValue]): Option[Config] = {
    try {
      options.get("storage").map { jsValue =>
        import scala.collection.JavaConverters._
        val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, value) =>
          key -> JSONParser.jsValueToAny(value).getOrElse(
            throw new RuntimeException(s"unsupported value for storage option '$key': $value"))
        }

        ConfigFactory.parseMap(configMap.asJava)
      }
    } catch {
      case e: Exception =>
        logger.error(s"toStorageConfig error. use default storage", e)
        None
    }
  }

  // list-valued ("multi") entries live in the same cache under a suffixed key
  private def toMultiKey(key: String): String = key + ".__m__"

  def withCache[T <: AnyRef](key: String, broadcast: Boolean = true)(op: => T) = safeUpdateCache.withCache(key, broadcast)(op)

  def withCaches[T <: AnyRef](key: String, broadcast: Boolean = true)(op: => T) = safeUpdateCache.withCache(toMultiKey(key), broadcast)(op)

  def expireCache(key: String) = safeUpdateCache.invalidate(key)

  def expireCaches(key: String) = safeUpdateCache.invalidate(toMultiKey(key))

  /** Stores every value wrapped in an Option, the shape the single-row finders cache. */
  def putsToCacheOption[T <: AnyRef](kvs: List[(String, T)]) = kvs.foreach {
    case (key, value) => safeUpdateCache.put(key, Option(value))
  }

  /** Stores every value under its multi key, the shape the list finders cache. */
  def putsToCaches[T <: AnyRef](kvs: List[(String, T)]) = kvs.foreach {
    case (key, values) => safeUpdateCache.put(toMultiKey(key), values)
  }

  def getCacheSize(): Int = safeUpdateCache.asMap().size()

  def getAllCacheData[T <: AnyRef](): (List[(String, T)], List[(String, List[T])]) = {
    (Nil, Nil)
  }

  def toBytes(): Array[Byte] = safeUpdateCache.toBytes()

  def fromBytes(safeUpdateCache: SafeUpdateCache, bytes: Array[Byte]): Unit = SafeUpdateCache.fromBytes(safeUpdateCache, bytes)
}

// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
// ----------------------------------------------------------------------
// diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
// new file mode 100644
// index 0000000..611a746
// --- /dev/null
// +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
// @@ -0,0 +1,133 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.s2graph.core.schema

import java.util.UUID

import com.typesafe.config.Config
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.Json
import scalikejdbc._

/**
 * Persistence and cache helpers for rows of the `services` table.
 * Cache keys are always prefixed with `className`.
 */
object Service extends SQLSyntaxSupport[Service] {
  import Schema._
  val className = Service.getClass.getSimpleName

  /** Builds a [[Service]] from the current row. */
  def valueOf(rs: WrappedResultSet): Service = {
    Service(rs.intOpt("id"),
      rs.string("service_name"),
      rs.string("access_token"),
      rs.string("cluster"),
      rs.string("hbase_table_name"),
      rs.int("pre_split_size"),
      rs.intOpt("hbase_table_ttl"))
  }

  /** Finds a service by its access token. */
  def findByAccessToken(accessToken: String)(implicit session: DBSession = AutoSession): Option[Service] = {
    val cacheKey = className + s"accessToken=$accessToken"
    withCache(cacheKey)( sql"""select * from services where access_token = ${accessToken}""".map { rs => Service.valueOf(rs) }.single.apply)
  }

  /** Looks up a service by primary key; the trailing `.get` throws when no such row exists. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): Service = {
    val cacheKey = className + "id=" + id
    withCache(cacheKey)( sql"""select * from services where id = ${id}""".map { rs => Service.valueOf(rs) }.single.apply).get
  }

  /** Finds a service by name, optionally bypassing the cache. */
  def findByName(serviceName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Service] = {
    val cacheKey = className + "serviceName=" + serviceName
    // lazy: the query only runs when the cache asks for the value (or the cache is bypassed)
    lazy val serviceOpt = sql"""
        select * from services where service_name = ${serviceName}
      """.map { rs => Service.valueOf(rs) }.single.apply()

    if (useCache) withCache(cacheKey)(serviceOpt)
    else serviceOpt
  }

  /**
   * Inserts a new service with a freshly generated random access token.
   * `compressionAlgorithm` is only logged here; it is not part of the inserted row.
   */
  def insert(serviceName: String, cluster: String,
             hTableName: String, preSplitSize: Int, hTableTTL: Option[Int],
             compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = {
    logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm")
    val accessToken = UUID.randomUUID().toString()
    sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl)
    values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply()
  }

  /** Deletes the service row and expires every cache entry that may still reference it. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val service = findById(id)
    val serviceName = service.serviceName
    // the row lives in `services`; this used to delete from `service_columns` by mistake
    sql"""delete from services where id = ${id}""".execute.apply()
    val cacheKeys = List(s"id=$id", s"serviceName=$serviceName", s"accessToken=${service.accessToken}")
    cacheKeys.foreach { key =>
      expireCache(className + key)
      expireCaches(className + key)
    }
  }

  /** Returns the service named `serviceName`, inserting it first when it does not exist. */
  def findOrInsert(serviceName: String, cluster: String, hTableName: String,
                   preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Service = {
    findByName(serviceName, useCache) match {
      case Some(s) => s
      case None =>
        insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
        val cacheKey = "serviceName=" + serviceName
        expireCache(className + cacheKey)
        findByName(serviceName).get
    }
  }

  /** Loads every service, pre-populates the caches and returns the loaded rows. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from services""".map { rs => Service.valueOf(rs) }.list.apply
    putsToCacheOption(ls.flatMap { x =>
      Seq(
        s"id=${x.id.get}",
        s"serviceName=${x.serviceName}"
      ).map(cacheKey => (className + cacheKey, x))
    })

    ls
  }

  /** Returns the distinct `cluster` values of all services. */
  def findAllConn()(implicit session: DBSession = AutoSession): List[String] = {
    sql"""select distinct(cluster) from services""".map { rs => rs.string("cluster") }.list.apply
  }
}

/**
 * One row of `services`.
 *
 * @param options optional json string; its "storage" entry is turned into `storageConfigOpt`
 */
case class Service(id: Option[Int],
                   serviceName: String,
                   accessToken: String,
                   cluster: String,
                   hTableName: String,
                   preSplitSize: Int,
                   hTableTTL: Option[Int],
                   options: Option[String] = None) {
  // a service without an id is rendered as an empty json object
  lazy val toJson =
    id match {
      case Some(_id) =>
        Json.obj("id" -> _id, "name" -> serviceName, "accessToken" -> accessToken, "cluster" -> cluster,
          "hTableName" -> hTableName, "preSplitSize" -> preSplitSize, "hTableTTL" -> hTableTTL)
      case None =>
        Json.parse("{}")
    }

  lazy val extraOptions = Schema.extraOptions(options)
  lazy val storageConfigOpt: Option[Config] = toStorageConfig
  def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = useCache)
  def toStorageConfig: Option[Config] = Schema.toStorageConfig(extraOptions)
}

// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
// ----------------------------------------------------------------------
// diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
// new file mode 100644
// index 0000000..cc1698a
// --- /dev/null
// +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
// @@ -0,0 +1,151 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.schema + +import org.apache.s2graph.core.JSONParser +import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.types.{HBaseType, InnerValLike, InnerValLikeWithTs} +import play.api.libs.json.Json +import scalikejdbc._ + +object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] { + import Schema._ + val className = ServiceColumn.getClass.getSimpleName + + val Default = ServiceColumn(Option(0), -1, "default", "string", "v4") + + def valueOf(rs: WrappedResultSet): ServiceColumn = { + ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version")) + } + + def findByServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[ServiceColumn] = { + val cacheKey = "serviceId=" + serviceId + + lazy val sql = sql"""select * from service_columns where service_id = ${serviceId}""".map { x => ServiceColumn.valueOf(x) }.list().apply() + + if (useCache) withCaches(cacheKey)(sql) + else sql + } + + def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = { + val cacheKey = className + "id=" + id + lazy val sql = sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply + if (useCache) withCache(cacheKey)(sql).get + else sql.get + } + + def find(serviceId: Int, columnName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[ServiceColumn] = { + val 
cacheKey = className + "serviceId=" + serviceId + ":columnName=" + columnName + if (useCache) { + withCache(cacheKey) { + sql""" + select * from service_columns where service_id = ${serviceId} and column_name = ${columnName} + """.map { rs => ServiceColumn.valueOf(rs) }.single.apply() + } + } else { + sql""" + select * from service_columns where service_id = ${serviceId} and column_name = ${columnName} + """.map { rs => ServiceColumn.valueOf(rs) }.single.apply() + } + } + def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession) = { + sql"""insert into service_columns(service_id, column_name, column_type, schema_version) + values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply() + } + def delete(id: Int)(implicit session: DBSession = AutoSession) = { + val serviceColumn = findById(id, useCache = false) + val (serviceId, columnName) = (serviceColumn.serviceId, serviceColumn.columnName) + sql"""delete from service_columns where id = ${id}""".execute.apply() + val cacheKeys = List(s"id=$id", s"serviceId=$serviceId:columnName=$columnName") + cacheKeys.foreach { key => + expireCache(className + key) + expireCaches(className + key) + } + } + def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = { + find(serviceId, columnName, useCache) match { + case Some(sc) => sc + case None => + insert(serviceId, columnName, columnType, schemaVersion) +// val cacheKey = s"serviceId=$serviceId:columnName=$columnName" + val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName + expireCache(className + cacheKey) + find(serviceId, columnName).get + } + } + def findAll()(implicit session: DBSession = AutoSession) = { + val ls = sql"""select * from service_columns""".map { rs => ServiceColumn.valueOf(rs) 
}.list.apply + putsToCacheOption(ls.flatMap { x => + Seq( + s"id=${x.id.get}", + s"serviceId=${x.serviceId}:columnName=${x.columnName}" + ).map(cacheKey => (className + cacheKey, x)) + }) + + ls + } +} +case class ServiceColumn(id: Option[Int], + serviceId: Int, + columnName: String, + columnType: String, + schemaVersion: String) { + + lazy val service = Service.findById(serviceId) + lazy val metasWithoutCache = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get, false) :+ ColumnMeta.lastModifiedAtColumn + lazy val metas = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get) :+ ColumnMeta.lastModifiedAtColumn + lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta } toMap + lazy val metasInvMap = metas.map { meta => meta.name -> meta} toMap + lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)) toMap + lazy val metaPropsDefaultMap = metas.map { meta => + meta -> JSONParser.toInnerVal(meta.defaultValue, meta.dataType, schemaVersion) + }.toMap + lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType) + + def propsToInnerVals(props: Map[String, Any]): Map[ColumnMeta, InnerValLike] = { + val ret = for { + (k, v) <- props + labelMeta <- metasInvMap.get(k) + innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion) + } yield labelMeta -> innerVal + + ret + } + + def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = { + for { + (k, v) <- props + columnMeta <- metasMap.get(k) + } yield { + columnMeta.name -> v.value + } + } + + def innerValsWithTsToProps(props: Map[Int, InnerValLikeWithTs]): Map[String, Any] = { + for { + (k, v) <- props + columnMeta <- metasMap.get(k) + } yield { + columnMeta.name -> v.innerVal.value + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala 
---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala index 2d74a7c..2b02bdd 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala @@ -22,7 +22,7 @@ package org.apache.s2graph.core.storage import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core.TraversalHelper._ import org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.schema.LabelMeta import org.apache.s2graph.core.parsers.WhereParser import org.apache.s2graph.core.utils.logger http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala index b1fdd7c..0526042 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala @@ -26,7 +26,7 @@ import com.stumbleupon.async.Deferred import com.typesafe.config.Config import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls.{Label, ServiceColumn} +import org.apache.s2graph.core.schema.{Label, ServiceColumn} import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.storage.serde._ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange} 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala index e575c5b..5db02cc 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala @@ -24,7 +24,7 @@ import java.util.Base64 import com.typesafe.config.Config import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls.{Label, ServiceColumn} +import org.apache.s2graph.core.schema.{Label, ServiceColumn} import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange} import org.apache.s2graph.core.storage.serde.StorageSerializable import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageReadable, StorageSerDe} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala index 9871b36..0748efb 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala @@ -19,7 +19,7 @@ package org.apache.s2graph.core.storage.serde -import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.schema.LabelMeta import org.apache.s2graph.core._ import 
org.apache.s2graph.core.storage._ import org.apache.s2graph.core.utils.logger http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala index dc7690b..884ca11 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.storage.serde import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, ServiceColumn} +import org.apache.s2graph.core.schema.{ColumnMeta, Label, LabelMeta, ServiceColumn} import org.apache.s2graph.core.storage.CanSKeyValue import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala index 219d097..20e676a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.storage.serde import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.mysqls.{ColumnMeta, LabelMeta} +import org.apache.s2graph.core.schema.{ColumnMeta, LabelMeta} import 
org.apache.s2graph.core.storage.SKeyValue import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index 945f246..44ae52f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -21,7 +21,7 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn} +import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn} import org.apache.s2graph.core.storage.CanSKeyValue import org.apache.s2graph.core.storage.serde.Deserializable import org.apache.s2graph.core.storage.serde.StorageDeserializable._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala index 28982dc..6e467b6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala +++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.schema.LabelMeta import org.apache.s2graph.core.types.VertexId import org.apache.s2graph.core.{GraphUtil, IndexEdge} import org.apache.s2graph.core.storage.serde.StorageSerializable._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index e533b4b..1051c6e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -21,7 +21,7 @@ package org.apache.s2graph.core.storage.serde.indexedge.wide import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn} +import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn} import org.apache.s2graph.core.storage.serde.StorageDeserializable._ import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.storage.serde.Deserializable http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala index 34e9a6e..91127f9 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.storage.serde.indexedge.wide import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.schema.LabelMeta import org.apache.s2graph.core.types.VertexId import org.apache.s2graph.core.{GraphUtil, IndexEdge} import org.apache.s2graph.core.storage.serde.StorageSerializable._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala index f5c10a7..580acd7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.tall import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn} +import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn} import org.apache.s2graph.core.storage.serde.StorageDeserializable._ import org.apache.s2graph.core.storage.CanSKeyValue import 
org.apache.s2graph.core.types._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala index 02b2977..7937de8 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala @@ -21,7 +21,7 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.tall import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core.{S2Edge, SnapshotEdge} -import org.apache.s2graph.core.mysqls.LabelIndex +import org.apache.s2graph.core.schema.LabelIndex import org.apache.s2graph.core.storage.serde._ import org.apache.s2graph.core.storage.serde.StorageSerializable._ import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala index 7dec6d9..e0243cc 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala @@ -20,7 
+20,7 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.wide import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.schema.{Label, LabelMeta} import org.apache.s2graph.core.storage.serde.StorageDeserializable._ import org.apache.s2graph.core.storage.CanSKeyValue import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceVertexId, TargetVertexId}