[CARBONDATA-2705][CarbonStore] CarbonStore Java API and Implementation. Supports two implementations: 1. LocalCarbonStore, for usage in local mode; 2. DistributedCarbonStore, leveraging multiple servers (Master and Workers) via RPC.
This closes #2473 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/85cdc404 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/85cdc404 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/85cdc404 Branch: refs/heads/carbonstore Commit: 85cdc404598dbcdd0d4cfb055419c39104985483 Parents: 2009009 Author: Jacky Li <jacky.li...@qq.com> Authored: Mon Jul 9 12:23:49 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed Jul 18 10:10:54 2018 +0800 ---------------------------------------------------------------------- .../core/datastore/impl/FileFactory.java | 5 + .../schema/table/TableSchemaBuilder.java | 3 +- .../detailquery/SearchModeTestCase.scala | 3 +- .../carbondata/store/SparkCarbonStore.scala | 203 -------- .../apache/carbondata/store/WorkerManager.scala | 75 +++ .../org/apache/spark/sql/CarbonSession.scala | 59 ++- .../carbondata/store/SparkCarbonStoreTest.scala | 86 --- .../processing/loading/DataLoadExecutor.java | 6 +- .../processing/util/CarbonLoaderUtil.java | 2 +- store/core/pom.xml | 5 + .../carbondata/store/CarbonRowReadSupport.java | 53 -- .../apache/carbondata/store/CarbonStore.java | 68 --- .../carbondata/store/LocalCarbonStore.java | 130 ----- .../carbondata/store/MetaCachedCarbonStore.java | 59 --- .../carbondata/store/api/CarbonStore.java | 32 ++ .../store/api/CarbonStoreFactory.java | 93 ++++ .../apache/carbondata/store/api/DataStore.java | 51 ++ .../apache/carbondata/store/api/MetaStore.java | 50 ++ .../apache/carbondata/store/api/SqlStore.java | 34 ++ .../carbondata/store/api/conf/StoreConf.java | 191 +++++++ .../store/api/descriptor/LoadDescriptor.java | 114 ++++ .../store/api/descriptor/SelectDescriptor.java | 111 ++++ .../store/api/descriptor/TableDescriptor.java | 174 +++++++ .../store/api/descriptor/TableIdentifier.java | 37 ++ .../exception/ExecutionTimeoutException.java | 22 + .../store/api/exception/SchedulerException.java | 26 + 
.../store/api/exception/StoreException.java | 33 ++ .../apache/carbondata/store/conf/StoreConf.java | 185 ------- .../exception/ExecutionTimeoutException.java | 22 - .../store/exception/StoreException.java | 29 -- .../store/exception/WorkerTooBusyException.java | 26 - .../carbondata/store/impl/CarbonStoreBase.java | 177 +++++++ .../store/impl/DistributedCarbonStore.java | 232 +++++++++ .../store/impl/IndexedRecordReader.java | 183 +++++++ .../carbondata/store/impl/LocalCarbonStore.java | 164 ++++++ .../carbondata/store/impl/MetaProcessor.java | 170 ++++++ .../store/impl/SegmentTxnManager.java | 121 +++++ .../apache/carbondata/store/impl/Status.java | 28 + .../carbondata/store/impl/master/Master.java | 161 ++++++ .../store/impl/master/RegistryServiceImpl.java | 53 ++ .../store/impl/master/Schedulable.java | 76 +++ .../carbondata/store/impl/master/Scheduler.java | 137 +++++ .../store/impl/rpc/RegistryService.java | 32 ++ .../store/impl/rpc/ServiceFactory.java | 43 ++ .../carbondata/store/impl/rpc/StoreService.java | 40 ++ .../store/impl/rpc/model/BaseResponse.java | 69 +++ .../store/impl/rpc/model/LoadDataRequest.java | 60 +++ .../store/impl/rpc/model/QueryResponse.java | 73 +++ .../impl/rpc/model/RegisterWorkerRequest.java | 73 +++ .../impl/rpc/model/RegisterWorkerResponse.java | 54 ++ .../carbondata/store/impl/rpc/model/Scan.java | 108 ++++ .../store/impl/rpc/model/ShutdownRequest.java | 53 ++ .../store/impl/rpc/model/ShutdownResponse.java | 61 +++ .../store/impl/worker/RequestHandler.java | 166 ++++++ .../store/impl/worker/StoreServiceImpl.java | 77 +++ .../carbondata/store/impl/worker/Worker.java | 166 ++++++ .../apache/carbondata/store/master/Master.java | 522 ------------------- .../carbondata/store/rpc/RegistryService.java | 32 -- .../carbondata/store/rpc/ServiceFactory.java | 43 -- .../carbondata/store/rpc/StoreService.java | 40 -- .../store/rpc/impl/IndexedRecordReader.java | 183 ------- .../store/rpc/impl/RegistryServiceImpl.java | 54 -- 
.../store/rpc/impl/RequestHandler.java | 218 -------- .../carbondata/store/rpc/impl/Status.java | 28 - .../store/rpc/impl/StoreServiceImpl.java | 78 --- .../store/rpc/model/BaseResponse.java | 69 --- .../store/rpc/model/LoadDataRequest.java | 60 --- .../store/rpc/model/QueryRequest.java | 108 ---- .../store/rpc/model/QueryResponse.java | 73 --- .../store/rpc/model/RegisterWorkerRequest.java | 73 --- .../store/rpc/model/RegisterWorkerResponse.java | 54 -- .../store/rpc/model/ShutdownRequest.java | 53 -- .../store/rpc/model/ShutdownResponse.java | 61 --- .../carbondata/store/scheduler/Schedulable.java | 74 --- .../carbondata/store/scheduler/Scheduler.java | 136 ----- .../apache/carbondata/store/util/StoreUtil.java | 4 +- .../apache/carbondata/store/worker/Worker.java | 166 ------ .../store/DistributedCarbonStoreTest.java | 142 +++++ .../carbondata/store/LocalCarbonStoreTest.java | 111 +++- .../org/apache/carbondata/store/TestUtil.java | 101 +--- store/core/src/test/resources/data1.csv | 11 + store/horizon/pom.xml | 10 - store/horizon/src/main/anltr/Expression.g4 | 163 ------ store/horizon/src/main/antlr/Expression.g4 | 163 ++++++ .../apache/carbondata/horizon/antlr/Parser.java | 42 ++ .../horizon/antlr/gen/ExpressionLexer.java | 3 - .../horizon/antlr/gen/ExpressionParser.java | 3 - .../horizon/rest/client/HorizonClient.java | 77 +++ .../rest/client/impl/SimpleHorizonClient.java | 87 ++++ .../horizon/rest/controller/Horizon.java | 6 +- .../rest/controller/HorizonController.java | 76 ++- .../rest/model/descriptor/LoadDescriptor.java | 81 --- .../rest/model/descriptor/SelectDescriptor.java | 88 ---- .../rest/model/descriptor/TableDescriptor.java | 90 ---- .../rest/model/validate/RequestValidator.java | 15 +- .../rest/model/view/CreateTableRequest.java | 13 +- .../rest/model/view/DropTableRequest.java | 45 ++ .../horizon/rest/model/view/LoadRequest.java | 8 +- .../horizon/rest/model/view/Request.java | 30 ++ .../horizon/rest/model/view/Response.java | 33 ++ 
.../horizon/rest/model/view/SelectRequest.java | 20 +- .../horizon/rest/model/view/SelectResponse.java | 18 +- .../horizon/rest/service/HorizonService.java | 162 ------ .../carbondata/horizon/FilterParseTest.java | 18 +- .../apache/carbondata/horizon/HorizonTest.java | 165 +++--- store/horizon/src/test/resources/data1.csv | 11 - .../sdk/file/CarbonWriterBuilder.java | 11 +- .../org/apache/carbondata/sdk/file/Field.java | 65 +-- 108 files changed, 4624 insertions(+), 3937 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index e353623..9cdda85 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -220,6 +220,11 @@ public final class FileFactory { final FsPermission permission) throws IOException { return getCarbonFile(filePath).createNewFile(filePath, fileType, doAs, permission); } + + public static boolean deleteFile(String filePath) throws IOException { + return deleteFile(filePath, getFileType(filePath)); + } + public static boolean deleteFile(String filePath, FileType fileType) throws IOException { return getCarbonFile(filePath).deleteFile(filePath, fileType); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index 6f575a4..80f0aa5 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -155,8 +155,9 @@ public class TableSchemaBuilder { return schema; } - public void setSortColumns(List<ColumnSchema> sortColumns) { + public TableSchemaBuilder setSortColumns(List<ColumnSchema> sortColumns) { this.sortColumns = sortColumns; + return this; } public ColumnSchema addColumn(StructField field, AtomicInteger valIndex, boolean isSortColumn) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala index af9e50f..0b9b6ba 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala @@ -19,7 +19,7 @@ package org.apache.carbondata.spark.testsuite.detailquery import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.{CarbonSession, Row, SaveMode} -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfterAll, Ignore} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties @@ -28,7 +28,6 @@ import org.apache.carbondata.spark.util.DataGenerator /** * Test Suite for search mode */ - class SearchModeTestCase 
extends QueryTest with BeforeAndAfterAll { val numRows = 500 * 1000 http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala deleted file mode 100644 index a4124a2..0000000 --- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store - -import java.io.IOException -import java.net.InetAddress - -import scala.collection.JavaConverters._ - -import org.apache.spark.{CarbonInputMetrics, SparkConf, SparkEnv} -import org.apache.spark.sql.CarbonSession._ -import org.apache.spark.sql.SparkSession - -import org.apache.carbondata.common.annotations.InterfaceAudience -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datastore.row.CarbonRow -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.scan.expression.Expression -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.hadoop.CarbonProjection -import org.apache.carbondata.spark.rdd.CarbonScanRDD -import org.apache.carbondata.spark.util.Util -import org.apache.carbondata.store.conf.StoreConf -import org.apache.carbondata.store.master.Master -import org.apache.carbondata.store.worker.Worker - -/** - * A CarbonStore implementation that uses Spark as underlying compute engine - * with CarbonData query optimization capability - */ -@InterfaceAudience.Internal -class SparkCarbonStore extends MetaCachedCarbonStore { - private var session: SparkSession = _ - private var master: Master = _ - private final val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - /** - * Initialize SparkCarbonStore - * @param storeName store name - * @param storeLocation location to store data - */ - def this(storeName: String, storeLocation: String) = { - this() - val sparkConf = new SparkConf(loadDefaults = true) - session = SparkSession.builder - .config(sparkConf) - .appName("SparkCarbonStore-" + storeName) - .config("spark.sql.warehouse.dir", storeLocation) - .getOrCreateCarbonSession() - } - - def this(sparkSession: SparkSession) = { - this() - session = sparkSession - } - - @throws[IOException] - override def scan( - path: String, - projectColumns: Array[String]): 
java.util.Iterator[CarbonRow] = { - require(path != null) - require(projectColumns != null) - scan(path, projectColumns, null) - } - - @throws[IOException] - override def scan( - path: String, - projectColumns: Array[String], - filter: Expression): java.util.Iterator[CarbonRow] = { - require(path != null) - require(projectColumns != null) - val table = getTable(path) - val rdd = new CarbonScanRDD[CarbonRow]( - spark = session, - columnProjection = new CarbonProjection(projectColumns), - filterExpression = filter, - identifier = table.getAbsoluteTableIdentifier, - serializedTableInfo = table.getTableInfo.serialize, - tableInfo = table.getTableInfo, - inputMetricsStats = new CarbonInputMetrics, - partitionNames = null, - dataTypeConverterClz = null, - readSupportClz = classOf[CarbonRowReadSupport]) - rdd.collect - .iterator - .asJava - } - - @throws[IOException] - override def sql(sqlString: String): java.util.Iterator[CarbonRow] = { - val df = session.sql(sqlString) - df.rdd - .map(row => new CarbonRow(row.toSeq.toArray.asInstanceOf[Array[Object]])) - .collect() - .iterator - .asJava - } - - def startSearchMode(): Unit = { - LOG.info("Starting search mode master") - val conf = new StoreConf() - conf.conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost.getHostAddress) - conf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort) - conf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath) - master = Master.getInstance(conf) - master.startService() - startAllWorkers() - } - - def stopSearchMode(): Unit = { - if (master != null) { - LOG.info("Shutting down all workers...") - try { - master.stopAllWorkers() - LOG.info("All workers are shut down") - } catch { - case e: Exception => - LOG.error(s"failed to shutdown worker: ${ e.toString }") - } - LOG.info("Stopping master...") - master.stopService() - LOG.info("Master stopped") - master = null - } - } - - /** search mode */ - def search( - table: CarbonTable, - projectColumns: Array[String], - filter: 
Expression, - globalLimit: Long, - localLimit: Long): java.util.Iterator[CarbonRow] = { - if (master == null) { - throw new IllegalStateException("search mode is not started") - } - master.search(table, projectColumns, filter, globalLimit, localLimit) - .iterator - .asJava - } - - private def startAllWorkers(): Array[Int] = { - // TODO: how to ensure task is sent to every executor? - val numExecutors = session.sparkContext.getExecutorMemoryStatus.keySet.size - val masterIp = InetAddress.getLocalHost.getHostAddress - val rows = session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors) - .mapPartitions { f => - // start worker - val conf = new StoreConf() - conf.conf(StoreConf.WORKER_HOST, InetAddress.getLocalHost.getHostAddress) - conf.conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort) - conf.conf(StoreConf.WORKER_CORE_NUM, 2) - conf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath) - conf.conf(StoreConf.MASTER_HOST, masterIp) - conf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort) - - var storeLocation: String = null - val carbonUseLocalDir = CarbonProperties.getInstance() - .getProperty("carbon.use.local.dir", "false") - if (carbonUseLocalDir.equalsIgnoreCase("true")) { - - val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf) - if (null != storeLocations && storeLocations.nonEmpty) { - storeLocation = storeLocations.mkString(",") - } - if (storeLocation == null) { - storeLocation = System.getProperty("java.io.tmpdir") - } - } else { - storeLocation = System.getProperty("java.io.tmpdir") - } - conf.conf(StoreConf.STORE_TEMP_LOCATION, storeLocation) - - val worker = new Worker(conf) - worker.start() - new Iterator[Int] { - override def hasNext: Boolean = false - - override def next(): Int = 1 - } - }.collect() - LOG.info(s"Tried to start $numExecutors workers, ${master.getWorkers.size} " + - s"workers are started successfully") - rows - } - -} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala new file mode 100644 index 0000000..7fff2e5 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store + +import java.net.InetAddress + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.SparkSession + +import org.apache.carbondata.common.annotations.InterfaceAudience +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.util.Util +import org.apache.carbondata.store.api.CarbonStoreFactory +import org.apache.carbondata.store.api.conf.StoreConf +import org.apache.carbondata.store.impl.worker.Worker + +/** + * A CarbonStore implementation that uses Spark as underlying compute engine + * with CarbonData query optimization capability + */ +@InterfaceAudience.Internal +object WorkerManager { + private final val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + // TODO: how to ensure task is sent to every executor? + def startAllWorker(session: SparkSession, storeConf: StoreConf): Unit = { + val numExecutors = session.sparkContext.getExecutorMemoryStatus.keySet.size + LOG.info("Starting search mode master") + val rows = session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors) + .mapPartitions { f => + // start worker + var storeLocation: String = null + val carbonUseLocalDir = CarbonProperties.getInstance() + .getProperty("carbon.use.local.dir", "false") + if (carbonUseLocalDir.equalsIgnoreCase("true")) { + val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf) + if (null != storeLocations && storeLocations.nonEmpty) { + storeLocation = storeLocations.mkString(",") + } + if (storeLocation == null) { + storeLocation = System.getProperty("java.io.tmpdir") + } + } else { + storeLocation = System.getProperty("java.io.tmpdir") + } + storeConf.conf(StoreConf.STORE_TEMP_LOCATION, storeLocation) + + val worker = new Worker(storeConf) + worker.start() + new Iterator[Int] { + override def hasNext: Boolean = false + + override def next(): Int = 1 + } + }.collect() + 
LOG.info(s"Tried to start $numExecutors workers, $rows workers started successfully") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 5ab0d29..e9a0634 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql import java.io.File +import java.net.InetAddress import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ @@ -41,7 +42,10 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo} import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil -import org.apache.carbondata.store.SparkCarbonStore +import org.apache.carbondata.store.WorkerManager +import org.apache.carbondata.store.api.{CarbonStore, CarbonStoreFactory} +import org.apache.carbondata.store.api.conf.StoreConf +import org.apache.carbondata.store.api.descriptor.{SelectDescriptor, TableIdentifier => CTableIdentifier} import org.apache.carbondata.streaming.CarbonStreamingQueryListener /** @@ -120,7 +124,7 @@ class CarbonSession(@transient val sc: SparkContext, message(0).getString(0).contains(dataMapName) } - def isSearchModeEnabled: Boolean = carbonStore != null + def isSearchModeEnabled: Boolean = store != null /** * Run SparkSQL directly @@ -192,24 +196,34 @@ class CarbonSession(@transient val sc: SparkContext, } } - @transient private var carbonStore: SparkCarbonStore = _ + // variable that used 
in search mode + @transient private var store: CarbonStore = _ def startSearchMode(): Unit = { - CarbonProperties.enableSearchMode(true) - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") - if (carbonStore == null) { - carbonStore = new SparkCarbonStore(this) - carbonStore.startSearchMode() + if (store == null) { + val storeConf = new StoreConf() + storeConf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath) + storeConf.conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost.getHostAddress) + storeConf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort) + storeConf.conf(StoreConf.WORKER_HOST, InetAddress.getLocalHost.getHostAddress) + storeConf.conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort) + storeConf.conf(StoreConf.WORKER_CORE_NUM, 2) + + store = CarbonStoreFactory.getDistributedStore("GlobalStore", storeConf) + CarbonProperties.enableSearchMode(true) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") + WorkerManager.startAllWorker(this, storeConf) } } def stopSearchMode(): Unit = { - CarbonProperties.enableSearchMode(false) - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") - if (carbonStore != null) { + if (store != null) { + CarbonProperties.enableSearchMode(false) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") try { - carbonStore.stopSearchMode() - carbonStore = null + CarbonStoreFactory.removeDistributedStore("GlobalStore") + store = null } catch { case e: RuntimeException => LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -226,16 +240,21 @@ class CarbonSession(@transient val sc: SparkContext, relation: LogicalRelation, maxRows: Option[Long] = None, localMaxRows: Option[Long] = None): DataFrame = { - val rows = carbonStore.search( - relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable, - 
columns.map(_.name).toArray, - if (expr != null) CarbonFilters.transformExpression(expr) else null, - maxRows.getOrElse(Long.MaxValue), - localMaxRows.getOrElse(Long.MaxValue)) + val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + val select = new SelectDescriptor( + new CTableIdentifier(table.getTableName, table.getDatabaseName), + columns.map(_.name).toArray, + if (expr != null) CarbonFilters.transformExpression(expr) else null, + localMaxRows.getOrElse(Long.MaxValue) + ) + val rows = store.select(select).iterator() val output = new java.util.ArrayList[Row]() - while (rows.hasNext) { + val maxRowCount = maxRows.getOrElse(Long.MaxValue) + var rowCount = 0 + while (rows.hasNext && rowCount < maxRowCount) { val row = rows.next() output.add(Row.fromSeq(row.getData)) + rowCount = rowCount + 1 } createDataFrame(output, logicalPlan.schema) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala deleted file mode 100644 index d389670..0000000 --- a/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store - -import org.apache.spark.sql.{CarbonEnv, Row} -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.metadata.datatype.DataTypes -import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression -import org.apache.carbondata.core.scan.expression.{ColumnExpression, LiteralExpression} - -class SparkCarbonStoreTest extends QueryTest with BeforeAndAfterAll { - - private var store: CarbonStore = _ - - override def beforeAll { - sql("DROP TABLE IF EXISTS t1") - sql("CREATE TABLE t1 (" + - "empno int, empname String, designation String, doj Timestamp, " + - "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String," + - "projectcode int, projectjoindate Timestamp, projectenddate Timestamp," + - "attendance int,utilization int,salary int)" + - "STORED BY 'org.apache.carbondata.format'") - sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE t1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") - - store = new SparkCarbonStore("test", storeLocation) - } - - test("test CarbonStore.get, compare projection result") { - val tablePath = CarbonEnv.getCarbonTable(None, "t1")(sqlContext.sparkSession).getTablePath - val rows = store.scan(s"$tablePath", Seq("empno", "empname").toArray) - val sparkResult: Array[Row] = sql("select empno, empname from t1").collect() - sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) => - val carbonRow = rows.next() - assertResult(r.get(0))(carbonRow.getData()(0)) - 
assertResult(r.get(1))(carbonRow.getData()(1)) - } - assert(!rows.hasNext) - } - - test("test CarbonStore.get, compare projection and filter result") { - val tablePath = CarbonEnv.getCarbonTable(None, "t1")(sqlContext.sparkSession).getTablePath - val filter = new EqualToExpression( - new ColumnExpression("empno", DataTypes.INT), - new LiteralExpression(10, DataTypes.INT)) - val rows = store.scan(s"$tablePath", Seq("empno", "empname").toArray, filter) - val sparkResult: Array[Row] = sql("select empno, empname from t1 where empno = 10").collect() - sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) => - val carbonRow = rows.next() - assertResult(r.get(0))(carbonRow.getData()(0)) - assertResult(r.get(1))(carbonRow.getData()(1)) - } - assert(!rows.hasNext) - } - - test("test CarbonStore.sql") { - val rows = store.sql("select empno, empname from t1 where empno = 10") - val sparkResult: Array[Row] = sql("select empno, empname from t1 where empno = 10").collect() - sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) => - val carbonRow = rows.next() - assertResult(r.get(0))(carbonRow.getData()(0)) - assertResult(r.get(1))(carbonRow.getData()(1)) - } - assert(!rows.hasNext) - } - - override def afterAll { - sql("DROP TABLE IF EXISTS t1") - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java index fc5c41f..65d72cc 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java @@ -39,11 +39,11 @@ public class DataLoadExecutor { private boolean 
isClosed; - public void execute(CarbonLoadModel loadModel, String[] storeLocation, - CarbonIterator<Object[]>[] inputIterators) throws Exception { + public void execute(CarbonLoadModel loadModel, String[] tempLocation, + CarbonIterator<Object[]>[] inputIterators) { try { loadProcessorStep = - new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators); + new DataLoadProcessBuilder().build(loadModel, tempLocation, inputIterators); // 1. initialize loadProcessorStep.initialize(); LOGGER.info("Data Loading is started for table " + loadModel.getTableName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 0ff71c7..0f3abb6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -578,7 +578,7 @@ public final class CarbonLoaderUtil { * based on block location information * @param blockAssignmentStrategy strategy used to assign blocks * @param expectedMinSizePerNode expected minimum size per node - * @return a map that maps node to blocks + * @return a map that maps node (hostname) to blocks */ public static Map<String, List<Distributable>> nodeBlockMapping( List<Distributable> blockInfos, int noOfNodesInput, List<String> activeNodes, http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/pom.xml ---------------------------------------------------------------------- diff --git a/store/core/pom.xml b/store/core/pom.xml index 6b2703e..44d5ab1 100644 --- a/store/core/pom.xml +++ b/store/core/pom.xml @@ -26,6 +26,11 @@ 
<version>${project.version}</version> </dependency> <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + <version>4.7.1</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java b/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java deleted file mode 100644 index bafbb9f..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store; - -import java.io.IOException; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; -import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; - -/** - * ReadSupport that convert row object to CarbonRow - */ -@InterfaceAudience.Internal -public class CarbonRowReadSupport implements CarbonReadSupport<CarbonRow> { - private CarbonReadSupport<Object[]> delegate; - - public CarbonRowReadSupport() { - this.delegate = new DictionaryDecodeReadSupport<>(); - } - - @Override public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable) - throws IOException { - delegate.initialize(carbonColumns, carbonTable); - } - - @Override public CarbonRow readRow(Object[] data) { - Object[] converted = delegate.readRow(data); - return new CarbonRow(converted); - } - - @Override public void close() { - delegate.close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java deleted file mode 100644 index c6b2fb8..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.annotations.InterfaceStability; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.scan.expression.Expression; - -/** - * User can use {@link CarbonStore} to query data - */ -@InterfaceAudience.User -@InterfaceStability.Unstable -public interface CarbonStore extends Closeable { - - /** - * Scan query on the data in the table path - * @param path table path - * @param projectColumns column names to read - * @return rows - * @throws IOException if unable to read files in table path - */ - Iterator<CarbonRow> scan( - String path, - String[] projectColumns) throws IOException; - - /** - * Scan query with filter, on the data in the table path - * @param path table path - * @param projectColumns column names to read - * @param filter filter condition, can be null - * @return rows that satisfy filter condition - * @throws IOException if unable to read files in table path - */ - Iterator<CarbonRow> scan( - String path, - String[] projectColumns, - Expression filter) throws IOException; - - /** - * SQL query, table should be created before calling this function - * @param sqlString SQL statement - * @return rows - * 
@throws IOException if unable to read files in table path - */ - Iterator<CarbonRow> sql(String sqlString) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java deleted file mode 100644 index daa1447..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.hadoop.CarbonProjection; -import org.apache.carbondata.hadoop.api.CarbonInputFormat; -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.task.JobContextImpl; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; - -/** - * A CarbonStore implementation that works locally, without other compute framework dependency. - * It can be used to read data in local disk. - * - * Note that this class is experimental, it is not intended to be used in production. 
- */ -@InterfaceAudience.Internal -class LocalCarbonStore extends MetaCachedCarbonStore { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(LocalCarbonStore.class.getName()); - - @Override - public Iterator<CarbonRow> scan(String path, String[] projectColumns) throws IOException { - return scan(path, projectColumns, null); - } - - @Override public Iterator<CarbonRow> scan(String path, String[] projectColumns, Expression filter) - throws IOException { - Objects.requireNonNull(path); - Objects.requireNonNull(projectColumns); - - CarbonTable table = getTable(path); - if (table.isStreamingSink() || table.isHivePartitionTable()) { - throw new UnsupportedOperationException("streaming and partition table is not supported"); - } - // TODO: use InputFormat to prune data and read data - - final CarbonTableInputFormat format = new CarbonTableInputFormat(); - final Job job = new Job(new Configuration()); - CarbonInputFormat.setTableInfo(job.getConfiguration(), table.getTableInfo()); - CarbonInputFormat.setTablePath(job.getConfiguration(), table.getTablePath()); - CarbonInputFormat.setTableName(job.getConfiguration(), table.getTableName()); - CarbonInputFormat.setDatabaseName(job.getConfiguration(), table.getDatabaseName()); - CarbonInputFormat.setCarbonReadSupport(job.getConfiguration(), CarbonRowReadSupport.class); - CarbonInputFormat - .setColumnProjection(job.getConfiguration(), new CarbonProjection(projectColumns)); - if (filter != null) { - CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter); - } - - final List<InputSplit> splits = - format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); - - List<RecordReader<Void, Object>> readers = new ArrayList<>(splits.size()); - - List<CarbonRow> rows = new ArrayList<>(); - - try { - for (InputSplit split : splits) { - TaskAttemptContextImpl attempt = - new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); - RecordReader reader = 
format.createRecordReader(split, attempt); - reader.initialize(split, attempt); - readers.add(reader); - } - - for (RecordReader<Void, Object> reader : readers) { - while (reader.nextKeyValue()) { - rows.add((CarbonRow) reader.getCurrentValue()); - } - try { - reader.close(); - } catch (IOException e) { - LOGGER.error(e); - } - } - } catch (InterruptedException e) { - throw new IOException(e); - } finally { - for (RecordReader<Void, Object> reader : readers) { - try { - reader.close(); - } catch (IOException e) { - LOGGER.error(e); - } - } - } - return rows.iterator(); - } - - @Override - public Iterator<CarbonRow> sql(String sqlString) throws IOException { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java deleted file mode 100644 index e43f750..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.core.metadata.converter.SchemaConverter; -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; - -/** - * A CarbonStore base class that caches CarbonTable object - */ -@InterfaceAudience.Internal -abstract class MetaCachedCarbonStore implements CarbonStore { - - // mapping of table path to CarbonTable object - private Map<String, CarbonTable> cache = new HashMap<>(); - - CarbonTable getTable(String path) throws IOException { - if (cache.containsKey(path)) { - return cache.get(path); - } - org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil - .readSchemaFile(CarbonTablePath.getSchemaFilePath(path)); - SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - TableInfo tableInfo1 = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", ""); - tableInfo1.setTablePath(path); - CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo1); - cache.put(path, table); - return table; - } - - @Override - public void close() throws IOException { - cache.clear(); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java 
b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java new file mode 100644 index 0000000..3525389 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.api; + +import java.io.Closeable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +/** + * Public Interface of CarbonStore + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public interface CarbonStore extends MetaStore, DataStore, Closeable { + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java new file mode 100644 index 0000000..76ef450 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.api; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.carbondata.store.api.exception.StoreException; + +public class CarbonStoreFactory { + private static Map<String, CarbonStore> distributedStores = new ConcurrentHashMap<>(); + private static Map<String, CarbonStore> localStores = new ConcurrentHashMap<>(); + + private CarbonStoreFactory() { + } + + public static CarbonStore getDistributedStore(String storeName, StoreConf storeConf) + throws StoreException { + if (distributedStores.containsKey(storeName)) { + return distributedStores.get(storeName); + } + + // create a new instance + try { + String className = "org.apache.carbondata.store.impl.DistributedCarbonStore"; + CarbonStore store = createCarbonStore(storeConf, className); + distributedStores.put(storeName, store); + return store; + } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException | + InstantiationException e) { + throw new StoreException(e); + } + } + + public static void removeDistributedStore(String storeName) throws IOException { + if (distributedStores.containsKey(storeName)) { + distributedStores.get(storeName).close(); + distributedStores.remove(storeName); + } + } + + public static CarbonStore getLocalStore(String storeName, StoreConf storeConf) + throws StoreException { + if (localStores.containsKey(storeName)) { + return localStores.get(storeName); + } + + // create a new instance + try { + String className = "org.apache.carbondata.store.impl.LocalCarbonStore"; + CarbonStore store = createCarbonStore(storeConf, className); + localStores.put(storeName, store); + return store; + } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException | + InstantiationException e) { + throw 
new StoreException(e); + } + } + + public static void removeLocalStore(String storeName) throws IOException { + if (localStores.containsKey(storeName)) { + localStores.get(storeName).close(); + localStores.remove(storeName); + } + } + + private static CarbonStore createCarbonStore(StoreConf storeConf, String className) + throws ClassNotFoundException, InstantiationException, IllegalAccessException, + InvocationTargetException { + Constructor[] constructor = Class.forName(className).getDeclaredConstructors(); + constructor[0].setAccessible(true); + return (CarbonStore) constructor[0].newInstance(storeConf); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java new file mode 100644 index 0000000..d35c133 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.api; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.store.api.descriptor.LoadDescriptor; +import org.apache.carbondata.store.api.descriptor.SelectDescriptor; +import org.apache.carbondata.store.api.exception.StoreException; + +/** + * Public interface to write and read data in CarbonStore + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public interface DataStore { + + /** + * Load data into a Table + * @param load descriptor for load operation + * @throws IOException if network or disk IO error occurs + */ + void loadData(LoadDescriptor load) throws IOException, StoreException; + + /** + * Scan a Table and return matched rows + * @param select descriptor for scan operation, including required column, filter, etc + * @return matched rows + * @throws IOException if network or disk IO error occurs + */ + List<CarbonRow> select(SelectDescriptor select) throws IOException, StoreException; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java new file mode 100644 index 0000000..dea6873 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.api; + +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.store.api.descriptor.TableDescriptor; +import org.apache.carbondata.store.api.descriptor.TableIdentifier; +import org.apache.carbondata.store.api.exception.StoreException; + +/** + * Public interface to manage table in CarbonStore + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public interface MetaStore { + /** + * Create a Table + * @param table descriptor for create table operation + * @throws IOException if network or disk IO error occurs + */ + void createTable(TableDescriptor table) throws IOException, StoreException; + + /** + * Drop a Table, and remove all data in it + * @param table table identifier + * @throws IOException if network or disk IO error occurs + */ + void dropTable(TableIdentifier table) throws IOException; + + CarbonTable getTable(TableIdentifier table) throws IOException; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java ---------------------------------------------------------------------- diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java new file mode 100644 index 0000000..3f52eed --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.api; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.core.datastore.row.CarbonRow; + +public interface SqlStore { + + /** + * Execute a SQL statement + * @param sqlString SQL statement + * @return matched rows + * @throws IOException if network or disk IO error occurs + */ + List<CarbonRow> sql(String sqlString) throws IOException; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java b/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java new file mode 100644 index 0000000..98a670a --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.api.conf; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.store.util.StoreUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +public class StoreConf implements Serializable, Writable { + + public static final String SELECT_PROJECTION = "carbon.select.projection"; + public static final String SELECT_FILTER = "carbon.select.filter"; + public static final String SELECT_LIMIT = "carbon.select.limit"; + + public static final String SELECT_ID = "carbon.select.id"; + + public static final String WORKER_HOST = "carbon.worker.host"; + public static final String WORKER_PORT = "carbon.worker.port"; + public static final String WORKER_CORE_NUM = "carbon.worker.core.num"; + public static final String MASTER_HOST = "carbon.master.host"; + public static final String MASTER_PORT = "carbon.master.port"; + + public static final String STORE_TEMP_LOCATION = "carbon.store.temp.location"; + public static final String STORE_LOCATION = "carbon.store.location"; + public static final String STORE_NAME = "carbon.store.name"; + + private Map<String, String> conf = new HashMap<>(); + + public StoreConf() { + } + + public StoreConf(String storeName, String storeLocation) { + conf.put(STORE_NAME, storeName); + conf.put(STORE_LOCATION, storeLocation); + } + + public StoreConf(String confFilePath) { + load(confFilePath); + } + + public StoreConf conf(String key, String value) { + conf.put(key, value); + return this; + } + + public StoreConf conf(String key, int value) { + conf.put(key, "" + value); + return this; + } + + public void load(String filePath) { + StoreUtil.loadProperties(filePath, this); + } + + public void conf(StoreConf 
conf) { + this.conf.putAll(conf.conf); + } + + public Object conf(String key) { + return conf.get(key); + } + + public String[] projection() { + return stringArrayValue(SELECT_PROJECTION); + } + + public String filter() { + return stringValue(SELECT_FILTER); + } + + public int limit() { + return intValue(SELECT_LIMIT); + } + + public String masterHost() { + return stringValue(MASTER_HOST); + } + + public int masterPort() { + return intValue(MASTER_PORT); + } + + public String workerHost() { + return stringValue(WORKER_HOST); + } + + public int workerPort() { + return intValue(WORKER_PORT); + } + + public int workerCoreNum() { + return intValue(WORKER_CORE_NUM); + } + + public String storeLocation() { + return stringValue(STORE_LOCATION); + } + + public String[] storeTempLocation() { + return stringArrayValue(STORE_TEMP_LOCATION); + } + + public String selectId() { + return stringValue(SELECT_ID); + } + + public Configuration newHadoopConf() { + Configuration hadoopConf = FileFactory.getConfiguration(); + for (Map.Entry<String, String> entry : conf.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key != null && value != null && key.startsWith("carbon.hadoop.")) { + hadoopConf.set(key.substring("carbon.hadoop.".length()), value); + } + } + return hadoopConf; + } + + private String stringValue(String key) { + Object obj = conf.get(key); + if (obj == null) { + return null; + } + return obj.toString(); + } + + private int intValue(String key) { + String value = conf.get(key); + if (value == null) { + return -1; + } + return Integer.parseInt(value); + } + + private String[] stringArrayValue(String key) { + String value = conf.get(key); + if (value == null) { + return null; + } + return value.split(",", -1); + } + + @Override public void write(DataOutput out) throws IOException { + Set<Map.Entry<String, String>> entries = conf.entrySet(); + WritableUtils.writeVInt(out, conf.size()); + for (Map.Entry<String, String> entry : entries) { + 
WritableUtils.writeString(out, entry.getKey()); + WritableUtils.writeString(out, entry.getValue()); + } + } + + @Override public void readFields(DataInput in) throws IOException { + if (conf == null) { + conf = new HashMap<>(); + } + + int size = WritableUtils.readVInt(in); + String key, value; + for (int i = 0; i < size; i++) { + key = WritableUtils.readString(in); + value = WritableUtils.readString(in); + conf.put(key, value); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java new file mode 100644 index 0000000..c3a4ff7 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.api.descriptor; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class LoadDescriptor { + + private TableIdentifier table; + private String inputPath; + private Map<String, String> options; + private boolean isOverwrite; + + private LoadDescriptor() { + } + + public LoadDescriptor(TableIdentifier table, String inputPath, + Map<String, String> options, boolean isOverwrite) { + Objects.requireNonNull(table); + Objects.requireNonNull(inputPath); + this.table = table; + this.inputPath = inputPath; + this.options = options; + this.isOverwrite = isOverwrite; + } + + public TableIdentifier getTable() { + return table; + } + + public void setTable(TableIdentifier table) { + this.table = table; + } + + public String getInputPath() { + return inputPath; + } + + public void setInputPath(String inputPath) { + this.inputPath = inputPath; + } + + public Map<String, String> getOptions() { + return options; + } + + public void setOptions(Map<String, String> options) { + this.options = options; + } + + public boolean isOverwrite() { + return isOverwrite; + } + + public void setOverwrite(boolean overwrite) { + isOverwrite = overwrite; + } + + public static class Builder { + private LoadDescriptor load; + private Map<String, String> options; + + private Builder() { + load = new LoadDescriptor(); + options = new HashMap<>(); + } + + public Builder table(TableIdentifier tableIdentifier) { + load.setTable(tableIdentifier); + return this; + } + + public Builder overwrite(boolean isOverwrite) { + load.setOverwrite(isOverwrite); + return this; + } + + public Builder inputPath(String inputPath) { + load.setInputPath(inputPath); + return this; + } + + public Builder options(String key, String value) { + options.put(key, value); + return this; + } + + public LoadDescriptor create() { + load.setOptions(options); + return load; + } + } + + public static Builder builder() { + return new Builder(); + } +} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java new file mode 100644 index 0000000..c3627a9 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.api.descriptor; + +import java.util.Objects; + +import org.apache.carbondata.core.scan.expression.Expression; + +public class SelectDescriptor { + + private TableIdentifier table; + private String[] projection; + private Expression filter; + private long limit; + + private SelectDescriptor() { + } + + public SelectDescriptor(TableIdentifier table, String[] projection, + Expression filter, long limit) { + Objects.requireNonNull(table); + Objects.requireNonNull(projection); + this.table = table; + this.projection = projection; + this.filter = filter; + this.limit = limit; + } + + public TableIdentifier getTable() { + return table; + } + + public void setTable(TableIdentifier table) { + this.table = table; + } + + public String[] getProjection() { + return projection; + } + + public void setProjection(String[] projection) { + this.projection = projection; + } + + public Expression getFilter() { + return filter; + } + + public void setFilter(Expression filter) { + this.filter = filter; + } + + public long getLimit() { + return limit; + } + + public void setLimit(long limit) { + this.limit = limit; + } + + public static class Builder { + private SelectDescriptor select; + + private Builder() { + select = new SelectDescriptor(); + } + + public Builder table(TableIdentifier tableIdentifier) { + select.setTable(tableIdentifier); + return this; + } + + public Builder select(String... 
columnNames) { + select.setProjection(columnNames); + return this; + } + + public Builder filter(Expression filter) { + select.setFilter(filter); + return this; + } + + public Builder limit(long limit) { + select.setLimit(limit); + return this; + } + + public SelectDescriptor create() { + return select; + } + } + + public static Builder builder() { + return new Builder(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java new file mode 100644 index 0000000..2d677a8 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.api.descriptor; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.sdk.file.Field; +import org.apache.carbondata.sdk.file.Schema; + +public class TableDescriptor { + + private boolean ifNotExists; + private TableIdentifier table; + private String tablePath; + private Schema schema; + private Map<String, String> properties; + private String comment; + + private TableDescriptor() { + } + + public TableDescriptor(TableIdentifier table, Schema schema, + Map<String, String> properties, String tablePath, String comment, boolean ifNotExists) { + Objects.requireNonNull(table); + Objects.requireNonNull(schema); + this.table = table; + this.ifNotExists = ifNotExists; + this.schema = schema; + this.properties = properties; + this.tablePath = tablePath; + this.comment = comment; + } + + public boolean isIfNotExists() { + return ifNotExists; + } + + public void setIfNotExists(boolean ifNotExists) { + this.ifNotExists = ifNotExists; + } + + public TableIdentifier getTable() { + return table; + } + + public void setTable(TableIdentifier table) { + this.table = table; + } + + public Schema getSchema() { + return schema; + } + + public void setSchema(Schema schema) { + this.schema = schema; + } + + public Map<String, String> getProperties() { + return properties; + } + + public void setProperties(Map<String, String> properties) { + this.properties = properties; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } + + public void setTablePath(String tablePath) { + this.tablePath = tablePath; + } + + public String getTablePath() { + return tablePath; + } + + public static class Builder { + + private TableDescriptor table; + private List<Field> fields; + private Map<String, String> 
tblProperties; + + private Builder() { + table = new TableDescriptor(); + fields = new ArrayList<>(); + tblProperties = new HashMap<>(); + } + + public Builder ifNotExists() { + table.setIfNotExists(true); + return this; + } + + public Builder table(TableIdentifier tableId) { + table.setTable(tableId); + return this; + } + + public Builder tablePath(String tablePath) { + table.setTablePath(tablePath); + return this; + } + + public Builder comment(String tableComment) { + table.setComment(tableComment); + return this; + } + + public Builder column(String name, DataType dataType) { + fields.add(new Field(name, dataType)); + return this; + } + + public Builder column(String name, DataType dataType, String comment) { + Field field = new Field(name, dataType); + field.setColumnComment(comment); + fields.add(field); + return this; + } + + public Builder column(String name, DataType dataType, int precision, int scale, String comment) + { + Field field = new Field(name, dataType); + field.setColumnComment(comment); + field.setScale(scale); + field.setPrecision(precision); + fields.add(field); + return this; + } + + public Builder tblProperties(String key, String value) { + tblProperties.put(key, value); + return this; + } + + public TableDescriptor create() { + Field[] fieldArray = new Field[fields.size()]; + fieldArray = fields.toArray(fieldArray); + Schema schema = new Schema(fieldArray); + table.setSchema(schema); + table.setProperties(tblProperties); + return table; + } + } + + public static Builder builder() { + return new Builder(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java new file mode 
/**
 * Immutable pair of table name and database name identifying a table.
 *
 * <p>Note the constructor argument order: the table name comes first,
 * the database name second.
 */
public class TableIdentifier {
  // No setters exist, so the fields are final to make the immutability explicit.
  private final String tableName;
  private final String databaseName;

  /**
   * @param tableName name of the table, stored as given
   * @param databaseName name of the database, stored as given
   */
  public TableIdentifier(String tableName, String databaseName) {
    this.tableName = tableName;
    this.databaseName = databaseName;
  }

  /** @return the table name passed to the constructor */
  public String getTableName() {
    return tableName;
  }

  /** @return the database name passed to the constructor */
  public String getDatabaseName() {
    return databaseName;
  }

}
/**
 * Unchecked exception which, judging by its name, reports that an execution
 * did not finish in time.
 *
 * <p>Besides the no-argument form, a message and an underlying cause can be
 * attached so that the failure can be diagnosed.
 */
public class ExecutionTimeoutException extends RuntimeException {

  /** Creates an exception without message or cause. */
  public ExecutionTimeoutException() {
    super();
  }

  /**
   * @param message detail message, available through {@link #getMessage()}
   */
  public ExecutionTimeoutException(String message) {
    super(message);
  }

  /**
   * @param message detail message, available through {@link #getMessage()}
   * @param cause underlying failure, available through {@link #getCause()}
   */
  public ExecutionTimeoutException(String message, Throwable cause) {
    super(message, cause);
  }

}
/**
 * Unchecked exception carrying a message; by its name it is raised by
 * scheduling code.
 */
public class SchedulerException extends RuntimeException {

  /**
   * @param message detail message, available through {@link #getMessage()}
   */
  public SchedulerException(String message) {
    super(message);
  }

  /**
   * Variant that preserves the underlying failure instead of dropping it.
   *
   * @param message detail message, available through {@link #getMessage()}
   * @param cause underlying failure, available through {@link #getCause()}
   */
  public SchedulerException(String message, Throwable cause) {
    super(message, cause);
  }

}
/**
 * Checked exception of the store API.
 *
 * <p>Can be created empty, with a message, wrapping another exception, or
 * with both a message and a cause.
 */
public class StoreException extends Exception {

  /** Creates an exception without message or cause. */
  public StoreException() {
    super();
  }

  /**
   * @param message detail message, available through {@link #getMessage()}
   */
  public StoreException(String message) {
    super(message);
  }

  /**
   * Wraps another exception as the cause.
   *
   * @param e underlying exception, available through {@link #getCause()}
   */
  public StoreException(Exception e) {
    super(e);
  }

  /**
   * Adds context to an underlying failure without losing it.
   *
   * @param message detail message, available through {@link #getMessage()}
   * @param cause underlying failure, available through {@link #getCause()}
   */
  public StoreException(String message, Throwable cause) {
    super(message, cause);
  }
}