Repository: carbondata Updated Branches: refs/heads/master 3740535d5 -> cfb8ed9f5
[CARBONDATA-2301][SDK] CarbonStore interface and two implementations (Spark and Local) Users should be able to query CarbonData using the CarbonStore interface. Get API: it can be used for filter queries; it accepts projection column names and a filter expression, and returns the matched rows. SQL API: it accepts a SQL statement and returns the query result set. This closes #2127 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cfb8ed9f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cfb8ed9f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cfb8ed9f Branch: refs/heads/master Commit: cfb8ed9f5ec932911ddefe8fc2dcd07269411396 Parents: 3740535 Author: Jacky Li <jacky.li...@qq.com> Authored: Sun Apr 1 16:30:22 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Apr 12 07:20:40 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/util/DataTypeUtil.java | 8 +- .../hadoop/api/CarbonInputFormat.java | 23 ++-- .../dataload/TestGlobalSortDataLoad.scala | 5 +- .../StandardPartitionTableLoadingTestCase.scala | 6 +- .../StandardPartitionTableQueryTestCase.scala | 5 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 25 ++-- .../apache/spark/sql/test/util/QueryTest.scala | 1 + integration/spark2/pom.xml | 5 + .../carbondata/store/SparkCarbonStore.scala | 98 ++++++++++++++ .../sql/CarbonDatasourceHadoopRelation.scala | 2 +- .../spark/sql/CarbonDictionaryDecoder.scala | 39 +----- .../org/apache/spark/sql/CarbonSession.scala | 2 +- .../strategy/CarbonLateDecodeStrategy.scala | 4 +- .../apache/spark/sql/hive/CarbonRelation.scala | 69 ++-------- .../apache/spark/util/SparkTypeConverter.scala | 135 +++++++++++++++++++ .../carbondata/store/SparkCarbonStoreTest.scala | 86 ++++++++++++ store/sdk/pom.xml | 5 + .../carbondata/store/CarbonRowReadSupport.java | 53 ++++++++ .../apache/carbondata/store/CarbonStore.java | 68 ++++++++++ .../carbondata/store/LocalCarbonStore.java | 116 ++++++++++++++++ .../carbondata/store/MetaCachedCarbonStore.java | 54 ++++++++ .../apache/carbondata/sdk/file/TestUtil.java | 4 +- .../carbondata/store/LocalCarbonStoreTest.java | 61 +++++++++ 23 files changed, 743 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java index 25f0099..6967102 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java @@ -801,9 +801,11 @@ public final class DataTypeUtil { * @param converterLocal */ public static void setDataTypeConverter(DataTypeConverter converterLocal) { - converter = converterLocal; - timeStampformatter.remove(); - dateformatter.remove(); + if (converterLocal != null) { + converter = converterLocal; + timeStampformatter.remove(); + dateformatter.remove(); + } } public static DataTypeConverter getDataTypeConverter() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index be97d05..b1d7603 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -513,24 +513,27 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { * It is optional, if user does not set then it reads from store * * @param configuration - * @param converter is the Data type converter for different computing engine - * @throws IOException + * @param converterClass is the Data type converter for different computing engine */ - public static void setDataTypeConverter(Configuration configuration, DataTypeConverter converter) - throws IOException { - if (null != converter) { - configuration.set(CARBON_CONVERTER, - ObjectSerializationUtil.convertObjectToString(converter)); + public static void setDataTypeConverter( + Configuration configuration, Class<? extends DataTypeConverter> converterClass) { + if (null != converterClass) { + configuration.set(CARBON_CONVERTER, converterClass.getCanonicalName()); } } public static DataTypeConverter getDataTypeConverter(Configuration configuration) throws IOException { - String converter = configuration.get(CARBON_CONVERTER); - if (converter == null) { + String converterClass = configuration.get(CARBON_CONVERTER); + if (converterClass == null) { return new DataTypeConverterImpl(); } - return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter); + + try { + return (DataTypeConverter) Class.forName(converterClass).newInstance(); + } catch (Exception e) { + throw new IOException(e); + } } public static void setDatabaseName(Configuration configuration, String databaseName) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala index 3babf4f..bba75ad 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala @@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.BatchedDataSourceScanExec import org.apache.spark.sql.test.TestQueryExecutor.projectPath import org.apache.spark.sql.test.util.QueryTest @@ -285,8 +286,8 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo } val df = sql("select * from carbon_globalsort") val scanRdd = df.queryExecution.sparkPlan.collect { - case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] => - b.rdd.asInstanceOf[CarbonScanRDD] + case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD[InternalRow]] => + b.rdd.asInstanceOf[CarbonScanRDD[InternalRow]] }.head 
assertResult(defaultParallelism)(scanRdd.getPartitions.length) assertResult(10)(df.count) http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index 8342c69..b929364 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -22,7 +22,7 @@ import java.util import java.util.concurrent.{Callable, ExecutorService, Executors} import org.apache.commons.io.FileUtils -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.execution.BatchedDataSourceScanExec import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.test.util.QueryTest @@ -483,8 +483,8 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte FileUtils.deleteDirectory(folder) val dataFrame = sql("select * from smallpartitionfilesread") val scanRdd = dataFrame.queryExecution.sparkPlan.collect { - case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd - .asInstanceOf[CarbonScanRDD] + case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD[InternalRow]] => b.rdd + .asInstanceOf[CarbonScanRDD[InternalRow]] }.head assert(scanRdd.getPartitions.length < 10) assertResult(100)(dataFrame.count) http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala index 4cce7d5..a6e7c32 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala @@ -16,6 +16,7 @@ */ package org.apache.carbondata.spark.testsuite.standardpartition +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.BatchedDataSourceScanExec import org.apache.spark.sql.test.Spark2TestQueryExecutor import org.apache.spark.sql.test.util.QueryTest @@ -426,8 +427,8 @@ test("Creation of partition table should fail if the colname in table schema and private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = { val plan = frame.queryExecution.sparkPlan val scanRDD = plan collect { - case b: BatchedDataSourceScanExec if 
b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd - .asInstanceOf[CarbonScanRDD] + case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD[InternalRow]] => b.rdd + .asInstanceOf[CarbonScanRDD[InternalRow]] } assert(scanRDD.nonEmpty) assert(!partitionNames.map(f => scanRDD.head.partitionNames.exists(_.getPartitions.contains(f))).exists(!_)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index df953da..e3a62b6 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -23,6 +23,7 @@ import java.util.{ArrayList, Date, List} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag import scala.util.Random import scala.util.control.Breaks.{break, breakable} @@ -32,7 +33,6 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.SQLExecution @@ -45,7 +45,7 @@ import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.block.Distributable import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.{TableInfo} +import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.filter.FilterUtil import org.apache.carbondata.core.scan.model.QueryModel @@ -55,6 +55,7 @@ import org.apache.carbondata.core.util._ import org.apache.carbondata.hadoop._ import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat} import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.InitInputMetrics import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util} @@ -65,7 +66,7 @@ import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRec * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file * level filtering in driver side. 
*/ -class CarbonScanRDD( +class CarbonScanRDD[T: ClassTag]( @transient spark: SparkSession, val columnProjection: CarbonProjection, var filterExpression: Expression, @@ -73,8 +74,10 @@ class CarbonScanRDD( @transient serializedTableInfo: Array[Byte], @transient tableInfo: TableInfo, inputMetricsStats: InitInputMetrics, - @transient val partitionNames: Seq[PartitionSpec]) - extends CarbonRDDWithTableInfo[InternalRow](spark.sparkContext, Nil, serializedTableInfo) { + @transient val partitionNames: Seq[PartitionSpec], + val dataTypeConverterClz: Class[_ <: DataTypeConverter] = classOf[SparkDataTypeConverterImpl], + val readSupportClz: Class[_ <: CarbonReadSupport[_]] = SparkReadSupport.readSupportClass) + extends CarbonRDDWithTableInfo[T](spark.sparkContext, Nil, serializedTableInfo) { private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") private val jobTrackerId: String = { @@ -83,8 +86,6 @@ class CarbonScanRDD( } private var vectorReader = false - private val readSupport = SparkReadSupport.readSupportClass - private val bucketedTable = tableInfo.getFactTable.getBucketingInfo @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -391,7 +392,7 @@ class CarbonScanRDD( new CarbonSparkPartition(id, partitionId, multiBlockSplit) } - override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + override def internalCompute(split: Partition, context: TaskContext): Iterator[T] = { val queryStartTime = System.currentTimeMillis val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) if (null == carbonPropertiesFilePath) { @@ -415,7 +416,7 @@ class CarbonScanRDD( val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match { case FileFormat.ROW_V1 => // create record reader for row format - DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl) + DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance()) val inputFormat = new CarbonStreamInputFormat val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext) .asInstanceOf[CarbonStreamRecordReader] @@ -489,7 +490,7 @@ class CarbonScanRDD( } - iterator.asInstanceOf[Iterator[InternalRow]] + iterator.asInstanceOf[Iterator[T]] } private def close() { @@ -520,12 +521,12 @@ class CarbonScanRDD( } private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = { - CarbonInputFormat.setCarbonReadSupport(conf, readSupport) + CarbonInputFormat.setCarbonReadSupport(conf, readSupportClz) val tableInfo1 = getTableInfo CarbonInputFormat.setTableInfo(conf, tableInfo1) CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName) CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName) - CarbonInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl) + CarbonInputFormat.setDataTypeConverter(conf, dataTypeConverterClz) createInputFormat(conf) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala index 8f550fb..c2c4ab3 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala @@ -114,6 +114,7 @@ class QueryTest extends PlanTest { val sqlContext: SQLContext = TestQueryExecutor.INSTANCE.sqlContext + lazy val warehouse = TestQueryExecutor.warehouse lazy val storeLocation = CarbonProperties.getInstance(). getProperty(CarbonCommonConstants.STORE_LOCATION) val resourcesPath = TestQueryExecutor.resourcesPath http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml index e4593be..46e1be0 100644 --- a/integration/spark2/pom.xml +++ b/integration/spark2/pom.xml @@ -40,6 +40,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-store-sdk</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala new file mode 100644 index 0000000..e29ee46 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store + +import java.io.IOException + +import scala.collection.JavaConverters._ + +import org.apache.spark.{CarbonInputMetrics, SparkConf} +import org.apache.spark.sql.CarbonSession._ +import org.apache.spark.sql.SparkSession + +import org.apache.carbondata.common.annotations.InterfaceAudience +import org.apache.carbondata.core.datastore.row.CarbonRow +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.hadoop.CarbonProjection +import org.apache.carbondata.spark.rdd.CarbonScanRDD + +/** + * A CarbonStore implementation that uses Spark as underlying compute engine + * with CarbonData query optimization capability + */ +@InterfaceAudience.Internal +private[store] class SparkCarbonStore extends MetaCachedCarbonStore { + private var session: SparkSession = _ + + /** + * Initialize SparkCarbonStore + * @param storeName store name + * @param storeLocation location to store data + */ + def this(storeName: String, storeLocation: String) = { + this() + val sparkConf = new SparkConf(loadDefaults = true) + session = SparkSession.builder + .config(sparkConf) + .appName("SparkCarbonStore-" + storeName) + .config("spark.sql.warehouse.dir", storeLocation) + .getOrCreateCarbonSession() + } + + @throws[IOException] + override def scan( + path: String, + projectColumns: Array[String]): java.util.Iterator[CarbonRow] = { + scan(path, projectColumns, null) + } + + @throws[IOException] + override def scan( + path: String, + projectColumns: Array[String], + filter: Expression): java.util.Iterator[CarbonRow] = { + require(path != null) + require(projectColumns != null) + val table = getTable(path) + val rdd = new CarbonScanRDD[CarbonRow]( + spark = session, + columnProjection = new CarbonProjection(projectColumns), + filterExpression = filter, + identifier = table.getAbsoluteTableIdentifier, + serializedTableInfo = table.getTableInfo.serialize, + tableInfo = table.getTableInfo, + inputMetricsStats = new CarbonInputMetrics, + partitionNames = null, + dataTypeConverterClz = null, + readSupportClz = classOf[CarbonRowReadSupport]) + rdd.collect + .iterator + .asJava + } + + @throws[IOException] + override def sql(sqlString: String): java.util.Iterator[CarbonRow] = { + val df = session.sql(sqlString) + df.rdd + .map(row => new CarbonRow(row.toSeq.toArray.asInstanceOf[Array[Object]])) + .collect() + .iterator + .asJava + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 46905b8..fc62ba0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.expression.logical.AndExpression import org.apache.carbondata.hadoop.CarbonProjection -import org.apache.carbondata.spark.rdd.CarbonScanRDD +import org.apache.carbondata.spark.rdd.{CarbonScanRDD, SparkReadSupport} case class CarbonDatasourceHadoopRelation( sparkSession: SparkSession, 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index d045ab3..1f65fce 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -28,9 +28,9 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode} -import org.apache.spark.sql.hive.{CarbonMetastoreTypes, CarbonRelation} import org.apache.spark.sql.optimizer.CarbonDecoderRelation import org.apache.spark.sql.types._ +import org.apache.spark.util.SparkTypeConverter import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} @@ -333,8 +333,8 @@ object CarbonDictionaryDecoder { !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonDimension.isComplex()) { val newAttr = AttributeReference(a.name, - convertCarbonToSparkDataType(carbonDimension, - relation.get.carbonRelation.carbonRelation), + SparkTypeConverter.convertCarbonToSparkDataType(carbonDimension.getColumnSchema, + relation.get.carbonRelation.carbonTable), a.nullable, a.metadata)(a.exprId).asInstanceOf[Attribute] newAttr @@ -393,39 +393,6 @@ object CarbonDictionaryDecoder { } } - /** - * Converts from carbon datatype to corresponding spark datatype. 
- */ - def convertCarbonToSparkDataType(carbonDimension: CarbonDimension, - relation: CarbonRelation): types.DataType = { - if (CarbonDataTypes.isDecimal(carbonDimension.getDataType)) { - val scale: Int = carbonDimension.getColumnSchema.getScale - val precision: Int = carbonDimension.getColumnSchema.getPrecision - if (scale == 0 && precision == 0) { - DecimalType(18, 2) - } else { - DecimalType(precision, scale) - } - } else if (CarbonDataTypes.isArrayType(carbonDimension.getDataType)) { - CarbonMetastoreTypes - .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>") - } else if (CarbonDataTypes.isStructType(carbonDimension.getDataType)) { - CarbonMetastoreTypes - .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>") - } else { - carbonDimension.getDataType match { - case CarbonDataTypes.STRING => StringType - case CarbonDataTypes.SHORT => ShortType - case CarbonDataTypes.INT => IntegerType - case CarbonDataTypes.LONG => LongType - case CarbonDataTypes.DOUBLE => DoubleType - case CarbonDataTypes.BOOLEAN => BooleanType - case CarbonDataTypes.TIMESTAMP => TimestampType - case CarbonDataTypes.DATE => DateType - } - } - } - def getDictionaryColumnMapping(output: Seq[Attribute], relations: Seq[CarbonDecoderRelation], profile: CarbonProfile, http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index a380308..7ee3038 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -26,7 +26,7 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.SparkSession.Builder import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.plans.logical.{Command, LocalRelation, Union} +import org.apache.spark.sql.catalyst.plans.logical.{Command, Union} import org.apache.spark.sql.hive.execution.command.CarbonSetCommand import org.apache.spark.sql.internal.{SessionState, SharedState} import org.apache.spark.sql.profiler.{Profiler, SQLStart} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index 7123b93..59dd2e9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -192,10 +192,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { needDecode: ArrayBuffer[AttributeReference]): RDD[InternalRow] = { if (needDecode.nonEmpty) { - rdd.asInstanceOf[CarbonScanRDD].setVectorReaderSupport(false) + rdd.asInstanceOf[CarbonScanRDD[InternalRow]].setVectorReaderSupport(false) getDecoderRDD(relation, needDecode, 
rdd, output) } else { - rdd.asInstanceOf[CarbonScanRDD] + rdd.asInstanceOf[CarbonScanRDD[InternalRow]] .setVectorReaderSupport(supportBatchedDataSource(relation.relation.sqlContext, output)) rdd } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala index e9e93d2..0c69b9d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala @@ -22,19 +22,18 @@ import scala.Array.canBuildFrom import scala.collection.JavaConverters._ import scala.util.parsing.combinator.RegexParsers -import org.apache.spark.sql.CarbonEnv import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CarbonException +import org.apache.spark.util.SparkTypeConverter import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension} +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath @@ -49,45 +48,6 @@ case class CarbonRelation( carbonTable: CarbonTable) extends LeafNode with MultiInstanceRelation { - def recursiveMethod(dimName: String, childDim: CarbonDimension): String = { - childDim.getDataType.getName.toLowerCase match { - case "array" => s"${ - childDim.getColName.substring(dimName.length + 1) - }:array<${ getArrayChildren(childDim.getColName) }>" - case "struct" => s"${ - childDim.getColName.substring(dimName.length + 1) - }:struct<${ getStructChildren(childDim.getColName) }>" - case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }" - } - } - - def getArrayChildren(dimName: String): String = { - metaData.carbonTable.getChildren(dimName).asScala.map(childDim => { - childDim.getDataType.getName.toLowerCase match { - case "array" => s"array<${ getArrayChildren(childDim.getColName) }>" - case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>" - case dType => addDecimalScaleAndPrecision(childDim, dType) - } - }).mkString(",") - } - - def getStructChildren(dimName: String): String = { - metaData.carbonTable.getChildren(dimName).asScala.map(childDim => { - childDim.getDataType.getName.toLowerCase match { - case "array" => s"${ - childDim.getColName.substring(dimName.length + 1) - }:array<${ getArrayChildren(childDim.getColName) }>" - case "struct" => s"${ - childDim.getColName.substring(dimName.length + 1) - }:struct<${ metaData.carbonTable.getChildren(childDim.getColName) - .asScala.map(f => s"${ 
recursiveMethod(childDim.getColName, f) }").mkString(",") - }>" - case dType => s"${ childDim.getColName - .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }" - } - }).mkString(",") - } - override def newInstance(): LogicalPlan = { CarbonRelation(databaseName, tableName, metaData, carbonTable) .asInstanceOf[this.type] @@ -102,9 +62,11 @@ case class CarbonRelation( .getDimensionByName(metaData.carbonTable.getTableName, dim.getColName) val output: DataType = dimval.getDataType.getName.toLowerCase match { case "array" => - CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>") + CarbonMetastoreTypes.toDataType( + s"array<${SparkTypeConverter.getArrayChildren(carbonTable, dim.getColName)}>") case "struct" => - CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>") + CarbonMetastoreTypes.toDataType( + s"struct<${SparkTypeConverter.getStructChildren(carbonTable, dim.getColName)}>") case dType => val dataType = addDecimalScaleAndPrecision(dimval, dType) CarbonMetastoreTypes.toDataType(dataType) @@ -142,11 +104,13 @@ case class CarbonRelation( if (column.isDimension()) { val output: DataType = column.getDataType.getName.toLowerCase match { case "array" => - CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>") + CarbonMetastoreTypes.toDataType( + s"array<${SparkTypeConverter.getArrayChildren(carbonTable, column.getColName)}>") case "struct" => - CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>") + CarbonMetastoreTypes.toDataType( + s"struct<${SparkTypeConverter.getStructChildren(carbonTable, column.getColName)}>") case dType => - val dataType = addDecimalScaleAndPrecision(column, dType) + val dataType = SparkTypeConverter.addDecimalScaleAndPrecision(column, dType) CarbonMetastoreTypes.toDataType(dataType) } AttributeReference(column.getColName, output, nullable = true )( @@ -165,15 +129,6 @@ case class CarbonRelation( } } - def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = { - var dType = dataType - if (DataTypes.isDecimal(dimval.getDataType)) { - dType += - "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")" - } - dType - } - // TODO: Use data from the footers. // TODO For 2.1 // override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes) http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala new file mode 100644 index 0000000..65210b8 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.util.Objects + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.hive.CarbonMetastoreTypes +import org.apache.spark.sql.types +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes} +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema} + +private[spark] object SparkTypeConverter { + + def createSparkSchema(table: CarbonTable, columns: Seq[String]): StructType = { + Objects.requireNonNull(table) + Objects.requireNonNull(columns) + if (columns.isEmpty) { + throw new IllegalArgumentException("column list is empty") + } + val fields = new java.util.ArrayList[StructField](columns.size) + val allColumns = table.getTableInfo.getFactTable.getListOfColumns.asScala + + // find the column and add it to fields array + columns.foreach { column => + val col = allColumns.find(_.getColumnName.equalsIgnoreCase(column)).getOrElse( + throw new IllegalArgumentException(column + " does not exist") + ) + fields.add(StructField(col.getColumnName, convertCarbonToSparkDataType(col, table))) + } + StructType(fields) + } + + /** + * Converts from carbon datatype to corresponding spark datatype. + */ + def convertCarbonToSparkDataType( + columnSchema: ColumnSchema, + table: CarbonTable): types.DataType = { + if (CarbonDataTypes.isDecimal(columnSchema.getDataType)) { + val scale = columnSchema.getScale + val precision = columnSchema.getPrecision + if (scale == 0 && precision == 0) { + DecimalType(18, 2) + } else { + DecimalType(precision, scale) + } + } else if (CarbonDataTypes.isArrayType(columnSchema.getDataType)) { + CarbonMetastoreTypes + .toDataType(s"array<${ getArrayChildren(table, columnSchema.getColumnName) }>") + } else if (CarbonDataTypes.isStructType(columnSchema.getDataType)) { + CarbonMetastoreTypes + .toDataType(s"struct<${ getStructChildren(table, columnSchema.getColumnName) }>") + } else { + columnSchema.getDataType match { + case CarbonDataTypes.STRING => StringType + case CarbonDataTypes.SHORT => ShortType + case CarbonDataTypes.INT => IntegerType + case CarbonDataTypes.LONG => LongType + case CarbonDataTypes.DOUBLE => DoubleType + case CarbonDataTypes.BOOLEAN => BooleanType + case CarbonDataTypes.TIMESTAMP => TimestampType + case CarbonDataTypes.DATE => DateType + } + } + } + + def getArrayChildren(table: CarbonTable, dimName: String): String = { + table.getChildren(dimName).asScala.map(childDim => { + childDim.getDataType.getName.toLowerCase match { + case "array" => s"array<${ getArrayChildren(table, childDim.getColName) }>" + case "struct" => s"struct<${ getStructChildren(table, childDim.getColName) }>" + case dType => addDecimalScaleAndPrecision(childDim, dType) + } + }).mkString(",") + } + + def getStructChildren(table: CarbonTable, dimName: String): String = { + table.getChildren(dimName).asScala.map(childDim => { + childDim.getDataType.getName.toLowerCase match { + case "array" => s"${ + 
childDim.getColName.substring(dimName.length + 1) + }:array<${ getArrayChildren(table, childDim.getColName) }>" + case "struct" => s"${ + childDim.getColName.substring(dimName.length + 1) + }:struct<${ table.getChildren(childDim.getColName) + .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",") + }>" + case dType => s"${ childDim.getColName + .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }" + } + }).mkString(",") + } + + def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = { + var dType = dataType + if (CarbonDataTypes.isDecimal(dimval.getDataType)) { + dType += + "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")" + } + dType + } + + private def recursiveMethod( + table: CarbonTable, dimName: String, childDim: CarbonDimension) = { + childDim.getDataType.getName.toLowerCase match { + case "array" => s"${ + childDim.getColName.substring(dimName.length + 1) + }:array<${ getArrayChildren(table, childDim.getColName) }>" + case "struct" => s"${ + childDim.getColName.substring(dimName.length + 1) + }:struct<${ getStructChildren(table, childDim.getColName) }>" + case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }" + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala new file mode 100644 index 0000000..d389670 --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store + +import org.apache.spark.sql.{CarbonEnv, Row} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression +import org.apache.carbondata.core.scan.expression.{ColumnExpression, LiteralExpression} + +class SparkCarbonStoreTest extends QueryTest with BeforeAndAfterAll { + + private var store: CarbonStore = _ + + override def beforeAll { + sql("DROP TABLE IF EXISTS t1") + sql("CREATE TABLE t1 (" + + "empno int, empname String, designation String, doj Timestamp, " + + "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String," + + "projectcode int, projectjoindate Timestamp, projectenddate Timestamp," + + "attendance int,utilization int,salary int)" + + "STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE t1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") + + store = new SparkCarbonStore("test", storeLocation) + } + + test("test CarbonStore.get, compare projection result") { + val tablePath = CarbonEnv.getCarbonTable(None, "t1")(sqlContext.sparkSession).getTablePath + val rows = store.scan(s"$tablePath", Seq("empno", "empname").toArray) + val sparkResult: Array[Row] = sql("select empno, empname from t1").collect() + sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) => + val carbonRow = rows.next() + assertResult(r.get(0))(carbonRow.getData()(0)) + assertResult(r.get(1))(carbonRow.getData()(1)) + } + assert(!rows.hasNext) + } + + test("test CarbonStore.get, compare projection and filter result") { + val tablePath = CarbonEnv.getCarbonTable(None, "t1")(sqlContext.sparkSession).getTablePath + val filter = new EqualToExpression( + new ColumnExpression("empno", DataTypes.INT), + new LiteralExpression(10, DataTypes.INT)) + val rows = store.scan(s"$tablePath", Seq("empno", "empname").toArray, filter) + val sparkResult: Array[Row] = sql("select empno, empname from t1 where empno = 10").collect() + sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) => + val carbonRow = rows.next() + assertResult(r.get(0))(carbonRow.getData()(0)) + assertResult(r.get(1))(carbonRow.getData()(1)) + } + assert(!rows.hasNext) + } + + test("test CarbonStore.sql") { + val rows = store.sql("select empno, empname from t1 where empno = 10") + val sparkResult: Array[Row] = sql("select empno, empname from t1 where empno = 10").collect() + sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) => + val carbonRow = rows.next() + assertResult(r.get(0))(carbonRow.getData()(0)) + assertResult(r.get(1))(carbonRow.getData()(1)) + } + assert(!rows.hasNext) + } + + override def afterAll { + sql("DROP TABLE IF EXISTS t1") + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/pom.xml ---------------------------------------------------------------------- diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml index 1d1735e..d50c024 100644 --- a/store/sdk/pom.xml +++ b/store/sdk/pom.xml @@ -35,6 +35,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java 
---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java new file mode 100644 index 0000000..bafbb9f --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store; + +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; + +/** + * ReadSupport that converts row objects to CarbonRow + */ +@InterfaceAudience.Internal +public class CarbonRowReadSupport implements CarbonReadSupport<CarbonRow> { + private CarbonReadSupport<Object[]> delegate; + + public CarbonRowReadSupport() { + this.delegate = new DictionaryDecodeReadSupport<>(); + } + + @Override public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable) + throws IOException { + delegate.initialize(carbonColumns, carbonTable); + } + + @Override public CarbonRow readRow(Object[] data) { + Object[] converted = delegate.readRow(data); + return new CarbonRow(converted); + } + + @Override public void close() { + delegate.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java new file mode 100644 index 0000000..c6b2fb8 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.scan.expression.Expression; + +/** + * User can use {@link CarbonStore} to query data + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public interface CarbonStore extends Closeable { + + /** + * Scan query on the data in the table path + * @param path table path + * @param projectColumns column names to read + * @return rows + * @throws IOException if unable to read files in table path + */ + Iterator<CarbonRow> scan( + String path, + String[] projectColumns) throws IOException; + + /** + * Scan query with filter, on the data in the table path + * @param path table path + * @param projectColumns column names to read + * @param filter filter condition, can be null + * @return rows that satisfy filter condition + * @throws IOException if unable to read files in table path + */ + Iterator<CarbonRow> scan( + String path, + String[] projectColumns, + Expression filter) throws IOException; + + /** + * SQL query, table should be created before calling this function + * @param sqlString SQL statement + * @return rows + * @throws IOException if unable to read files in table path + */ + Iterator<CarbonRow> sql(String sqlString) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java new file mode 100644 index 0000000..394ffea --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.api.CarbonInputFormat; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * A CarbonStore implementation that works locally, without depending on any other compute + * framework. It can be used to read data from local disk. + * + * Note that this class is experimental; it is not intended to be used in production. + */ +@InterfaceAudience.Internal +class LocalCarbonStore extends MetaCachedCarbonStore { + + @Override + public Iterator<CarbonRow> scan(String path, String[] projectColumns) throws IOException { + return scan(path, projectColumns, null); + } + + @Override + public Iterator<CarbonRow> scan(String path, String[] projectColumns, Expression filter) + throws IOException { + Objects.requireNonNull(path); + Objects.requireNonNull(projectColumns); + + CarbonTable table = getTable(path); + if (table.isStreamingTable() || table.isHivePartitionTable()) { + throw new UnsupportedOperationException("streaming and partition tables are not supported"); + } + // TODO: use InputFormat to prune data and read data + + final CarbonTableInputFormat format = new CarbonTableInputFormat(); + final Job job = new Job(new Configuration()); + CarbonInputFormat.setTableInfo(job.getConfiguration(), table.getTableInfo()); + CarbonInputFormat.setTablePath(job.getConfiguration(), table.getTablePath()); + CarbonInputFormat.setTableName(job.getConfiguration(), table.getTableName()); + CarbonInputFormat.setDatabaseName(job.getConfiguration(), table.getDatabaseName()); + CarbonInputFormat.setCarbonReadSupport(job.getConfiguration(), CarbonRowReadSupport.class); + CarbonInputFormat.setColumnProjection( + job.getConfiguration(), new CarbonProjection(projectColumns)); + if (filter != null) { + CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter); + } + + final List<InputSplit> splits = + format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); + + List<RecordReader<Void, Object>> readers = new ArrayList<>(splits.size()); + + try { + for (InputSplit split : splits) { + TaskAttemptContextImpl attempt = + new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + RecordReader reader = format.createRecordReader(split, attempt); + reader.initialize(split, attempt); + readers.add(reader); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + + List<CarbonRow> rows = new ArrayList<>(); + try { + for (RecordReader<Void, Object> reader : readers) { + while (reader.nextKeyValue()) { + rows.add((CarbonRow)reader.getCurrentValue()); + } + } + } catch (InterruptedException e) { + throw new IOException(e); + } + return
rows.iterator(); + } + + @Override + public Iterator<CarbonRow> sql(String sqlString) throws IOException { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java new file mode 100644 index 0000000..d847e67 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.sdk.file.CarbonReader; + +/** + * A CarbonStore base class that caches CarbonTable object + */ +@InterfaceAudience.Internal +abstract class MetaCachedCarbonStore implements CarbonStore { + + // mapping of table path to CarbonTable object + private Map<String, CarbonTable> cache = new HashMap<>(); + + CarbonTable getTable(String path) throws IOException { + if (cache.containsKey(path)) { + return cache.get(path); + } + TableInfo schema = CarbonReader.readSchemaFile(CarbonTablePath.getSchemaFilePath(path)); + schema.setTablePath(path); + CarbonTable table = CarbonTable.buildFromTableInfo(schema); + cache.put(path, table); + return table; + } + + @Override + public void close() throws IOException { + cache.clear(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java index dcedf10..068164d 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java @@ -27,7 +27,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; import org.junit.Assert; -class TestUtil { +public class TestUtil { static void writeFilesAndVerify(Schema schema, String path) { writeFilesAndVerify(schema, path, null); @@ -37,7 +37,7 @@ class TestUtil { writeFilesAndVerify(100, schema, path, sortColumns, false, -1, 
-1); } - static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) { + public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) { writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java new file mode 100644 index 0000000..a5b5edc --- /dev/null +++ b/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.sdk.file.Field; +import org.apache.carbondata.sdk.file.Schema; +import org.apache.carbondata.sdk.file.TestUtil; + +import org.apache.commons.io.FileUtils; +import org.junit.Test; + +public class LocalCarbonStoreTest { + + // TODO: complete this testcase + // Currently result rows are empty, because the SDK does not yet write the table status file, + // so the reader does not find any segment. + // Complete this testcase after the flat folder reader is done. + @Test + public void testWriteAndReadFiles() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + TestUtil.writeFilesAndVerify(new Schema(fields), path, true); + + CarbonStore store = new LocalCarbonStore(); + Iterator<CarbonRow> rows = store.scan(path, new String[]{"name", "age"}, null); + + while (rows.hasNext()) { + CarbonRow row = rows.next(); + System.out.println(row.toString()); + } + + FileUtils.deleteDirectory(new File(path)); + } + +}
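----------------------------------------------------------------------
For illustration, a minimal sketch of how the Get (scan) API described in the commit message can be invoked. It is not taken from the patch itself, so the following are assumptions: the class name LocalScanExample is hypothetical, the table at the placeholder path /tmp/carbonstore/t1 already exists (with its schema file), the columns empno/empname are borrowed from SparkCarbonStoreTest, and the code lives in the org.apache.carbondata.store package because LocalCarbonStore is package-private.

package org.apache.carbondata.store;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;

public class LocalScanExample {
  public static void main(String[] args) throws IOException {
    // Placeholder path; a CarbonData table must already exist here.
    String tablePath = "/tmp/carbonstore/t1";

    // CarbonStore extends Closeable, so try-with-resources works.
    try (CarbonStore store = new LocalCarbonStore()) {
      // Get API, projection only: read two columns of every row.
      Iterator<CarbonRow> all = store.scan(tablePath, new String[]{"empno", "empname"});
      while (all.hasNext()) {
        System.out.println(Arrays.toString(all.next().getData()));
      }

      // Get API with a filter: empno = 10, built from the same expression
      // classes used in SparkCarbonStoreTest.
      EqualToExpression filter = new EqualToExpression(
          new ColumnExpression("empno", DataTypes.INT),
          new LiteralExpression(10, DataTypes.INT));
      Iterator<CarbonRow> matched =
          store.scan(tablePath, new String[]{"empno", "empname"}, filter);
      while (matched.hasNext()) {
        System.out.println(Arrays.toString(matched.next().getData()));
      }
    }
  }
}

Note that LocalCarbonStore does not implement the SQL API (its sql() throws UnsupportedOperationException); SQL statements are served by SparkCarbonStore, which runs them through a CarbonSession.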
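----------------------------------------------------------------------
A second sketch, for the CarbonInputFormat change near the top of the patch: the data type converter is no longer Java-serialized into the Configuration; only its class name is stored, and getDataTypeConverter instantiates it reflectively. This round-trip example assumes DataTypeConverter and DataTypeConverterImpl live in org.apache.carbondata.core.util, and requires the converter class to have a public no-arg constructor (getDataTypeConverter calls Class.forName(...).newInstance()).

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.util.DataTypeConverter;
import org.apache.carbondata.core.util.DataTypeConverterImpl;
import org.apache.carbondata.hadoop.api.CarbonInputFormat;

public class ConverterRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Only the class name is stored under CARBON_CONVERTER; a null class is ignored.
    CarbonInputFormat.setDataTypeConverter(conf, DataTypeConverterImpl.class);
    // Reads the class name back and instantiates it reflectively;
    // falls back to DataTypeConverterImpl when nothing was set.
    DataTypeConverter converter = CarbonInputFormat.getDataTypeConverter(conf);
    System.out.println(converter.getClass().getName());
  }
}

This is also why CarbonScanRDD now carries a dataTypeConverterClz parameter (defaulting to SparkDataTypeConverterImpl) instead of constructing a converter eagerly: prepareInputFormatForExecutor sets the class into the Hadoop Configuration, and each task instantiates its own converter.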