Repository: carbondata
Updated Branches:
  refs/heads/master 3740535d5 -> cfb8ed9f5


[CARBONDATA-2301][SDK] CarbonStore interface and two implementations (Spark and 
Local)

Users should be able to query CarbonData through the CarbonStore interface.

Get API: it can be used for filter queries. It accepts projection column names 
and a filter expression, and returns the matching rows.
SQL API: it accepts a SQL statement and returns the query result set.
A usage sketch of the new interface is shown below.
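
The following is a minimal, illustrative sketch of the API added by this commit,
not part of the patch. It uses the LocalCarbonStore implementation; both
LocalCarbonStore and SparkCarbonStore are marked @InterfaceAudience.Internal and
are package-private, so the sketch assumes it runs from the
org.apache.carbondata.store package. The table path and column names are
hypothetical placeholders.

  package org.apache.carbondata.store;

  import java.io.IOException;
  import java.util.Iterator;

  import org.apache.carbondata.core.datastore.row.CarbonRow;
  import org.apache.carbondata.core.metadata.datatype.DataTypes;
  import org.apache.carbondata.core.scan.expression.ColumnExpression;
  import org.apache.carbondata.core.scan.expression.LiteralExpression;
  import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;

  public class CarbonStoreUsageSketch {
    public static void main(String[] args) throws IOException {
      // Hypothetical table path; in practice this is the CarbonTable location on disk.
      String tablePath = "/tmp/carbon/store/default/t1";

      // CarbonStore extends Closeable, so try-with-resources closes the store.
      try (CarbonStore store = new LocalCarbonStore()) {
        // Projection-only scan (no filter) would be:
        //   store.scan(tablePath, new String[]{"empno", "empname"});

        // Scan with a filter expression: empno = 10.
        EqualToExpression filter = new EqualToExpression(
            new ColumnExpression("empno", DataTypes.INT),
            new LiteralExpression(10, DataTypes.INT));
        Iterator<CarbonRow> rows =
            store.scan(tablePath, new String[]{"empno", "empname"}, filter);

        while (rows.hasNext()) {
          CarbonRow row = rows.next();
          System.out.println(row.getData()[0] + ", " + row.getData()[1]);
        }

        // The SQL API, store.sql("select ..."), is implemented by SparkCarbonStore;
        // LocalCarbonStore does not support it in this change.
      }
    }
  }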

This closes #2127


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cfb8ed9f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cfb8ed9f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cfb8ed9f

Branch: refs/heads/master
Commit: cfb8ed9f5ec932911ddefe8fc2dcd07269411396
Parents: 3740535
Author: Jacky Li <jacky.li...@qq.com>
Authored: Sun Apr 1 16:30:22 2018 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Apr 12 07:20:40 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/util/DataTypeUtil.java      |   8 +-
 .../hadoop/api/CarbonInputFormat.java           |  23 ++--
 .../dataload/TestGlobalSortDataLoad.scala       |   5 +-
 .../StandardPartitionTableLoadingTestCase.scala |   6 +-
 .../StandardPartitionTableQueryTestCase.scala   |   5 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  25 ++--
 .../apache/spark/sql/test/util/QueryTest.scala  |   1 +
 integration/spark2/pom.xml                      |   5 +
 .../carbondata/store/SparkCarbonStore.scala     |  98 ++++++++++++++
 .../sql/CarbonDatasourceHadoopRelation.scala    |   2 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  39 +-----
 .../org/apache/spark/sql/CarbonSession.scala    |   2 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |   4 +-
 .../apache/spark/sql/hive/CarbonRelation.scala  |  69 ++--------
 .../apache/spark/util/SparkTypeConverter.scala  | 135 +++++++++++++++++++
 .../carbondata/store/SparkCarbonStoreTest.scala |  86 ++++++++++++
 store/sdk/pom.xml                               |   5 +
 .../carbondata/store/CarbonRowReadSupport.java  |  53 ++++++++
 .../apache/carbondata/store/CarbonStore.java    |  68 ++++++++++
 .../carbondata/store/LocalCarbonStore.java      | 116 ++++++++++++++++
 .../carbondata/store/MetaCachedCarbonStore.java |  54 ++++++++
 .../apache/carbondata/sdk/file/TestUtil.java    |   4 +-
 .../carbondata/store/LocalCarbonStoreTest.java  |  61 +++++++++
 23 files changed, 743 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 25f0099..6967102 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -801,9 +801,11 @@ public final class DataTypeUtil {
    * @param converterLocal
    */
   public static void setDataTypeConverter(DataTypeConverter converterLocal) {
-    converter = converterLocal;
-    timeStampformatter.remove();
-    dateformatter.remove();
+    if (converterLocal != null) {
+      converter = converterLocal;
+      timeStampformatter.remove();
+      dateformatter.remove();
+    }
   }
 
   public static DataTypeConverter getDataTypeConverter() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index be97d05..b1d7603 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -513,24 +513,27 @@ public abstract class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
    * It is optional, if user does not set then it reads from store
    *
    * @param configuration
-   * @param converter is the Data type converter for different computing engine
-   * @throws IOException
+   * @param converterClass is the Data type converter for different computing 
engine
    */
-  public static void setDataTypeConverter(Configuration configuration, 
DataTypeConverter converter)
-      throws IOException {
-    if (null != converter) {
-      configuration.set(CARBON_CONVERTER,
-          ObjectSerializationUtil.convertObjectToString(converter));
+  public static void setDataTypeConverter(
+      Configuration configuration, Class<? extends DataTypeConverter> 
converterClass) {
+    if (null != converterClass) {
+      configuration.set(CARBON_CONVERTER, converterClass.getCanonicalName());
     }
   }
 
   public static DataTypeConverter getDataTypeConverter(Configuration 
configuration)
       throws IOException {
-    String converter = configuration.get(CARBON_CONVERTER);
-    if (converter == null) {
+    String converterClass = configuration.get(CARBON_CONVERTER);
+    if (converterClass == null) {
       return new DataTypeConverterImpl();
     }
-    return (DataTypeConverter) 
ObjectSerializationUtil.convertStringToObject(converter);
+
+    try {
+      return (DataTypeConverter) Class.forName(converterClass).newInstance();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
   }
 
   public static void setDatabaseName(Configuration configuration, String 
databaseName) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 3babf4f..bba75ad 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.BatchedDataSourceScanExec
 import org.apache.spark.sql.test.TestQueryExecutor.projectPath
 import org.apache.spark.sql.test.util.QueryTest
@@ -285,8 +286,8 @@ class TestGlobalSortDataLoad extends QueryTest with 
BeforeAndAfterEach with Befo
       }
       val df = sql("select * from carbon_globalsort")
       val scanRdd = df.queryExecution.sparkPlan.collect {
-        case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] 
=>
-          b.rdd.asInstanceOf[CarbonScanRDD]
+        case b: BatchedDataSourceScanExec if 
b.rdd.isInstanceOf[CarbonScanRDD[InternalRow]] =>
+          b.rdd.asInstanceOf[CarbonScanRDD[InternalRow]]
       }.head
       assertResult(defaultParallelism)(scanRdd.getPartitions.length)
       assertResult(10)(df.count)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 8342c69..b929364 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -22,7 +22,7 @@ import java.util
 import java.util.concurrent.{Callable, ExecutorService, Executors}
 
 import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.BatchedDataSourceScanExec
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.test.util.QueryTest
@@ -483,8 +483,8 @@ class StandardPartitionTableLoadingTestCase extends 
QueryTest with BeforeAndAfte
       FileUtils.deleteDirectory(folder)
       val dataFrame = sql("select * from smallpartitionfilesread")
       val scanRdd = dataFrame.queryExecution.sparkPlan.collect {
-        case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] 
=> b.rdd
-          .asInstanceOf[CarbonScanRDD]
+        case b: BatchedDataSourceScanExec if 
b.rdd.isInstanceOf[CarbonScanRDD[InternalRow]] => b.rdd
+          .asInstanceOf[CarbonScanRDD[InternalRow]]
       }.head
       assert(scanRdd.getPartitions.length < 10)
       assertResult(100)(dataFrame.count)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 4cce7d5..a6e7c32 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.spark.testsuite.standardpartition
 
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.BatchedDataSourceScanExec
 import org.apache.spark.sql.test.Spark2TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
@@ -426,8 +427,8 @@ test("Creation of partition table should fail if the 
colname in table schema and
   private def verifyPartitionInfo(frame: DataFrame, partitionNames: 
Seq[String]) = {
     val plan = frame.queryExecution.sparkPlan
     val scanRDD = plan collect {
-      case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] 
=> b.rdd
-        .asInstanceOf[CarbonScanRDD]
+      case b: BatchedDataSourceScanExec if 
b.rdd.isInstanceOf[CarbonScanRDD[InternalRow]] => b.rdd
+        .asInstanceOf[CarbonScanRDD[InternalRow]]
     }
     assert(scanRDD.nonEmpty)
     assert(!partitionNames.map(f => 
scanRDD.head.partitionNames.exists(_.getPartitions.contains(f))).exists(!_))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index df953da..e3a62b6 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -23,6 +23,7 @@ import java.util.{ArrayList, Date, List}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
 import scala.util.Random
 import scala.util.control.Breaks.{break, breakable}
 
@@ -32,7 +33,6 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.SQLExecution
@@ -45,7 +45,7 @@ import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.{TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.filter.FilterUtil
 import org.apache.carbondata.core.scan.model.QueryModel
@@ -55,6 +55,7 @@ import org.apache.carbondata.core.util._
 import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, 
CarbonInputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
@@ -65,7 +66,7 @@ import 
org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRec
  * CarbonData file, this RDD will leverage CarbonData's index information to 
do CarbonData file
  * level filtering in driver side.
  */
-class CarbonScanRDD(
+class CarbonScanRDD[T: ClassTag](
     @transient spark: SparkSession,
     val columnProjection: CarbonProjection,
     var filterExpression: Expression,
@@ -73,8 +74,10 @@ class CarbonScanRDD(
     @transient serializedTableInfo: Array[Byte],
     @transient tableInfo: TableInfo,
     inputMetricsStats: InitInputMetrics,
-    @transient val partitionNames: Seq[PartitionSpec])
-  extends CarbonRDDWithTableInfo[InternalRow](spark.sparkContext, Nil, 
serializedTableInfo) {
+    @transient val partitionNames: Seq[PartitionSpec],
+    val dataTypeConverterClz: Class[_ <: DataTypeConverter] = 
classOf[SparkDataTypeConverterImpl],
+    val readSupportClz: Class[_ <: CarbonReadSupport[_]] = 
SparkReadSupport.readSupportClass)
+  extends CarbonRDDWithTableInfo[T](spark.sparkContext, Nil, 
serializedTableInfo) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() 
+ "")
   private val jobTrackerId: String = {
@@ -83,8 +86,6 @@ class CarbonScanRDD(
   }
   private var vectorReader = false
 
-  private val readSupport = SparkReadSupport.readSupportClass
-
   private val bucketedTable = tableInfo.getFactTable.getBucketingInfo
 
   @transient val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getName)
@@ -391,7 +392,7 @@ class CarbonScanRDD(
     new CarbonSparkPartition(id, partitionId, multiBlockSplit)
   }
 
-  override def internalCompute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+  override def internalCompute(split: Partition, context: TaskContext): 
Iterator[T] = {
     val queryStartTime = System.currentTimeMillis
     val carbonPropertiesFilePath = 
System.getProperty("carbon.properties.filepath", null)
     if (null == carbonPropertiesFilePath) {
@@ -415,7 +416,7 @@ class CarbonScanRDD(
       val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
         case FileFormat.ROW_V1 =>
           // create record reader for row format
-          DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+          DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
           val inputFormat = new CarbonStreamInputFormat
           val streamReader = inputFormat.createRecordReader(inputSplit, 
attemptContext)
             .asInstanceOf[CarbonStreamRecordReader]
@@ -489,7 +490,7 @@ class CarbonScanRDD(
     }
 
 
-    iterator.asInstanceOf[Iterator[InternalRow]]
+    iterator.asInstanceOf[Iterator[T]]
   }
 
   private def close() {
@@ -520,12 +521,12 @@ class CarbonScanRDD(
   }
 
   private def prepareInputFormatForExecutor(conf: Configuration): 
CarbonInputFormat[Object] = {
-    CarbonInputFormat.setCarbonReadSupport(conf, readSupport)
+    CarbonInputFormat.setCarbonReadSupport(conf, readSupportClz)
     val tableInfo1 = getTableInfo
     CarbonInputFormat.setTableInfo(conf, tableInfo1)
     CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
     CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
-    CarbonInputFormat.setDataTypeConverter(conf, new 
SparkDataTypeConverterImpl)
+    CarbonInputFormat.setDataTypeConverter(conf, dataTypeConverterClz)
     createInputFormat(conf)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index 8f550fb..c2c4ab3 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -114,6 +114,7 @@ class QueryTest extends PlanTest {
 
   val sqlContext: SQLContext = TestQueryExecutor.INSTANCE.sqlContext
 
+  lazy val warehouse = TestQueryExecutor.warehouse
   lazy val storeLocation = CarbonProperties.getInstance().
     getProperty(CarbonCommonConstants.STORE_LOCATION)
   val resourcesPath = TestQueryExecutor.resourcesPath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index e4593be..46e1be0 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -40,6 +40,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-store-sdk</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
new file mode 100644
index 0000000..e29ee46
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{CarbonInputMetrics, SparkConf}
+import org.apache.spark.sql.CarbonSession._
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+/**
+ * A CarbonStore implementation that uses Spark as underlying compute engine
+ * with CarbonData query optimization capability
+ */
+@InterfaceAudience.Internal
+private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
+  private var session: SparkSession = _
+
+  /**
+   * Initialize SparkCarbonStore
+   * @param storeName store name
+   * @param storeLocation location to store data
+   */
+  def this(storeName: String, storeLocation: String) = {
+    this()
+    val sparkConf = new SparkConf(loadDefaults = true)
+    session = SparkSession.builder
+      .config(sparkConf)
+      .appName("SparkCarbonStore-" + storeName)
+      .config("spark.sql.warehouse.dir", storeLocation)
+      .getOrCreateCarbonSession()
+  }
+
+  @throws[IOException]
+  override def scan(
+      path: String,
+      projectColumns: Array[String]): java.util.Iterator[CarbonRow] = {
+    scan(path, projectColumns, null)
+  }
+
+  @throws[IOException]
+  override def scan(
+      path: String,
+      projectColumns: Array[String],
+      filter: Expression): java.util.Iterator[CarbonRow] = {
+    require(path != null)
+    require(projectColumns != null)
+    val table = getTable(path)
+    val rdd = new CarbonScanRDD[CarbonRow](
+      spark = session,
+      columnProjection = new CarbonProjection(projectColumns),
+      filterExpression = filter,
+      identifier = table.getAbsoluteTableIdentifier,
+      serializedTableInfo = table.getTableInfo.serialize,
+      tableInfo = table.getTableInfo,
+      inputMetricsStats = new CarbonInputMetrics,
+      partitionNames = null,
+      dataTypeConverterClz = null,
+      readSupportClz = classOf[CarbonRowReadSupport])
+    rdd.collect
+      .iterator
+      .asJava
+  }
+
+  @throws[IOException]
+  override def sql(sqlString: String): java.util.Iterator[CarbonRow] = {
+    val df = session.sql(sqlString)
+    df.rdd
+      .map(row => new CarbonRow(row.toSeq.toArray.asInstanceOf[Array[Object]]))
+      .collect()
+      .iterator
+      .asJava
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 46905b8..fc62ba0 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -36,7 +36,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.hadoop.CarbonProjection
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.rdd.{CarbonScanRDD, SparkReadSupport}
 
 case class CarbonDatasourceHadoopRelation(
     sparkSession: SparkSession,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index d045ab3..1f65fce 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -28,9 +28,9 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode, ExpressionCanonicalizer}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, 
UnaryExecNode}
-import org.apache.spark.sql.hive.{CarbonMetastoreTypes, CarbonRelation}
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.types._
+import org.apache.spark.util.SparkTypeConverter
 
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, 
DictionaryColumnUniqueIdentifier}
@@ -333,8 +333,8 @@ object CarbonDictionaryDecoder {
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
             !carbonDimension.isComplex()) {
           val newAttr = AttributeReference(a.name,
-            convertCarbonToSparkDataType(carbonDimension,
-              relation.get.carbonRelation.carbonRelation),
+            
SparkTypeConverter.convertCarbonToSparkDataType(carbonDimension.getColumnSchema,
+              relation.get.carbonRelation.carbonTable),
             a.nullable,
             a.metadata)(a.exprId).asInstanceOf[Attribute]
           newAttr
@@ -393,39 +393,6 @@ object CarbonDictionaryDecoder {
     }
   }
 
-  /**
-   * Converts from carbon datatype to corresponding spark datatype.
-   */
-  def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
-      relation: CarbonRelation): types.DataType = {
-    if (CarbonDataTypes.isDecimal(carbonDimension.getDataType)) {
-      val scale: Int = carbonDimension.getColumnSchema.getScale
-      val precision: Int = carbonDimension.getColumnSchema.getPrecision
-      if (scale == 0 && precision == 0) {
-        DecimalType(18, 2)
-      } else {
-        DecimalType(precision, scale)
-      }
-    } else if (CarbonDataTypes.isArrayType(carbonDimension.getDataType)) {
-      CarbonMetastoreTypes
-        .toDataType(s"array<${ 
relation.getArrayChildren(carbonDimension.getColName) }>")
-    } else if (CarbonDataTypes.isStructType(carbonDimension.getDataType)) {
-      CarbonMetastoreTypes
-        .toDataType(s"struct<${ 
relation.getStructChildren(carbonDimension.getColName) }>")
-    } else {
-      carbonDimension.getDataType match {
-        case CarbonDataTypes.STRING => StringType
-        case CarbonDataTypes.SHORT => ShortType
-        case CarbonDataTypes.INT => IntegerType
-        case CarbonDataTypes.LONG => LongType
-        case CarbonDataTypes.DOUBLE => DoubleType
-        case CarbonDataTypes.BOOLEAN => BooleanType
-        case CarbonDataTypes.TIMESTAMP => TimestampType
-        case CarbonDataTypes.DATE => DateType
-      }
-    }
-  }
-
   def getDictionaryColumnMapping(output: Seq[Attribute],
       relations: Seq[CarbonDecoderRelation],
       profile: CarbonProfile,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index a380308..7ee3038 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LocalRelation, 
Union}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, Union}
 import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState}
 import org.apache.spark.sql.profiler.{Profiler, SQLStart}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 7123b93..59dd2e9 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -192,10 +192,10 @@ private[sql] class CarbonLateDecodeStrategy extends 
SparkStrategy {
       needDecode: ArrayBuffer[AttributeReference]):
   RDD[InternalRow] = {
     if (needDecode.nonEmpty) {
-      rdd.asInstanceOf[CarbonScanRDD].setVectorReaderSupport(false)
+      
rdd.asInstanceOf[CarbonScanRDD[InternalRow]].setVectorReaderSupport(false)
       getDecoderRDD(relation, needDecode, rdd, output)
     } else {
-      rdd.asInstanceOf[CarbonScanRDD]
+      rdd.asInstanceOf[CarbonScanRDD[InternalRow]]
         
.setVectorReaderSupport(supportBatchedDataSource(relation.relation.sqlContext, 
output))
       rdd
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index e9e93d2..0c69b9d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -22,19 +22,18 @@ import scala.Array.canBuildFrom
 import scala.collection.JavaConverters._
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.util.SparkTypeConverter
 
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, 
CarbonDimension}
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -49,45 +48,6 @@ case class CarbonRelation(
     carbonTable: CarbonTable)
   extends LeafNode with MultiInstanceRelation {
 
-  def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
-    childDim.getDataType.getName.toLowerCase match {
-      case "array" => s"${
-        childDim.getColName.substring(dimName.length + 1)
-      }:array<${ getArrayChildren(childDim.getColName) }>"
-      case "struct" => s"${
-        childDim.getColName.substring(dimName.length + 1)
-      }:struct<${ getStructChildren(childDim.getColName) }>"
-      case dType => s"${ childDim.getColName.substring(dimName.length + 1) 
}:${ dType }"
-    }
-  }
-
-  def getArrayChildren(dimName: String): String = {
-    metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.getName.toLowerCase match {
-        case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
-        case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
-        case dType => addDecimalScaleAndPrecision(childDim, dType)
-      }
-    }).mkString(",")
-  }
-
-  def getStructChildren(dimName: String): String = {
-    metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.getName.toLowerCase match {
-        case "array" => s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:array<${ getArrayChildren(childDim.getColName) }>"
-        case "struct" => s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:struct<${ metaData.carbonTable.getChildren(childDim.getColName)
-          .asScala.map(f => s"${ recursiveMethod(childDim.getColName, f) 
}").mkString(",")
-        }>"
-        case dType => s"${ childDim.getColName
-          .substring(dimName.length() + 1) }:${ 
addDecimalScaleAndPrecision(childDim, dType) }"
-      }
-    }).mkString(",")
-  }
-
   override def newInstance(): LogicalPlan = {
     CarbonRelation(databaseName, tableName, metaData, carbonTable)
       .asInstanceOf[this.type]
@@ -102,9 +62,11 @@ case class CarbonRelation(
         .getDimensionByName(metaData.carbonTable.getTableName, dim.getColName)
       val output: DataType = dimval.getDataType.getName.toLowerCase match {
         case "array" =>
-          CarbonMetastoreTypes.toDataType(s"array<${ 
getArrayChildren(dim.getColName) }>")
+          CarbonMetastoreTypes.toDataType(
+            s"array<${SparkTypeConverter.getArrayChildren(carbonTable, 
dim.getColName)}>")
         case "struct" =>
-          CarbonMetastoreTypes.toDataType(s"struct<${ 
getStructChildren(dim.getColName) }>")
+          CarbonMetastoreTypes.toDataType(
+            s"struct<${SparkTypeConverter.getStructChildren(carbonTable, 
dim.getColName)}>")
         case dType =>
           val dataType = addDecimalScaleAndPrecision(dimval, dType)
           CarbonMetastoreTypes.toDataType(dataType)
@@ -142,11 +104,13 @@ case class CarbonRelation(
       if (column.isDimension()) {
         val output: DataType = column.getDataType.getName.toLowerCase match {
           case "array" =>
-            
CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
+            CarbonMetastoreTypes.toDataType(
+              s"array<${SparkTypeConverter.getArrayChildren(carbonTable, 
column.getColName)}>")
           case "struct" =>
-            
CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>")
+            CarbonMetastoreTypes.toDataType(
+              s"struct<${SparkTypeConverter.getStructChildren(carbonTable, 
column.getColName)}>")
           case dType =>
-            val dataType = addDecimalScaleAndPrecision(column, dType)
+            val dataType = 
SparkTypeConverter.addDecimalScaleAndPrecision(column, dType)
             CarbonMetastoreTypes.toDataType(dataType)
         }
         AttributeReference(column.getColName, output, nullable = true )(
@@ -165,15 +129,6 @@ case class CarbonRelation(
     }
   }
 
-  def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): 
String = {
-    var dType = dataType
-    if (DataTypes.isDecimal(dimval.getDataType)) {
-      dType +=
-      "(" + dimval.getColumnSchema.getPrecision + "," + 
dimval.getColumnSchema.getScale + ")"
-    }
-    dType
-  }
-
   // TODO: Use data from the footers.
   // TODO For 2.1
   //  override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
new file mode 100644
index 0000000..65210b8
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.Objects
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.hive.CarbonMetastoreTypes
+import org.apache.spark.sql.types
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.metadata.datatype.{DataTypes => 
CarbonDataTypes}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, 
CarbonDimension, ColumnSchema}
+
+private[spark] object SparkTypeConverter {
+
+  def createSparkSchema(table: CarbonTable, columns: Seq[String]): StructType 
= {
+    Objects.requireNonNull(table)
+    Objects.requireNonNull(columns)
+    if (columns.isEmpty) {
+      throw new IllegalArgumentException("column list is empty")
+    }
+    val fields = new java.util.ArrayList[StructField](columns.size)
+    val allColumns = table.getTableInfo.getFactTable.getListOfColumns.asScala
+
+    // find the column and add it to fields array
+    columns.foreach { column =>
+      val col = 
allColumns.find(_.getColumnName.equalsIgnoreCase(column)).getOrElse(
+        throw new IllegalArgumentException(column + " does not exist")
+      )
+      fields.add(StructField(col.getColumnName, 
convertCarbonToSparkDataType(col, table)))
+    }
+    StructType(fields)
+  }
+
+  /**
+   * Converts from carbon datatype to corresponding spark datatype.
+   */
+  def convertCarbonToSparkDataType(
+      columnSchema: ColumnSchema,
+      table: CarbonTable): types.DataType = {
+    if (CarbonDataTypes.isDecimal(columnSchema.getDataType)) {
+      val scale = columnSchema.getScale
+      val precision = columnSchema.getPrecision
+      if (scale == 0 && precision == 0) {
+        DecimalType(18, 2)
+      } else {
+        DecimalType(precision, scale)
+      }
+    } else if (CarbonDataTypes.isArrayType(columnSchema.getDataType)) {
+      CarbonMetastoreTypes
+        .toDataType(s"array<${ getArrayChildren(table, 
columnSchema.getColumnName) }>")
+    } else if (CarbonDataTypes.isStructType(columnSchema.getDataType)) {
+      CarbonMetastoreTypes
+        .toDataType(s"struct<${ getStructChildren(table, 
columnSchema.getColumnName) }>")
+    } else {
+      columnSchema.getDataType match {
+        case CarbonDataTypes.STRING => StringType
+        case CarbonDataTypes.SHORT => ShortType
+        case CarbonDataTypes.INT => IntegerType
+        case CarbonDataTypes.LONG => LongType
+        case CarbonDataTypes.DOUBLE => DoubleType
+        case CarbonDataTypes.BOOLEAN => BooleanType
+        case CarbonDataTypes.TIMESTAMP => TimestampType
+        case CarbonDataTypes.DATE => DateType
+      }
+    }
+  }
+
+  def getArrayChildren(table: CarbonTable, dimName: String): String = {
+    table.getChildren(dimName).asScala.map(childDim => {
+      childDim.getDataType.getName.toLowerCase match {
+        case "array" => s"array<${ getArrayChildren(table, 
childDim.getColName) }>"
+        case "struct" => s"struct<${ getStructChildren(table, 
childDim.getColName) }>"
+        case dType => addDecimalScaleAndPrecision(childDim, dType)
+      }
+    }).mkString(",")
+  }
+
+  def getStructChildren(table: CarbonTable, dimName: String): String = {
+    table.getChildren(dimName).asScala.map(childDim => {
+      childDim.getDataType.getName.toLowerCase match {
+        case "array" => s"${
+          childDim.getColName.substring(dimName.length + 1)
+        }:array<${ getArrayChildren(table, childDim.getColName) }>"
+        case "struct" => s"${
+          childDim.getColName.substring(dimName.length + 1)
+        }:struct<${ table.getChildren(childDim.getColName)
+          .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, 
f) }").mkString(",")
+        }>"
+        case dType => s"${ childDim.getColName
+          .substring(dimName.length() + 1) }:${ 
addDecimalScaleAndPrecision(childDim, dType) }"
+      }
+    }).mkString(",")
+  }
+
+  def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): 
String = {
+    var dType = dataType
+    if (CarbonDataTypes.isDecimal(dimval.getDataType)) {
+      dType +=
+      "(" + dimval.getColumnSchema.getPrecision + "," + 
dimval.getColumnSchema.getScale + ")"
+    }
+    dType
+  }
+
+  private def recursiveMethod(
+      table: CarbonTable, dimName: String, childDim: CarbonDimension) = {
+    childDim.getDataType.getName.toLowerCase match {
+      case "array" => s"${
+        childDim.getColName.substring(dimName.length + 1)
+      }:array<${ getArrayChildren(table, childDim.getColName) }>"
+      case "struct" => s"${
+        childDim.getColName.substring(dimName.length + 1)
+      }:struct<${ getStructChildren(table, childDim.getColName) }>"
+      case dType => s"${ childDim.getColName.substring(dimName.length + 1) 
}:${ dType }"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala
 
b/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala
new file mode 100644
index 0000000..d389670
--- /dev/null
+++ 
b/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.expression.{ColumnExpression, 
LiteralExpression}
+
+class SparkCarbonStoreTest extends QueryTest with BeforeAndAfterAll {
+
+  private var store: CarbonStore = _
+
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS t1")
+    sql("CREATE TABLE t1 (" +
+        "empno int, empname String, designation String, doj Timestamp, " +
+        "workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String," +
+        "projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp," +
+        "attendance int,utilization int,salary int)" +
+        "STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE t1 
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+
+    store = new SparkCarbonStore("test", storeLocation)
+  }
+
+  test("test CarbonStore.get, compare projection result") {
+    val tablePath = CarbonEnv.getCarbonTable(None, 
"t1")(sqlContext.sparkSession).getTablePath
+    val rows = store.scan(s"$tablePath", Seq("empno", "empname").toArray)
+    val sparkResult: Array[Row] = sql("select empno, empname from 
t1").collect()
+    sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) =>
+      val carbonRow = rows.next()
+      assertResult(r.get(0))(carbonRow.getData()(0))
+      assertResult(r.get(1))(carbonRow.getData()(1))
+    }
+    assert(!rows.hasNext)
+  }
+
+  test("test CarbonStore.get, compare projection and filter result") {
+    val tablePath = CarbonEnv.getCarbonTable(None, 
"t1")(sqlContext.sparkSession).getTablePath
+    val filter = new EqualToExpression(
+      new ColumnExpression("empno", DataTypes.INT),
+      new LiteralExpression(10, DataTypes.INT))
+    val rows = store.scan(s"$tablePath", Seq("empno", "empname").toArray, 
filter)
+    val sparkResult: Array[Row] = sql("select empno, empname from t1 where 
empno = 10").collect()
+    sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) =>
+      val carbonRow = rows.next()
+      assertResult(r.get(0))(carbonRow.getData()(0))
+      assertResult(r.get(1))(carbonRow.getData()(1))
+    }
+    assert(!rows.hasNext)
+  }
+
+  test("test CarbonStore.sql") {
+    val rows = store.sql("select empno, empname from t1 where empno = 10")
+    val sparkResult: Array[Row] = sql("select empno, empname from t1 where 
empno = 10").collect()
+    sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) =>
+      val carbonRow = rows.next()
+      assertResult(r.get(0))(carbonRow.getData()(0))
+      assertResult(r.get(1))(carbonRow.getData()(1))
+    }
+    assert(!rows.hasNext)
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS t1")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 1d1735e..d50c024 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -35,6 +35,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java 
b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
new file mode 100644
index 0000000..bafbb9f
--- /dev/null
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import 
org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+
+/**
+ * ReadSupport that convert row object to CarbonRow
+ */
+@InterfaceAudience.Internal
+public class CarbonRowReadSupport implements CarbonReadSupport<CarbonRow> {
+  private CarbonReadSupport<Object[]> delegate;
+
+  public CarbonRowReadSupport() {
+    this.delegate = new DictionaryDecodeReadSupport<>();
+  }
+
+  @Override public void initialize(CarbonColumn[] carbonColumns, CarbonTable 
carbonTable)
+      throws IOException {
+    delegate.initialize(carbonColumns, carbonTable);
+  }
+
+  @Override public CarbonRow readRow(Object[] data) {
+    Object[] converted = delegate.readRow(data);
+    return new CarbonRow(converted);
+  }
+
+  @Override public void close() {
+    delegate.close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java 
b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java
new file mode 100644
index 0000000..c6b2fb8
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.scan.expression.Expression;
+
+/**
+ * User can use {@link CarbonStore} to query data
+ */
+@InterfaceAudience.User
+@InterfaceStability.Unstable
+public interface CarbonStore extends Closeable {
+
+  /**
+   * Scan query on the data in the table path
+   * @param path table path
+   * @param projectColumns column names to read
+   * @return rows
+   * @throws IOException if unable to read files in table path
+   */
+  Iterator<CarbonRow> scan(
+      String path,
+      String[] projectColumns) throws IOException;
+
+  /**
+   * Scan query with filter, on the data in the table path
+   * @param path table path
+   * @param projectColumns column names to read
+   * @param filter filter condition, can be null
+   * @return rows that satisfy filter condition
+   * @throws IOException if unable to read files in table path
+   */
+  Iterator<CarbonRow> scan(
+      String path,
+      String[] projectColumns,
+      Expression filter) throws IOException;
+
+  /**
+   * SQL query, table should be created before calling this function
+   * @param sqlString SQL statement
+   * @return rows
+   * @throws IOException if unable to read files in table path
+   */
+  Iterator<CarbonRow> sql(String sqlString) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java 
b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
new file mode 100644
index 0000000..394ffea
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * A CarbonStore implementation that works locally, without other compute 
framework dependency.
+ * It can be used to read data in local disk.
+ *
+ * Note that this class is experimental, it is not intended to be used in 
production.
+ */
+@InterfaceAudience.Internal
+class LocalCarbonStore extends MetaCachedCarbonStore {
+
+  @Override
+  public Iterator<CarbonRow> scan(String path, String[] projectColumns) throws 
IOException {
+    return scan(path, projectColumns, null);
+  }
+
+  @Override
+  public Iterator<CarbonRow> scan(String path, String[] projectColumns, 
Expression filter)
+      throws IOException {
+    Objects.requireNonNull(path);
+    Objects.requireNonNull(projectColumns);
+
+    CarbonTable table = getTable(path);
+    if (table.isStreamingTable() || table.isHivePartitionTable()) {
+      throw new UnsupportedOperationException("streaming and partition tables are not supported");
+    }
+    // TODO: use InputFormat to prune data and read data
+
+    final CarbonTableInputFormat format = new CarbonTableInputFormat();
+    final Job job = new Job(new Configuration());
+    CarbonInputFormat.setTableInfo(job.getConfiguration(), table.getTableInfo());
+    CarbonInputFormat.setTablePath(job.getConfiguration(), table.getTablePath());
+    CarbonInputFormat.setTableName(job.getConfiguration(), table.getTableName());
+    CarbonInputFormat.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
+    CarbonInputFormat.setCarbonReadSupport(job.getConfiguration(), CarbonRowReadSupport.class);
+    CarbonInputFormat.setColumnProjection(
+        job.getConfiguration(), new CarbonProjection(projectColumns));
+    if (filter != null) {
+      CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
+    }
+
+    final List<InputSplit> splits =
+        format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
+
+    List<RecordReader<Void, Object>> readers = new ArrayList<>(splits.size());
+
+    try {
+      for (InputSplit split : splits) {
+        TaskAttemptContextImpl attempt =
+            new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+        RecordReader reader = format.createRecordReader(split, attempt);
+        reader.initialize(split, attempt);
+        readers.add(reader);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+
+    List<CarbonRow> rows = new ArrayList<>();
+    try {
+      for (RecordReader<Void, Object> reader : readers) {
+        while (reader.nextKeyValue()) {
+          rows.add((CarbonRow) reader.getCurrentValue());
+        }
+        // close each reader once its split is fully consumed
+        reader.close();
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+    return rows.iterator();
+  }
+
+  @Override
+  public Iterator<CarbonRow> sql(String sqlString) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+}
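
The scan() above materializes every row from every split into an in-memory list before returning an iterator, which keeps the code simple but can be memory-heavy for large tables. A possible follow-up, sketched below purely for illustration (not part of this commit), is a lazy iterator that pulls rows from one record reader at a time; the LazyRowIterator class is hypothetical and assumes the same already-initialized Hadoop RecordReader<Void, Object> instances that scan() builds.

// Hypothetical helper, not part of this commit: iterates splits lazily
// instead of collecting all rows up front.
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import org.apache.carbondata.core.datastore.row.CarbonRow;

import org.apache.hadoop.mapreduce.RecordReader;

class LazyRowIterator implements Iterator<CarbonRow> {
  private final Iterator<RecordReader<Void, Object>> readers;
  private RecordReader<Void, Object> current;
  private CarbonRow nextRow;

  LazyRowIterator(List<RecordReader<Void, Object>> readerList) {
    this.readers = readerList.iterator();
    advance();
  }

  // Moves to the next available row, switching readers when one is exhausted.
  private void advance() {
    try {
      while (true) {
        if (current == null) {
          if (!readers.hasNext()) {
            nextRow = null;
            return;
          }
          current = readers.next();
        }
        if (current.nextKeyValue()) {
          nextRow = (CarbonRow) current.getCurrentValue();
          return;
        }
        current.close();
        current = null;
      }
    } catch (IOException | InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  @Override public boolean hasNext() {
    return nextRow != null;
  }

  @Override public CarbonRow next() {
    if (nextRow == null) {
      throw new NoSuchElementException();
    }
    CarbonRow row = nextRow;
    advance();
    return row;
  }
}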

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java 
b/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
new file mode 100644
index 0000000..d847e67
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.sdk.file.CarbonReader;
+
+/**
+ * A CarbonStore base class that caches CarbonTable objects.
+ */
+@InterfaceAudience.Internal
+abstract class MetaCachedCarbonStore implements CarbonStore {
+
+  // mapping of table path to CarbonTable object
+  private Map<String, CarbonTable> cache = new HashMap<>();
+
+  CarbonTable getTable(String path) throws IOException {
+    if (cache.containsKey(path)) {
+      return cache.get(path);
+    }
+    TableInfo schema = CarbonReader.readSchemaFile(CarbonTablePath.getSchemaFilePath(path));
+    schema.setTablePath(path);
+    CarbonTable table = CarbonTable.buildFromTableInfo(schema);
+    cache.put(path, table);
+    return table;
+  }
+
+  @Override
+  public void close() throws IOException {
+    cache.clear();
+  }
+}
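
For illustration only (not part of this commit): the caching contract above means the schema file is parsed only on the first getTable() call per path, and later calls return the same cached CarbonTable instance until close() clears the cache. The example class and table path below are hypothetical, and live in org.apache.carbondata.store because these classes are package-private.

// Illustrative sketch only; assumes a table already exists at the hypothetical path.
package org.apache.carbondata.store;

import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

public class MetaCacheExample {
  public static void main(String[] args) throws Exception {
    MetaCachedCarbonStore store = new LocalCarbonStore();
    CarbonTable first = store.getTable("/tmp/carbon_table");   // parses the schema file
    CarbonTable second = store.getTable("/tmp/carbon_table");  // returned from the cache
    System.out.println(first == second);                       // true: same cached instance
    store.close();                                             // clears the cache
  }
}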

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
index dcedf10..068164d 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.junit.Assert;
 
-class TestUtil {
+public class TestUtil {
 
   static void writeFilesAndVerify(Schema schema, String path) {
     writeFilesAndVerify(schema, path, null);
@@ -37,7 +37,7 @@ class TestUtil {
     writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
   }
 
-  static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
+  public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
     writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb8ed9f/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java 
b/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
new file mode 100644
index 0000000..a5b5edc
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.file.Schema;
+import org.apache.carbondata.sdk.file.TestUtil;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+
+public class LocalCarbonStoreTest {
+
+  // TODO: complete this testcase
+  // Currently result rows are empty, because the SDK does not write the table status file,
+  // so the reader does not find any segment.
+  // Complete this testcase after the flat folder reader is done.
+  @Test
+  public void testWriteAndReadFiles() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+
+    CarbonStore store = new LocalCarbonStore();
+    Iterator<CarbonRow> rows = store.scan(path, new String[]{"name", "age"}, null);
+
+    while (rows.hasNext()) {
+      CarbonRow row = rows.next();
+      System.out.println(row.toString());
+    }
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+}
