Repository: incubator-carbondata
Updated Branches:
  refs/heads/master db2c5f917 -> 68a16d2c9


Fixed insert-with-select queries when functions are used in the select list

Reverted example

Fixed test case
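
For context, a minimal sketch of the statement shape this commit fixes: an INSERT ... SELECT whose select list applies a function to a column. Table and column names follow the test case added below; a CarbonSession-backed SparkSession named spark is assumed.

    // Before this fix, planning such a query failed because the column
    // rewrite in CarbonLateDecodeStrategy only handled plain attributes.
    spark.sql(
      """insert into table carbon_table1
        | select shortField, intField, bigintField, doubleField, ASCII(stringField),
        |        timestampField, decimalField, dateField, charField, floatField
        | from carbon_table""".stripMargin)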


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/529c06d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/529c06d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/529c06d2

Branch: refs/heads/master
Commit: 529c06d22d715249cf2e42db3420af698f0f917a
Parents: db2c5f9
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Thu Jan 19 06:30:24 2017 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Thu Jan 19 23:19:26 2017 +0800

----------------------------------------------------------------------
 .../examples/CarbonSessionExample.scala         | 19 +++++-
 .../src/test/resources/data_with_all_types.csv  | 10 ++++
 .../InsertIntoCarbonTableTestCase.scala         | 62 ++++++++++++++++----
 .../execution/CarbonLateDecodeStrategy.scala    | 45 +++++++-------
 4 files changed, 102 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/529c06d2/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 1d485cd..0d9c43f 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.examples
 
 import java.io.File
 
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -31,10 +32,19 @@ object CarbonSessionExample {
                             + "../../../..").getCanonicalPath
     val storeLocation = s"$rootPath/examples/spark2/target/store"
     val warehouse = s"$rootPath/examples/spark2/target/warehouse"
-    val metastoredb = s"$rootPath/examples/spark2/target"
+    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+    // clean data folder
+    if (true) {
+      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+      clean(storeLocation)
+      clean(warehouse)
+      clean(metastoredb)
+    }
 
     CarbonProperties.getInstance()
       .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+      .addProperty("carbon.storelocation", storeLocation)
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
 
     import org.apache.spark.sql.CarbonSession._
@@ -42,9 +52,12 @@ object CarbonSessionExample {
     val spark = SparkSession
       .builder()
       .master("local")
-      .appName("CarbonSessionExample")
+      .appName("CarbonExample")
+      .enableHiveSupport()
       .config("spark.sql.warehouse.dir", warehouse)
-      .getOrCreateCarbonSession(storeLocation, metastoredb)
+      .config("javax.jdo.option.ConnectionURL",
+    s"jdbc:derby:;databaseName=$metastoredb;create=true")
+      .getOrCreateCarbonSession()
 
     spark.sparkContext.setLogLevel("WARN")
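
A note on the bootstrap change above: the store location now reaches the session through the "carbon.storelocation" carbon property instead of a getOrCreateCarbonSession argument, and Hive support runs against an embedded Derby metastore under target/metastore_db. A quick way to confirm the property took effect, runnable after the setup above (a sketch, not part of the commit):

    // Read back the store location registered with CarbonProperties.
    import org.apache.carbondata.core.util.CarbonProperties
    val configuredStore = CarbonProperties.getInstance().getProperty("carbon.storelocation")
    println(s"carbon store location: $configuredStore")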
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/529c06d2/integration/spark-common-test/src/test/resources/data_with_all_types.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/data_with_all_types.csv b/integration/spark-common-test/src/test/resources/data_with_all_types.csv
new file mode 100644
index 0000000..5efff00
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/data_with_all_types.csv
@@ -0,0 +1,10 @@
+1,10,100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23 11:01:01,aaa,2.5
+5,17,140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27 11:01:02,bbb,2.5
+1,11,100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23 11:01:03,ccc,2.5
+1,10,150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24 11:01:04,ddd,2.5
+1,10,100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23 11:01:05,eeee,3.5
+3,14,160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26 11:01:06,ff,2.5
+2,10,100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23 11:01:07,ggg,2.5
+1,10,100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23 11:01:08,hhh,2.5
+4,16,130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23 11:01:09,iii,2.5
+1,10,100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23 11:01:10,jjj,2.5

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/529c06d2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 3cb3520..db85393 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -113,16 +113,58 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
          sql("select imei,deviceInformationId,MAC from TCarbonLocal")
      )
   }
-//  test("insert->insert empty data -pass") {
-//     sql("drop table if exists TCarbon")
-//     sql("create table TCarbon (imei string,deviceInformationId int,MAC string) STORED BY 'org.apache.carbondata.format'")
-//     sql("insert into TCarbon select imei,deviceInformationId,MAC from THive where MAC='wrongdata'")
-//     val result = sql("select imei,deviceInformationId,MAC from TCarbon where MAC='wrongdata'").collect()
-//     checkAnswer(
-//         sql("select imei,deviceInformationId,MAC from THive where MAC='wrongdata'"),
-//         sql("select imei,deviceInformationId,MAC from TCarbon where MAC='wrongdata'")
-//     )
-//  }
+
+  test("insert->insert with functions") {
+    sql("DROP TABLE IF EXISTS carbon_table")
+    sql("DROP TABLE IF EXISTS carbon_table1")
+    // Create table
+    sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         |    shortField smallint,
+         |    intField int,
+         |    bigintField bigint,
+         |    doubleField double,
+         |    stringField string,
+         |    timestampField timestamp,
+         |    decimalField decimal(18,2),
+         |    dateField date,
+         |    charField string,
+         |    floatField float
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE carbon_table1(
+         |    shortField smallint,
+         |    intField int,
+         |    bigintField bigint,
+         |    doubleField double,
+         |    stringField string,
+         |    timestampField timestamp,
+         |    decimalField decimal(18,2),
+         |    dateField date,
+         |    charField string,
+         |    floatField float
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${resourcesPath + "/data_with_all_types.csv"}'
+         | INTO TABLE carbon_table
+         | options('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField')
+       """.stripMargin)
+
+    sql("""insert into table carbon_table1 select shortField,intField,bigintField,doubleField,ASCII(stringField),
+                timestampField,decimalField,dateField,charField,floatField from carbon_table
+              """).show
+  }
+
   test("insert into existing load-pass") {
    val timeStampPropOrig = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
      CarbonProperties.getInstance()
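
The new test ends with .show, which only prints the inserted rows. A stricter check would assert on the data; one possible sketch (not part of this commit, reusing the suite's checkAnswer helper):

    // ASCII('spark') is 115, ASCII('flink') is 102, and so on; the inserted
    // stringField column should hold those codes rendered as strings.
    checkAnswer(
      sql("select stringField from carbon_table1"),
      sql("select cast(ASCII(stringField) as string) from carbon_table"))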

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/529c06d2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 6f913a8..11166bf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -223,7 +223,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         pushedFilters,
         metadata,
         needDecoder,
-        updateRequestedColumns)
+        updateRequestedColumns.asInstanceOf[Seq[Attribute]])
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -231,15 +231,16 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
       val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
       val scan = getDataSourceScan(relation,
-        updateRequestedColumns,
+        updateRequestedColumns.asInstanceOf[Seq[Attribute]],
         scanBuilder,
         candidatePredicates,
         pushedFilters,
         metadata,
         needDecoder,
-        updateRequestedColumns)
+        updateRequestedColumns.asInstanceOf[Seq[Attribute]])
       execution.ProjectExec(
-        projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+        updateRequestedColumnsFunc(projects, table, needDecoder).asInstanceOf[Seq[NamedExpression]],
+        filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }
   }
 
@@ -251,7 +252,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       pushedFilters: Seq[Filter],
       metadata: Map[String, String],
       needDecoder: ArrayBuffer[AttributeReference],
-      updateRequestedColumns: Seq[AttributeReference]): DataSourceScanExec = {
+      updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
     val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
         needDecoder.isEmpty) {
@@ -272,29 +273,31 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     }
   }
 
-  def updateRequestedColumnsFunc(requestedColumns: Seq[AttributeReference],
+  def updateRequestedColumnsFunc(requestedColumns: Seq[Expression],
       relation: CarbonDatasourceHadoopRelation,
-      needDecoder: ArrayBuffer[AttributeReference]): Seq[AttributeReference] = {
+      needDecoder: ArrayBuffer[AttributeReference]): Seq[Expression] = {
     val map = relation.carbonRelation.metaData.dictionaryMap
-    requestedColumns.map { attr =>
-      if (needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
-        attr
-      } else {
-        val dict = map.get(attr.name)
-        if (dict.isDefined && dict.get) {
-          AttributeReference(attr.name,
-            IntegerType,
-            attr.nullable,
-            attr.metadata)(attr.exprId, attr.qualifier)
-        } else {
+    requestedColumns.map {
+      case attr: AttributeReference =>
+        if (needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
           attr
+        } else {
+          val dict = map.get(attr.name)
+          if (dict.isDefined && dict.get) {
+            AttributeReference(attr.name,
+              IntegerType,
+              attr.nullable,
+              attr.metadata)(attr.exprId, attr.qualifier)
+          } else {
+            attr
+          }
         }
-      }
+      case others => others
     }
   }
 
   private def getPartitioning(carbonTable: CarbonTable,
-      output: Seq[AttributeReference]): Partitioning = {
+      output: Seq[Attribute]): Partitioning = {
     val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
     if (info != null) {
       val cols = info.getListOfColumns.asScala
@@ -304,7 +307,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       val bucketColumns = cols.flatMap { n =>
         val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
         attrRef match {
-          case Some(attr) =>
+          case Some(attr: AttributeReference) =>
             Some(AttributeReference(attr.name,
               CarbonScalaUtil.convertCarbonToSparkDataType(n.getDataType),
               attr.nullable,
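
The substantive fix is in updateRequestedColumnsFunc: it previously took and returned Seq[AttributeReference], so a projection containing a function call such as ASCII(stringField) could not pass through the dictionary-decode rewrite, and insert-with-select queries using functions failed to plan. Widening the signature to Seq[Expression] and pattern-matching on AttributeReference forwards every non-attribute expression unchanged, while dictionary-encoded attributes are still rewritten to their IntegerType surrogate-key form. A reduced sketch of that pattern (isDictionary stands in for the dictionaryMap/needDecoder checks in the real code):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
    import org.apache.spark.sql.types.IntegerType

    // Rewrite only the attributes we know how to handle; forward the rest.
    def rewriteColumns(columns: Seq[Expression],
        isDictionary: String => Boolean): Seq[Expression] = {
      columns.map {
        case attr: AttributeReference if isDictionary(attr.name) =>
          // Dictionary columns are scanned as integer surrogate keys.
          AttributeReference(attr.name, IntegerType, attr.nullable,
            attr.metadata)(attr.exprId, attr.qualifier)
        case other => other // e.g. ASCII(stringField) flows through unchanged
      }
    }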
