Repository: incubator-carbondata
Updated Branches:
  refs/heads/master a011aafb0 -> 7788f468c
insertInto without kettle for spark2, fix comments

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/498cf982
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/498cf982
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/498cf982

Branch: refs/heads/master
Commit: 498cf982995feba7012ce3993b3fd5172d2e5a15
Parents: a011aaf
Author: QiangCai <qiang...@qq.com>
Authored: Mon Dec 19 11:01:40 2016 +0800
Committer: jackylk <jacky.li...@huawei.com>
Committed: Wed Dec 28 14:45:18 2016 +0800

----------------------------------------------------------------------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 132 +++++++--------
 .../readsupport/SparkRowReadSupportImpl.java    |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  22 +--
 .../emptyrow/TestCSVHavingOnlySpaceChar.scala   |   1 -
 .../testsuite/emptyrow/TestEmptyRows.scala      |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  36 +++--
 .../apache/spark/sql/hive/CarbonMetastore.scala |   1 -
 .../InsertIntoCarbonTableTestCase.scala         | 162 +++++++++++++++++++
 .../carbondata/CarbonDataSourceSuite.scala      |   1 +
 .../sql/common/util/CarbonSessionTest.scala     |   0
 .../store/CarbonFactDataHandlerColumnar.java    |  29 +++-
 .../store/SingleThreadFinalSortFilesMerger.java |   2 +-
 .../store/writer/AbstractFactDataWriter.java    |  19 ++-
 13 files changed, 287 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 96bb5ed..64b8b61 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -18,20 +18,23 @@ package org.apache.carbondata.spark.rdd
 
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
 import java.util
 import java.util.{Date, UUID}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -41,12 +44,13 @@ import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.hadoop.csv.CSVInputFormat
 import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
+import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.DataLoadExecutor
 import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
 import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.CarbonQueryUtil
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil}
 
 class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
 
@@ -323,7 +327,7 @@ class NewDataFrameLoaderRDD[K, V](
     loadCount: Integer,
     tableCreationTime: Long,
     schemaLastUpdatedTime: Long,
-    prev: RDD[Row]) extends RDD[(K, V)](prev) {
+    prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
@@ -342,29 +346,25 @@ class NewDataFrameLoaderRDD[K, V](
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
 
-        val iterator = new NewRddIterator(
-          firstParent[Row].iterator(theSplit, context),
-          carbonLoadModel)
-
-        class CarbonIteratorImpl(iterator: util.Iterator[Array[AnyRef]])
-          extends CarbonIterator[Array[AnyRef]] {
-          override def initialize(): Unit = {}
-
-          override def close(): Unit = {}
-
-          override def next(): Array[AnyRef] = {
-            iterator.next
-          }
-
-          override def hasNext: Boolean = {
-            iterator.hasNext
+        val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
+        val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context)
+        val serializer = SparkEnv.get.closureSerializer.newInstance()
+        var serializeBuffer: ByteBuffer = null
+        while(partitionIterator.hasNext) {
+          val value = partitionIterator.next()
+          val newInstance = {
+            if (serializeBuffer == null) {
+              serializeBuffer = serializer.serialize[RDD[Row]](value.rdd)
+            }
+            serializeBuffer.rewind()
+            serializer.deserialize[RDD[Row]](serializeBuffer)
           }
+          recordReaders += new CarbonIteratorImpl(
+            new NewRddIterator(newInstance.iterator(value.partition, context),
+              carbonLoadModel,
+              context))
         }
-
-        val recordReaders: Array[CarbonIterator[Array[AnyRef]]] =
-          Array(new CarbonIteratorImpl(iterator))
-
         val loader = new SparkPartitionLoader(model,
           theSplit.index,
           null,
@@ -375,7 +375,7 @@ class NewDataFrameLoaderRDD[K, V](
         loader.initialize()
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-        new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
+        new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: BadRecordFoundException =>
@@ -402,76 +402,52 @@
 
 /**
  * This class wrap Scala's Iterator to Java's Iterator.
- * It also convert all columns to string data since carbondata will recognize the right type
- * according to schema from spark DataFrame.
- * @see org.apache.carbondata.spark.rdd.RddIterator
+ * It also convert all columns to string data to use csv data loading flow.
+ *
  * @param rddIter
  * @param carbonLoadModel
+ * @param context
  */
 class NewRddIterator(rddIter: Iterator[Row],
-    carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[AnyRef]] {
+    carbonLoadModel: CarbonLoadModel,
+    context: TaskContext) extends JavaRddIterator[Array[AnyRef]] {
+
   val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
   val format = new SimpleDateFormat(formatString)
   val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
   val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-
+  val serializationNullFormat =
+    carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
   def hasNext: Boolean = rddIter.hasNext
 
-  private def getString(value: Any, level: Int = 1): String = {
-    if (value == null) {
-      ""
-    } else {
-      value match {
-        case s: String => s
-        case i: java.lang.Integer => i.toString
-        case d: java.lang.Double => d.toString
-        case t: java.sql.Timestamp => format format t
-        case d: java.sql.Date => format format d
-        case d: java.math.BigDecimal => d.toPlainString
-        case b: java.lang.Boolean => b.toString
-        case s: java.lang.Short => s.toString
-        case f: java.lang.Float => f.toString
-        case bs: Array[Byte] => new String(bs)
-        case s: scala.collection.Seq[Any] =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
-          }
-          val builder = new StringBuilder()
-          s.foreach { x =>
-            builder.append(getString(x, level + 1)).append(delimiter)
-          }
-          builder.substring(0, builder.length - 1)
-        case m: scala.collection.Map[Any, Any] =>
-          throw new Exception("Unsupported data type: Map")
-        case r: org.apache.spark.sql.Row =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
-          }
-          val builder = new StringBuilder()
-          for (i <- 0 until r.length) {
-            builder.append(getString(r(i), level + 1)).append(delimiter)
-          }
-          builder.substring(0, builder.length - 1)
-        case other => other.toString
-      }
-    }
-  }
-
   def next: Array[AnyRef] = {
     val row = rddIter.next()
-    val columns = new Array[Object](row.length)
-    for (i <- 0 until row.length) {
-      columns(i) = getString(row(i))
+    val columns = new Array[AnyRef](row.length)
+    for (i <- 0 until columns.length) {
+      columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
+        delimiterLevel1, delimiterLevel2, format)
     }
     columns
   }
 
-  def remove(): Unit = {
+  def initialize: Unit = {
+    SparkUtil.setTaskContext(context)
   }
 }
+
+class CarbonIteratorImpl(iterator: NewRddIterator)
+  extends CarbonIterator[Array[AnyRef]] {
+  override def initialize(): Unit = iterator.initialize
+
+  override def close(): Unit = {}
+
+  override def next(): Array[AnyRef] = {
+    iterator.next
+  }
+
+  override def hasNext: Boolean = {
+    iterator.hasNext
+  }
+}
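[Editorial note on the hunk above: the core of this change is that a task no longer consumes a single parent partition. A DataLoadCoalescedRDD partition wraps several parent partitions (each DataLoadPartitionWrap carries the parent RDD plus one of its partitions), and the loop deserializes a private copy of the parent RDD for each wrapped partition before opening an iterator on it, so the per-partition record readers share no state. A minimal sketch of that cloning trick follows; it assumes it runs inside a Spark task (SparkEnv.get is only populated there), and cloneRdd is an illustrative helper, not CarbonData API.]

import java.nio.ByteBuffer
import org.apache.spark.SparkEnv
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

def cloneRdd(rdd: RDD[Row], copies: Int): Seq[RDD[Row]] = {
  val serializer = SparkEnv.get.closureSerializer.newInstance()
  val bytes: ByteBuffer = serializer.serialize[RDD[Row]](rdd) // serialize once
  (1 to copies).map { _ =>
    bytes.rewind()                          // reuse the same byte buffer
    serializer.deserialize[RDD[Row]](bytes) // fresh, independent RDD instance
  }
}

Serializing once and rewinding the buffer for every copy is the same trade the diff makes: one serialization per task, one deserialization per wrapped partition.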
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 68f923d..46e5244 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -71,7 +71,7 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport
       else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         //convert the long to timestamp in case of direct dictionary column
         if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
-          data[i] = new Timestamp((long) data[i]);
+          data[i] = new Timestamp((long) data[i] / 1000L);
         } else if (DataType.DATE == carbonColumns[i].getDataType()) {
           data[i] = new Date((long) data[i]);
         }


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index ff7bf23..d975502 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -635,15 +634,14 @@ object CarbonDataRDDFactory {
       try {
         val rdd = dataFrame.get.rdd
-        if (useKettle) {
-
-          val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
-            DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
-          }.distinct.size
-          val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
-            sqlContext.sparkContext)
-          val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
+        val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
+          DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
+        }.distinct.size
+        val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
+          sqlContext.sparkContext)
+        val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
 
+        if (useKettle) {
           status = new DataFrameLoaderRDD(sqlContext.sparkContext,
             new DataLoadResultImpl(),
             carbonLoadModel,
@@ -655,18 +654,13 @@ object CarbonDataRDDFactory {
             schemaLastUpdatedTime,
             newRdd).collect()
         } else {
-
-          var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
-          numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
-          val coalesceRdd = rdd.coalesce(numPartitions, shuffle = false)
-
           status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
             new DataLoadResultImpl(),
             carbonLoadModel,
             currentLoadCount,
             tableCreationTime,
             schemaLastUpdatedTime,
-            coalesceRdd).collect()
+            newRdd).collect()
         }
       } catch {


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
index 82d6fdf..06cfadf 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestCSVHavingOnlySpaceChar.scala
@@ -61,7 +61,6 @@ class TestCSVHavingOnlySpaceChar extends QueryTest with BeforeAndAfterAll {
   override def afterAll {
     sql("drop table emptyRowCarbonTable")
-    sql("drop table emptyRowHiveTable")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
   }


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
index 44165ac..de2c541 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
@@ -56,7 +56,7 @@ class TestEmptyRows extends QueryTest with BeforeAndAfterAll {
     sql(
       "LOAD DATA LOCAL INPATH '" + csvFilePath + "' into table " +
       "emptyRowHiveTable"
-    );
+    )
   }
 
   test("select eid from table") {


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f451a54..0f32ad9 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -336,7 +336,7 @@ object CarbonDataRDDFactory {
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
       kettleHomePath: String,
-      columinar: Boolean,
+      columnar: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
       useKettle: Boolean,
       dataFrame: Option[DataFrame] = None): Unit = {
@@ -612,7 +612,7 @@ object CarbonDataRDDFactory {
             carbonLoadModel,
             storePath,
             kettleHomePath,
-            columinar,
+            columnar,
             currentLoadCount,
             tableCreationTime,
             schemaLastUpdatedTime,
@@ -632,6 +632,7 @@ object CarbonDataRDDFactory {
     def loadDataFrame(): Unit = {
       try {
         val rdd = dataFrame.get.rdd
+
         val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
           DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
         }.distinct.size
@@ -639,16 +640,27 @@ object CarbonDataRDDFactory {
           sqlContext.sparkContext)
         val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
 
-        status = new DataFrameLoaderRDD(sqlContext.sparkContext,
-          new DataLoadResultImpl(),
-          carbonLoadModel,
-          storePath,
-          kettleHomePath,
-          columinar,
-          currentLoadCount,
-          tableCreationTime,
-          schemaLastUpdatedTime,
-          newRdd).collect()
+        if (useKettle) {
+          status = new DataFrameLoaderRDD(sqlContext.sparkContext,
+            new DataLoadResultImpl(),
+            carbonLoadModel,
+            storePath,
+            kettleHomePath,
+            columnar,
+            currentLoadCount,
+            tableCreationTime,
+            schemaLastUpdatedTime,
+            newRdd).collect()
+        } else {
+          status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
+            new DataLoadResultImpl(),
+            carbonLoadModel,
+            currentLoadCount,
+            tableCreationTime,
+            schemaLastUpdatedTime,
+            newRdd).collect()
+        }
       } catch {
         case ex: Exception =>
           LOGGER.error(ex, "load data frame failed")
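[Editorial note: both the spark and spark2 factories now compute data locality before branching on useKettle, so the kettle and no-kettle flows feed the same locality-aware DataLoadCoalescedRDD; the old no-kettle path's plain rdd.coalesce(...) is gone. Condensed from the two hunks above — DataLoadPartitionCoalescer, DistributionUtil and DataLoadCoalescedRDD are the CarbonData helpers already in scope there:]

// Count the distinct hosts that actually hold input blocks, make sure
// executors exist on them, then coalesce the input onto those nodes
// without a shuffle.
val dataHosts = rdd.partitions
  .flatMap(p => DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host))
  .distinct
val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
  dataHosts.length, sqlContext.sparkContext)
val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)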
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index f174126..f9ad661 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -157,7 +157,6 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
         CarbonRelation(database, tableIdentifier.table,
           CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias)
       case None =>
-        LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
         throw new NoSuchTableException(database, tableIdentifier.table)
     }
   }
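[Editorial note: the new suite that follows validates insert-into by the reference-table pattern: load identical data into a plain Hive table and a Carbon table, run the same query against both, and let checkAnswer compare the results. A trimmed sketch under stated assumptions — the two-column schema is illustrative, and sql/checkAnswer come from the QueryTest harness used below:]

import org.apache.spark.sql.common.util.QueryTest

class InsertSmokeSketch extends QueryTest {
  test("carbon insert matches hive reference") {
    sql("create table refHive (id int, name string) " +
      "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
    sql("create table refCarbon (id int, name string) " +
      "USING org.apache.spark.sql.CarbonSource OPTIONS('tableName'='refCarbon')")
    sql("insert into refCarbon select * from refHive")
    checkAnswer(
      sql("select id, name from refCarbon order by id"),
      sql("select id, name from refHive order by id"))
  }
}

The real suite additionally snapshots CARBON_TIMESTAMP_FORMAT in beforeAll and restores it at the end, so the property change cannot leak into other suites.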
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
new file mode 100644
index 0000000..adb7a1c
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.allqueries
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
+  var timeStampPropOrig: String = _
+  override def beforeAll {
+    dropTableIfExists
+    timeStampPropOrig = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    sql("create table THive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+    sql("LOAD DATA local INPATH '../spark/src/test/resources/100_olap.csv' INTO TABLE THive")
+  }
+  test("insert from hive") {
+    sql("create table TCarbon1 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon1')")
+    sql("insert into TCarbon1 select * from THive")
+    checkAnswer(
+      sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from THive order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
+      sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from TCarbon1 order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+    )
+  }
+  test("insert from hive-sum expression") {
+    sql("create table TCarbon2 (MAC string,deviceInformationIdSum int) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon2')")
+    sql("insert into TCarbon2 select MAC,sum(deviceInformationId+ 10) as a from THive group by MAC")
+    checkAnswer(
+      sql("select MAC,deviceInformationIdSum from TCarbon2 order by MAC"),
+      sql("select MAC,sum(deviceInformationId+ 10) as a from THive group by MAC order by MAC")
+    )
+  }
+  test("insert from carbon-select columns") {
+    sql("create table TCarbon3 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon3')")
+    sql("insert into TCarbon3 select * from TCarbon1")
+    checkAnswer(
+      sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from TCarbon1 order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
+      sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from TCarbon3 order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+    )
+  }
+  test("insert from carbon-select columns-source table has more column then target column") {
+    sql("create table TCarbon10 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon10')")
+    try{
+      sql("insert into TCarbon10 select * from TCarbon1")
+      assert(false)
+    } catch {
+      case ex: AnalysisException =>
+        if (ex.getMessage().contains("the number of columns are different")) {
+          assert(true)
+        } else {
+          assert(false)
+        }
+      case _ => assert(false)
+    }
+  }
+  test("insert from carbon-select * columns") {
+    sql("create table TCarbon4 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon4')")
+    sql("insert into TCarbon4 select * from TCarbon1")
+    checkAnswer(
+      sql("select * from TCarbon1"),
+      sql("select * from TCarbon4")
+    )
+  }
+  test("insert->hive column more than carbon column->success") {
+    sql("create table TCarbon5 (imei string,deviceInformationId int,MAC string,deviceColor string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon5')")
+    try {
+      sql("insert into TCarbon5 select imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber,device_backColor,modelId,CUPAudit,CPIClocked from THive")
+      assert(false)
+    } catch {
+      case ex: AnalysisException =>
+        if (ex.getMessage().contains("the number of columns are different")) {
+          assert(true)
+        } else {
+          assert(false)
+        }
+      case _ => assert(false)
+    }
+
+  }
+  test("insert->carbon column is more then hive-fails") {
+    sql("create table TCarbon6 (imei string,deviceInformationId int,MAC string,deviceColor string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon6')")
+    try {
+      sql("insert into TCarbon6 select imei,deviceInformationId,MAC,deviceColor,gamePointId from THive")
+      assert(false)
+    } catch {
+      case ex: Exception => assert(true)
+    }
+  }
+  test("insert->insert wrong data types-pass") {
+    sql("create table TCarbon7 (imei string,deviceInformationId int,MAC string) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon7')")
+    sql("insert into TCarbon7 select imei,MAC,deviceInformationId from THive")
+    sql("create table THive7 (imei string,deviceInformationId int,MAC string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+    sql("insert into THive7 select imei,MAC,deviceInformationId from THive")
+    checkAnswer(
+      sql("select imei,deviceInformationId,MAC from TCarbon7"),
+      sql("select imei,deviceInformationId,MAC from THive7")
+    )
+  }
+  test("insert->insert empty data -pass") {
+    sql("create table TCarbon8 (imei string,deviceInformationId int,MAC string) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon8')")
+    sql("insert into TCarbon8 select imei,deviceInformationId,MAC from THive where MAC='wrongdata'")
+    checkAnswer(
+      sql("select imei,deviceInformationId,MAC from THive where MAC='wrongdata'"),
+      sql("select imei,deviceInformationId,MAC from TCarbon8 where MAC='wrongdata'")
+    )
+  }
+  test("insert into existing load-pass") {
+    sql("create table TCarbon9 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default','tableName'='TCarbon9')")
+    sql("create table THive9 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
+    sql("insert into TCarbon9 select * from THive")
+    sql("insert into TCarbon9 select * from THive")
+    sql("insert into THive9 select * from THive")
+    sql("insert into THive9 select * from THive")
+    checkAnswer(
+      sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from THive9 order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
+      sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from TCarbon9 order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+    )
+  }
+  override def afterAll {
+    dropTableIfExists
+  }
+
+  def dropTableIfExists: Unit = {
+    sql("DROP TABLE IF EXISTS THive")
+    sql("drop table if exists TCarbonSource3")
+    sql("drop table if exists TCarbonSource4")
+    sql("drop table if exists load")
+    sql("drop table if exists inser")
+    sql("drop table if exists TCarbon1")
+    sql("drop table if exists TCarbon2")
+    sql("drop table if exists TCarbon3")
+    sql("drop table if exists TCarbon4")
+    sql("drop table if exists TCarbon5")
+    sql("drop table if exists TCarbon6")
+    sql("drop table if exists TCarbon7")
+    sql("drop table if exists TCarbon8")
+    sql("drop table if exists TCarbon9")
+    if (timeStampPropOrig != null) {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeStampPropOrig)
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index 057d894..3ba9f6a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -26,6 +26,7 @@ class CarbonDataSourceSuite extends QueryTest with BeforeAndAfterAll {
     // Drop table
     spark.sql("DROP TABLE IF EXISTS carbon_testtable")
     spark.sql("DROP TABLE IF EXISTS csv_table")
+
     // Create table
     spark.sql(
       s"""


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
deleted file mode 100644
index e69de29..0000000


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index cde19bd..2d468ac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -493,7 +493,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
         max[i] = -Double.MAX_VALUE;
       } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        //max[i] = new BigDecimal(0.0);
         Long[] bigdMinVal = new Long[2];
         bigdMinVal[0] = Long.MIN_VALUE;
         bigdMinVal[1] = Long.MIN_VALUE;
@@ -513,9 +512,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         Long[] bigdMaxVal = new Long[2];
         bigdMaxVal[0] = Long.MAX_VALUE;
         bigdMaxVal[1] = Long.MAX_VALUE;
-        //min[i] = new BigDecimal(Double.MAX_VALUE);
         min[i] = bigdMaxVal;
-        uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
         Long[] bigdUniqueVal = new Long[2];
         bigdUniqueVal[0] = Long.MIN_VALUE;
         bigdUniqueVal[1] = Long.MIN_VALUE;
@@ -654,7 +651,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
         max[i] = -Double.MAX_VALUE;
       } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        max[i] = new BigDecimal(0.0);
+        Long[] bigdMinVal = new Long[2];
+        bigdMinVal[0] = Long.MIN_VALUE;
+        bigdMinVal[1] = Long.MIN_VALUE;
+        max[i] = bigdMinVal;
       } else {
         max[i] = 0.0;
       }
@@ -667,8 +667,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         min[i] = Double.MAX_VALUE;
         uniqueValue[i] = Double.MIN_VALUE;
       } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        min[i] = new BigDecimal(Double.MAX_VALUE);
-        uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
+        Long[] bigdMaxVal = new Long[2];
+        bigdMaxVal[0] = Long.MAX_VALUE;
+        bigdMaxVal[1] = Long.MAX_VALUE;
+        min[i] = bigdMaxVal;
+        Long[] bigdUniqueVal = new Long[2];
+        bigdUniqueVal[0] = Long.MIN_VALUE;
+        bigdUniqueVal[1] = Long.MIN_VALUE;
+        uniqueValue[i] = bigdUniqueVal;
       } else {
         min[i] = 0.0;
         uniqueValue[i] = 0.0;
@@ -747,11 +753,22 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
           b = (byte[]) row[customMeasureIndex[i]];
         }
       }
+      BigDecimal value = DataTypeUtil.byteToBigDecimal(b);
+      String[] bigdVals = value.toPlainString().split("\\.");
+      long[] bigDvalue = new long[2];
+      if (bigdVals.length == 2) {
+        bigDvalue[0] = Long.parseLong(bigdVals[0]);
+        BigDecimal bd = new BigDecimal(CarbonCommonConstants.POINT+bigdVals[1]);
+        bigDvalue[1] = (long)(bd.doubleValue()*Math.pow(10, value.scale()));
+      } else {
+        bigDvalue[0] = Long.parseLong(bigdVals[0]);
+      }
       byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE);
       byteBuffer.putInt(b.length);
       byteBuffer.put(b);
       byteBuffer.flip();
       b = byteBuffer.array();
+      dataHolder[customMeasureIndex[i]].setWritableBigDecimalValueByIndex(count, bigDvalue);
       dataHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, b);
     }
     calculateMaxMin(max, min, decimal, customMeasureIndex, row);
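[Editorial note: in the CarbonFactDataHandlerColumnar hunks above, decimal measures stop using BigDecimal sentinels for min/max/unique tracking and move to a long[2] layout: slot 0 holds the integral digits, slot 1 the fractional digits rescaled to a whole number, with Long.MIN_VALUE/MAX_VALUE pairs as the initial sentinels. A standalone Scala rendering of the split, mirroring the diff's arithmetic (including its doubleValue() rounding); this is a sketch, not the project's DataTypeUtil:]

import java.math.BigDecimal

// "12.25" -> Array(12L, 25L): integral digits, then the fractional digits
// scaled back to a whole number using the decimal's own scale.
def splitDecimal(value: BigDecimal): Array[Long] = {
  val parts = value.toPlainString.split("\\.")
  val out = new Array[Long](2)
  out(0) = java.lang.Long.parseLong(parts(0))
  if (parts.length == 2) {
    val fraction = new BigDecimal("0." + parts(1))
    out(1) = (fraction.doubleValue() * math.pow(10, value.scale())).toLong
  }
  out
}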
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 1579415..2c82672 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -134,7 +134,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]>
       }
     });
 
-    if (null == fileList || fileList.length < 0) {
+    if (null == fileList || fileList.length == 0) {
       return;
     }
     startSorting(fileList);


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/498cf982/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a94bea5..a1e7311 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -402,12 +402,15 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   public void closeWriter() throws CarbonDataWriterException {
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    renameCarbonDataFile();
-    copyCarbonDataFileToCarbonStorePath(this.fileName.substring(0, this.fileName.lastIndexOf('.')));
-    try {
-      writeIndexFile();
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while writing the index file", e);
+    if (this.blockletInfoList.size() > 0) {
+      renameCarbonDataFile();
+      copyCarbonDataFileToCarbonStorePath(
+          this.fileName.substring(0, this.fileName.lastIndexOf('.')));
+      try {
+        writeIndexFile();
+      } catch (IOException e) {
+        throw new CarbonDataWriterException("Problem while writing the index file", e);
+      }
     }
     closeExecutorService();
   }
@@ -539,7 +542,9 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @throws CarbonDataWriterException
    */
  @Override public void writeBlockletInfoToFile() throws CarbonDataWriterException {
-    writeBlockletInfoToFile(this.blockletInfoList, fileChannel, fileName);
+    if (this.blockletInfoList.size() > 0) {
+      writeBlockletInfoToFile(this.blockletInfoList, fileChannel, fileName);
+    }
   }
 
   /**
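[Editorial note: the last two hunks guard the write path against empty loads, which the new insert tests can trigger (an insert whose select matches no rows). The merger's old check fileList.length < 0 could never fire, because an array length is never negative, so zero temp files fell straight into startSorting(); the writer now also skips the rename/copy/index-write steps when no blocklet was produced. A hedged sketch of the combined idea, with illustrative helper names rather than the real writer API:]

import java.io.File

// Finalize only the work that actually happened: nothing to merge without
// temp files, nothing to publish without blocklets.
def finalizeLoad(sortTempFiles: Array[File], blockletCount: Int)
    (startSorting: Array[File] => Unit, publishDataAndIndex: () => Unit): Unit = {
  if (sortTempFiles != null && sortTempFiles.nonEmpty) {
    startSorting(sortTempFiles)
  }
  if (blockletCount > 0) {
    publishDataAndIndex()
  }
}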