Repository: incubator-carbondata
Updated Branches:
  refs/heads/master a011aafb0 -> 7788f468c


inserInto without kettle for spark2

fix comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/498cf982
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/498cf982
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/498cf982

Branch: refs/heads/master
Commit: 498cf982995feba7012ce3993b3fd5172d2e5a15
Parents: a011aaf
Author: QiangCai <qiang...@qq.com>
Authored: Mon Dec 19 11:01:40 2016 +0800
Committer: jackylk <jacky.li...@huawei.com>
Committed: Wed Dec 28 14:45:18 2016 +0800

----------------------------------------------------------------------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 132 +++++++--------
 .../readsupport/SparkRowReadSupportImpl.java    |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  22 +--
 .../emptyrow/TestCSVHavingOnlySpaceChar.scala   |   1 -
 .../testsuite/emptyrow/TestEmptyRows.scala      |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  36 +++--
 .../apache/spark/sql/hive/CarbonMetastore.scala |   1 -
 .../InsertIntoCarbonTableTestCase.scala         | 162 +++++++++++++++++++
 .../carbondata/CarbonDataSourceSuite.scala      |   1 +
 .../sql/common/util/CarbonSessionTest.scala     |   0
 .../store/CarbonFactDataHandlerColumnar.java    |  29 +++-
 .../store/SingleThreadFinalSortFilesMerger.java |   2 +-
 .../store/writer/AbstractFactDataWriter.java    |  19 ++-
 13 files changed, 287 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 96bb5ed..64b8b61 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -18,20 +18,23 @@
 package org.apache.carbondata.spark.rdd
 
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
 import java.util
 import java.util.{Date, UUID}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -41,12 +44,13 @@ import org.apache.carbondata.core.load.{BlockDetails, 
LoadMetadataDetails}
 import org.apache.carbondata.core.util.{CarbonProperties, 
CarbonTimeStatisticsFactory}
 import org.apache.carbondata.hadoop.csv.CSVInputFormat
 import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
+import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.DataLoadExecutor
 import 
org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
 import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.CarbonQueryUtil
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil}
 
 class SerializableConfiguration(@transient var value: Configuration) extends 
Serializable {
 
@@ -323,7 +327,7 @@ class NewDataFrameLoaderRDD[K, V](
                                    loadCount: Integer,
                                    tableCreationTime: Long,
                                    schemaLastUpdatedTime: Long,
-                                   prev: RDD[Row]) extends RDD[(K, V)](prev) {
+                                   prev: DataLoadCoalescedRDD[Row]) extends 
RDD[(K, V)](prev) {
 
 
   override def compute(theSplit: Partition, context: TaskContext): 
Iterator[(K, V)] = {
@@ -342,29 +346,25 @@ class NewDataFrameLoaderRDD[K, V](
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
 
-        val iterator = new NewRddIterator(
-          firstParent[Row].iterator(theSplit, context),
-          carbonLoadModel)
-
-        class CarbonIteratorImpl(iterator: util.Iterator[Array[AnyRef]])
-          extends CarbonIterator[Array[AnyRef]] {
-          override def initialize(): Unit = {}
-
-          override def close(): Unit = {}
-
-          override def next(): Array[AnyRef] = {
-            iterator.next
-          }
-
-          override def hasNext: Boolean = {
-            iterator.hasNext
+        val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
+        val partitionIterator = 
firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context)
+        val serializer = SparkEnv.get.closureSerializer.newInstance()
+        var serializeBuffer: ByteBuffer = null
+        while(partitionIterator.hasNext) {
+          val value = partitionIterator.next()
+          val newInstance = {
+            if (serializeBuffer == null) {
+              serializeBuffer = serializer.serialize[RDD[Row]](value.rdd)
+            }
+            serializeBuffer.rewind()
+            serializer.deserialize[RDD[Row]](serializeBuffer)
           }
+          recordReaders += new CarbonIteratorImpl(
+            new NewRddIterator(newInstance.iterator(value.partition, context),
+              carbonLoadModel,
+              context))
         }
 
-
-        val recordReaders: Array[CarbonIterator[Array[AnyRef]]] =
-          Array(new CarbonIteratorImpl(iterator))
-
         val loader = new SparkPartitionLoader(model,
           theSplit.index,
           null,
@@ -375,7 +375,7 @@ class NewDataFrameLoaderRDD[K, V](
         loader.initialize()
 
         
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-        new DataLoadExecutor().execute(model, loader.storeLocation, 
recordReaders)
+        new DataLoadExecutor().execute(model, loader.storeLocation, 
recordReaders.toArray)
 
       } catch {
         case e: BadRecordFoundException =>
@@ -402,76 +402,52 @@ class NewDataFrameLoaderRDD[K, V](
 
 /**
  * This class wrap Scala's Iterator to Java's Iterator.
- * It also convert all columns to string data since carbondata will recognize 
the right type
- * according to schema from spark DataFrame.
- * @see org.apache.carbondata.spark.rdd.RddIterator
+ * It also convert all columns to string data to use csv data loading flow.
+ *
  * @param rddIter
  * @param carbonLoadModel
+ * @param context
  */
 class NewRddIterator(rddIter: Iterator[Row],
-                     carbonLoadModel: CarbonLoadModel) extends 
java.util.Iterator[Array[AnyRef]] {
+    carbonLoadModel: CarbonLoadModel,
+    context: TaskContext) extends JavaRddIterator[Array[AnyRef]] {
+
   val formatString = 
CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_TIMESTAMP_FORMAT, 
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
   val format = new SimpleDateFormat(formatString)
   val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
   val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-
+  val serializationNullFormat =
+    
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
   def hasNext: Boolean = rddIter.hasNext
 
-  private def getString(value: Any, level: Int = 1): String = {
-    if (value == null) {
-      ""
-    } else {
-      value match {
-        case s: String => s
-        case i: java.lang.Integer => i.toString
-        case d: java.lang.Double => d.toString
-        case t: java.sql.Timestamp => format format t
-        case d: java.sql.Date => format format d
-        case d: java.math.BigDecimal => d.toPlainString
-        case b: java.lang.Boolean => b.toString
-        case s: java.lang.Short => s.toString
-        case f: java.lang.Float => f.toString
-        case bs: Array[Byte] => new String(bs)
-        case s: scala.collection.Seq[Any] =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
-          }
-          val builder = new StringBuilder()
-          s.foreach { x =>
-            builder.append(getString(x, level + 1)).append(delimiter)
-          }
-          builder.substring(0, builder.length - 1)
-        case m: scala.collection.Map[Any, Any] =>
-          throw new Exception("Unsupported data type: Map")
-        case r: org.apache.spark.sql.Row =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
-          }
-          val builder = new StringBuilder()
-          for (i <- 0 until r.length) {
-            builder.append(getString(r(i), level + 1)).append(delimiter)
-          }
-          builder.substring(0, builder.length - 1)
-        case other => other.toString
-      }
-    }
-  }
-
   def next: Array[AnyRef] = {
     val row = rddIter.next()
-    val columns = new Array[Object](row.length)
-    for (i <- 0 until row.length) {
-      columns(i) = getString(row(i))
+    val columns = new Array[AnyRef](row.length)
+    for (i <- 0 until columns.length) {
+      columns(i) = CarbonScalaUtil.getString(row.get(i), 
serializationNullFormat,
+        delimiterLevel1, delimiterLevel2, format)
     }
     columns
   }
 
-  def remove(): Unit = {
+  def initialize: Unit = {
+    SparkUtil.setTaskContext(context)
   }
 
 }
+
+class CarbonIteratorImpl(iterator: NewRddIterator)
+  extends CarbonIterator[Array[AnyRef]] {
+  override def initialize(): Unit = iterator.initialize
+
+  override def close(): Unit = {}
+
+  override def next(): Array[AnyRef] = {
+    iterator.next
+  }
+
+  override def hasNext: Boolean = {
+    iterator.hasNext
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
 
b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 68f923d..46e5244 100644
--- 
a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ 
b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -71,7 +71,7 @@ public class SparkRowReadSupportImpl extends 
AbstractDictionaryDecodedReadSuppor
       else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         //convert the long to timestamp in case of direct dictionary column
         if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
-          data[i] = new Timestamp((long) data[i]);
+          data[i] = new Timestamp((long) data[i] / 1000L);
         } else if (DataType.DATE == carbonColumns[i].getDataType()) {
           data[i] = new Date((long) data[i]);
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index ff7bf23..d975502 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -635,15 +635,14 @@ object CarbonDataRDDFactory {
         try {
           val rdd = dataFrame.get.rdd
 
-          if (useKettle) {
-
-            val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ 
p =>
-              DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
-            }.distinct.size
-            val nodes = 
DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
-              sqlContext.sparkContext)
-            val newRdd = new DataLoadCoalescedRDD[Row](rdd, 
nodes.toArray.distinct)
+          val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p 
=>
+            DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
+          }.distinct.size
+          val nodes = 
DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
+            sqlContext.sparkContext)
+          val newRdd = new DataLoadCoalescedRDD[Row](rdd, 
nodes.toArray.distinct)
 
+          if (useKettle) {
             status = new DataFrameLoaderRDD(sqlContext.sparkContext,
               new DataLoadResultImpl(),
               carbonLoadModel,
@@ -655,18 +654,13 @@ object CarbonDataRDDFactory {
               schemaLastUpdatedTime,
               newRdd).collect()
           } else {
-
-            var numPartitions = 
DistributionUtil.getNodeList(sqlContext.sparkContext).length
-            numPartitions = Math.max(1, Math.min(numPartitions, 
rdd.partitions.length))
-            val coalesceRdd = rdd.coalesce(numPartitions, shuffle = false)
-
             status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
               new DataLoadResultImpl(),
               carbonLoadModel,
               currentLoadCount,
               tableCreationTime,
               schemaLastUpdatedTime,
-              coalesceRdd).collect()
+              newRdd).collect()
           }
 
         } catch {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
index 82d6fdf..06cfadf 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
@@ -61,7 +61,6 @@ class TestCSVHavingOnlySpaceChar extends QueryTest with 
BeforeAndAfterAll {
 
   override def afterAll {
     sql("drop table emptyRowCarbonTable")
-    sql("drop table emptyRowHiveTable")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
index 44165ac..de2c541 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
@@ -56,7 +56,7 @@ class TestEmptyRows extends QueryTest with BeforeAndAfterAll {
     sql(
       "LOAD DATA LOCAL INPATH '" + csvFilePath + "' into table " +
         "emptyRowHiveTable"
-    );
+    )
   }
 
   test("select eid from table") {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f451a54..0f32ad9 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -336,7 +336,7 @@ object CarbonDataRDDFactory {
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
       kettleHomePath: String,
-      columinar: Boolean,
+      columnar: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
       useKettle: Boolean,
       dataFrame: Option[DataFrame] = None): Unit = {
@@ -612,7 +612,7 @@ object CarbonDataRDDFactory {
             carbonLoadModel,
             storePath,
             kettleHomePath,
-            columinar,
+            columnar,
             currentLoadCount,
             tableCreationTime,
             schemaLastUpdatedTime,
@@ -632,6 +632,7 @@ object CarbonDataRDDFactory {
       def loadDataFrame(): Unit = {
         try {
           val rdd = dataFrame.get.rdd
+
           val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p 
=>
             DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
           }.distinct.size
@@ -639,16 +640,27 @@ object CarbonDataRDDFactory {
             sqlContext.sparkContext)
           val newRdd = new DataLoadCoalescedRDD[Row](rdd, 
nodes.toArray.distinct)
 
-          status = new DataFrameLoaderRDD(sqlContext.sparkContext,
-            new DataLoadResultImpl(),
-            carbonLoadModel,
-            storePath,
-            kettleHomePath,
-            columinar,
-            currentLoadCount,
-            tableCreationTime,
-            schemaLastUpdatedTime,
-            newRdd).collect()
+          if (useKettle) {
+            status = new DataFrameLoaderRDD(sqlContext.sparkContext,
+              new DataLoadResultImpl(),
+              carbonLoadModel,
+              storePath,
+              kettleHomePath,
+              columnar,
+              currentLoadCount,
+              tableCreationTime,
+              schemaLastUpdatedTime,
+              newRdd).collect()
+          } else {
+            status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
+              new DataLoadResultImpl(),
+              carbonLoadModel,
+              currentLoadCount,
+              tableCreationTime,
+              schemaLastUpdatedTime,
+              newRdd).collect()
+          }
+
         } catch {
           case ex: Exception =>
             LOGGER.error(ex, "load data frame failed")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index f174126..f9ad661 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -157,7 +157,6 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: 
String) {
         CarbonRelation(database, tableIdentifier.table,
           CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), 
tables.head, alias)
       case None =>
-        LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
         throw new NoSuchTableException(database, tableIdentifier.table)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
new file mode 100644
index 0000000..adb7a1c
--- /dev/null
+++ 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.allqueries
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
+  var timeStampPropOrig: String = _
+  override def beforeAll {
+    dropTableIfExists
+    timeStampPropOrig = 
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    sql("create table THive (imei string,deviceInformationId int,MAC 
string,deviceColor string,device_backColor string,modelId string,marketName 
string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series 
string,productionDate timestamp,bomCode string,internalModels string, 
deliveryTime string, channelsId string, channelsName string , deliveryAreaId 
string, deliveryCountry string, deliveryProvince string, deliveryCity 
string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, 
ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, 
ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet 
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion 
string, Active_operaSysVersion string, Active_BacVerNumber string, 
Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phonePADPartitionedVersions st
 ring, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), 
Latest_HOUR string, Latest_areaId string, Latest_country string, 
Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double,contractNumber BigInt) ROW 
FORMAT DELIMITED FIELDS TERMINATED BY ','")
+    sql("LOAD DATA local INPATH '../spark/src/test/resources/100_olap.csv' 
INTO TABLE THive")
+  }
+  test("insert from hive") {
+    sql("create table TCarbon1 (imei string,deviceInformationId int,MAC 
string,deviceColor string,device_backColor string,modelId string,marketName 
string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series 
string,productionDate timestamp,bomCode string,internalModels string, 
deliveryTime string, channelsId string, channelsName string , deliveryAreaId 
string, deliveryCountry string, deliveryProvince string, deliveryCity 
string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, 
ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, 
ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet 
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion 
string, Active_operaSysVersion string, Active_BacVerNumber string, 
Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phonePADPartitionedVersions
  string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), 
Latest_HOUR string, Latest_areaId string, Latest_country string, 
Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double,contractNumber BigInt) USING 
org.apache.spark.sql.CarbonSource 
OPTIONS('dbName'='default','tableName'='TCarbon1')")
+    sql("insert into TCarbon1 select * from THive")
+    checkAnswer(
+        sql("select 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
 
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
 from THive order by 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Lates
 
t_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
+        sql("select 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
 
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
 from TCarbon1 order by 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,La
 
test_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+    )
+  }
+  test("insert from hive-sum expression") {
+    sql("create table TCarbon2 (MAC string,deviceInformationIdSum int) USING 
org.apache.spark.sql.CarbonSource 
OPTIONS('dbName'='default','tableName'='TCarbon2')")
+    sql("insert into TCarbon2 select MAC,sum(deviceInformationId+ 10) as a 
from THive group by MAC")
+    checkAnswer(
+         sql("select MAC,deviceInformationIdSum from TCarbon2 order by MAC"),
+         sql("select MAC,sum(deviceInformationId+ 10) as a from THive group by 
MAC order by MAC")
+     )
+  }
+  test("insert from carbon-select columns") {
+    sql("create table TCarbon3 (imei string,deviceInformationId int,MAC 
string,deviceColor string,device_backColor string,modelId string,marketName 
string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series 
string,productionDate timestamp,bomCode string,internalModels string, 
deliveryTime string, channelsId string, channelsName string , deliveryAreaId 
string, deliveryCountry string, deliveryProvince string, deliveryCity 
string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, 
ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, 
ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet 
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion 
string, Active_operaSysVersion string, Active_BacVerNumber string, 
Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phonePADPartitionedVersions
  string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), 
Latest_HOUR string, Latest_areaId string, Latest_country string, 
Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double,contractNumber BigInt) USING 
org.apache.spark.sql.CarbonSource 
OPTIONS('dbName'='default','tableName'='TCarbon3')")
+    sql("insert into TCarbon3 select * from TCarbon1")
+    checkAnswer(
+        sql("select 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
 
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
 from TCarbon1 order by 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,La
 
test_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
+        sql("select 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
 
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
 from TCarbon3 order by 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,La
 
test_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+    )
+  }
+  test("insert from carbon-select columns-source table has more column then 
target column") {
+    sql("create table TCarbon10 (imei string,deviceInformationId int,MAC 
string,deviceColor string,device_backColor string) USING 
org.apache.spark.sql.CarbonSource 
OPTIONS('dbName'='default','tableName'='TCarbon10')")
+    try{
+      sql("insert into TCarbon10 select * from TCarbon1")
+      assert(false)
+    } catch {
+      case ex: AnalysisException =>
+        if (ex.getMessage().contains("the number of columns are different")) {
+          assert(true)
+        } else {
+          assert(false)
+        }
+      case _ => assert(false)
+    }
+  }
+  test("insert from carbon-select * columns") {
+    sql("create table TCarbon4 (imei string,deviceInformationId int,MAC 
string,deviceColor string,device_backColor string,modelId string,marketName 
string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series 
string,productionDate timestamp,bomCode string,internalModels string, 
deliveryTime string, channelsId string, channelsName string , deliveryAreaId 
string, deliveryCountry string, deliveryProvince string, deliveryCity 
string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, 
ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, 
ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet 
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion 
string, Active_operaSysVersion string, Active_BacVerNumber string, 
Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phonePADPartitionedVersions
  string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), 
Latest_HOUR string, Latest_areaId string, Latest_country string, 
Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double,contractNumber BigInt) USING 
org.apache.spark.sql.CarbonSource 
OPTIONS('dbName'='default','tableName'='TCarbon4')")
+    sql("insert into TCarbon4 select * from TCarbon1")
+    checkAnswer(
+        sql("select * from TCarbon1"),
+        sql("select * from TCarbon4")
+    )
+  }
+  test("insert->hive column more than carbon column->success") {
+    sql("create table TCarbon5 (imei string,deviceInformationId int,MAC 
string,deviceColor string,gamePointId double,contractNumber BigInt) USING 
org.apache.spark.sql.CarbonSource 
OPTIONS('dbName'='default','tableName'='TCarbon5')")
+    try {
+      sql("insert into TCarbon5 select 
imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber,device_backColor,modelId,CUPAudit,CPIClocked
 from THive")
+      assert(false)
+    } catch {
+      case ex: AnalysisException =>
+        if (ex.getMessage().contains("the number of columns are different")) {
+          assert(true)
+        } else {
+          assert(false)
+        }
+      case _ => assert(false)
+    }
+
+  }
+  test("insert->carbon column is more then hive-fails") {
+    sql("create table TCarbon6 (imei string,deviceInformationId int,MAC 
string,deviceColor string,gamePointId double,contractNumber BigInt) USING 
org.apache.spark.sql.CarbonSource 
OPTIONS('dbName'='default','tableName'='TCarbon6')")
+    try {
+      sql("insert into TCarbon6 select 
imei,deviceInformationId,MAC,deviceColor,gamePointId from THive")
+      assert(false)
+    } catch  {
+      case ex: Exception => assert(true)
+    }
+  }
+  test("insert->insert wrong data types-pass") {
+    sql("create table TCarbon7 (imei string,deviceInformationId int,MAC 
string) USING org.apache.spark.sql.CarbonSource 
OPTIONS('dbName'='default','tableName'='TCarbon7')")
+    sql("insert into TCarbon7 select imei,MAC,deviceInformationId from THive")
+    sql("create table THive7 (imei string,deviceInformationId int,MAC string) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+    sql("insert into THive7 select imei,MAC,deviceInformationId from THive")
+    checkAnswer(
+        sql("select imei,deviceInformationId,MAC from TCarbon7"),
+        sql("select imei,deviceInformationId,MAC from THive7")
+    )
+  }
+  test("insert->insert empty data -pass") {
+    sql("create table TCarbon8 (imei string,deviceInformationId int,MAC 
string) USING org.apache.spark.sql.CarbonSource 
OPTIONS('dbName'='default','tableName'='TCarbon8')")
+    sql("insert into TCarbon8 select imei,deviceInformationId,MAC from THive 
where MAC='wrongdata'")
+    checkAnswer(
+        sql("select imei,deviceInformationId,MAC from THive where 
MAC='wrongdata'"),
+        sql("select imei,deviceInformationId,MAC from TCarbon8 where 
MAC='wrongdata'")
+    )
+  }
+  test("insert into existing load-pass") {
+    sql("create table TCarbon9 (imei string,deviceInformationId int,MAC 
string,deviceColor string,device_backColor string,modelId string,marketName 
string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series 
string,productionDate timestamp,bomCode string,internalModels string, 
deliveryTime string, channelsId string, channelsName string , deliveryAreaId 
string, deliveryCountry string, deliveryProvince string, deliveryCity 
string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, 
ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, 
ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet 
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion 
string, Active_operaSysVersion string, Active_BacVerNumber string, 
Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phonePADPartitionedVersions
  string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), 
Latest_HOUR string, Latest_areaId string, Latest_country string, 
Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double,contractNumber BigInt) USING 
org.apache.spark.sql.CarbonSource 
OPTIONS('dbName'='default','tableName'='TCarbon9')")
+    sql("create table THive9 (imei string,deviceInformationId int,MAC 
string,deviceColor string,device_backColor string,modelId string,marketName 
string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series 
string,productionDate timestamp,bomCode string,internalModels string, 
deliveryTime string, channelsId string, channelsName string , deliveryAreaId 
string, deliveryCountry string, deliveryProvince string, deliveryCity 
string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, 
ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, 
ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet 
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion 
string, Active_operaSysVersion string, Active_BacVerNumber string, 
Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phonePADPartitionedVersions s
 tring, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), 
Latest_HOUR string, Latest_areaId string, Latest_country string, 
Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double,contractNumber BigInt) ROW 
FORMAT DELIMITED FIELDS TERMINATED BY ','")
+    sql("insert into TCarbon9 select * from THive")
+    sql("insert into TCarbon9 select * from THive")
+    sql("insert into THive9 select * from THive")
+    sql("insert into THive9 select * from THive")
+    checkAnswer(
+        sql("select 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
 
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
 from THive9 order by 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Late
 
st_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
+        sql("select 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_opera
 
torsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
 from TCarbon9 order by 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,La
 
test_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+    )
+  }
+  override def afterAll {
+    dropTableIfExists
+  }
+
+  def dropTableIfExists: Unit = {
+    sql("DROP TABLE IF EXISTS THive")
+    sql("drop table if exists TCarbonSource3")
+    sql("drop table if exists TCarbonSource4")
+    sql("drop table if exists load")
+    sql("drop table if exists inser")
+    sql("drop table if exists TCarbon1")
+    sql("drop table if exists TCarbon2")
+    sql("drop table if exists TCarbon3")
+    sql("drop table if exists TCarbon4")
+    sql("drop table if exists TCarbon5")
+    sql("drop table if exists TCarbon6")
+    sql("drop table if exists TCarbon7")
+    sql("drop table if exists TCarbon8")
+    sql("drop table if exists TCarbon9")
+    if (timeStampPropOrig != null) {
+      
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
 timeStampPropOrig)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index 057d894..3ba9f6a 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -26,6 +26,7 @@ class CarbonDataSourceSuite extends QueryTest with 
BeforeAndAfterAll {
     // Drop table
     spark.sql("DROP TABLE IF EXISTS carbon_testtable")
     spark.sql("DROP TABLE IF EXISTS csv_table")
+
     // Create table
     spark.sql(
       s"""

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index cde19bd..2d468ac 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -493,7 +493,6 @@ public class CarbonFactDataHandlerColumnar implements 
CarbonFactHandler {
       } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
         max[i] = -Double.MAX_VALUE;
       } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        //max[i] = new BigDecimal(0.0);
         Long[] bigdMinVal = new Long[2];
         bigdMinVal[0] = Long.MIN_VALUE;
         bigdMinVal[1] = Long.MIN_VALUE;
@@ -513,9 +512,7 @@ public class CarbonFactDataHandlerColumnar implements 
CarbonFactHandler {
         Long[] bigdMaxVal = new Long[2];
         bigdMaxVal[0] = Long.MAX_VALUE;
         bigdMaxVal[1] = Long.MAX_VALUE;
-        //min[i] = new BigDecimal(Double.MAX_VALUE);
         min[i] = bigdMaxVal;
-        uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
         Long[] bigdUniqueVal = new Long[2];
         bigdUniqueVal[0] = Long.MIN_VALUE;
         bigdUniqueVal[1] = Long.MIN_VALUE;
@@ -654,7 +651,10 @@ public class CarbonFactDataHandlerColumnar implements 
CarbonFactHandler {
       } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
         max[i] = -Double.MAX_VALUE;
       } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        max[i] = new BigDecimal(0.0);
+        Long[] bigdMinVal = new Long[2];
+        bigdMinVal[0] = Long.MIN_VALUE;
+        bigdMinVal[1] = Long.MIN_VALUE;
+        max[i] = bigdMinVal;
       } else {
         max[i] = 0.0;
       }
@@ -667,8 +667,14 @@ public class CarbonFactDataHandlerColumnar implements 
CarbonFactHandler {
         min[i] = Double.MAX_VALUE;
         uniqueValue[i] = Double.MIN_VALUE;
       } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        min[i] = new BigDecimal(Double.MAX_VALUE);
-        uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
+        Long[] bigdMaxVal = new Long[2];
+        bigdMaxVal[0] = Long.MAX_VALUE;
+        bigdMaxVal[1] = Long.MAX_VALUE;
+        min[i] = bigdMaxVal;
+        Long[] bigdUniqueVal = new Long[2];
+        bigdUniqueVal[0] = Long.MIN_VALUE;
+        bigdUniqueVal[1] = Long.MIN_VALUE;
+        uniqueValue[i] = bigdUniqueVal;
       } else {
         min[i] = 0.0;
         uniqueValue[i] = 0.0;
@@ -747,11 +753,22 @@ public class CarbonFactDataHandlerColumnar implements 
CarbonFactHandler {
             b = (byte[]) row[customMeasureIndex[i]];
           }
         }
+        BigDecimal value = DataTypeUtil.byteToBigDecimal(b);
+        String[] bigdVals = value.toPlainString().split("\\.");
+        long[] bigDvalue = new long[2];
+        if (bigdVals.length == 2) {
+          bigDvalue[0] = Long.parseLong(bigdVals[0]);
+          BigDecimal bd = new 
BigDecimal(CarbonCommonConstants.POINT+bigdVals[1]);
+          bigDvalue[1] = (long)(bd.doubleValue()*Math.pow(10, value.scale()));
+        } else {
+          bigDvalue[0] = Long.parseLong(bigdVals[0]);
+        }
         byteBuffer = ByteBuffer.allocate(b.length + 
CarbonCommonConstants.INT_SIZE_IN_BYTE);
         byteBuffer.putInt(b.length);
         byteBuffer.put(b);
         byteBuffer.flip();
         b = byteBuffer.array();
+        
dataHolder[customMeasureIndex[i]].setWritableBigDecimalValueByIndex(count, 
bigDvalue);
         
dataHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, b);
       }
       calculateMaxMin(max, min, decimal, customMeasureIndex, row);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 1579415..2c82672 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -134,7 +134,7 @@ public class SingleThreadFinalSortFilesMerger extends 
CarbonIterator<Object[]> {
       }
     });
 
-    if (null == fileList || fileList.length < 0) {
+    if (null == fileList || fileList.length == 0) {
       return;
     }
     startSorting(fileList);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a94bea5..a1e7311 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -402,12 +402,15 @@ public abstract class AbstractFactDataWriter<T> 
implements CarbonFactDataWriter<
    */
   public void closeWriter() throws CarbonDataWriterException {
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    renameCarbonDataFile();
-    copyCarbonDataFileToCarbonStorePath(this.fileName.substring(0, 
this.fileName.lastIndexOf('.')));
-    try {
-      writeIndexFile();
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while writing the index 
file", e);
+    if (this.blockletInfoList.size() > 0) {
+      renameCarbonDataFile();
+      copyCarbonDataFileToCarbonStorePath(
+          this.fileName.substring(0, this.fileName.lastIndexOf('.')));
+      try {
+        writeIndexFile();
+      } catch (IOException e) {
+        throw new CarbonDataWriterException("Problem while writing the index 
file", e);
+      }
     }
     closeExecutorService();
   }
@@ -539,7 +542,9 @@ public abstract class AbstractFactDataWriter<T> implements 
CarbonFactDataWriter<
    * @throws CarbonDataWriterException
    */
   @Override public void writeBlockletInfoToFile() throws 
CarbonDataWriterException {
-    writeBlockletInfoToFile(this.blockletInfoList, fileChannel, fileName);
+    if (this.blockletInfoList.size() > 0) {
+      writeBlockletInfoToFile(this.blockletInfoList, fileChannel, fileName);
+    }
   }
 
   /**

Reply via email to