Repository: incubator-carbondata
Updated Branches:
  refs/heads/master b5c20a80a -> 1d5b5fcc4
Fix no-kettle issue for IUD. Load count/segment count should be a String because, in the compaction case, a segment id can be a value such as 2.1.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2cf2d193
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2cf2d193
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2cf2d193

Branch: refs/heads/master
Commit: 2cf2d1936237144a23c0a4393584a15b7beb600c
Parents: b5c20a8
Author: ravikiran <ravikiran.sn...@gmail.com>
Authored: Mon Jan 9 18:58:13 2017 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Mon Jan 16 20:13:24 2017 +0800

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataLoadRDD.scala           |  6 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  4 +-
 .../carbondata/spark/rdd/UpdateDataLoad.scala   | 79 ++++++++++++++++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        | 49 +++++++-----
 .../iud/UpdateCarbonTableTestCase.scala         | 13 ++++
 5 files changed, 128 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf2d193/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index ff3a174..c8e3c67 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -83,7 +83,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     splitIndex: Int,
     storePath: String,
     kettleHomePath: String,
-    loadCount: Int,
+    loadCount: String,
     loadMetadataDetails: LoadMetadataDetails) {

   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -239,7 +239,7 @@ class DataFileLoaderRDD[K, V](
       carbonLoadModel.setSegmentId(String.valueOf(loadCount))
       setModelAndBlocksInfo()
       val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
-        kettleHomePath, loadCount, loadMetadataDetails)
+        kettleHomePath, String.valueOf(loadCount), loadMetadataDetails)
       loader.initialize
       if (model.isRetentionRequest) {
         recreateAggregationTableForRetention
@@ -511,7 +511,7 @@ class DataFrameLoaderRDD[K, V](
       carbonLoadModel.setSegmentId(String.valueOf(loadCount))
       carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
       val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
-        kettleHomePath, loadCount, loadMetadataDetails)
+        kettleHomePath, String.valueOf(loadCount), loadMetadataDetails)
       loader.initialize
       val rddIteratorKey = UUID.randomUUID().toString
       try {
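A note on the type change above: segment ids in CarbonData stop being plain integers once compaction runs, which is why SparkPartitionLoader's loadCount parameter becomes a String. A minimal, self-contained sketch of the failure mode (illustrative only, not part of this commit):

// Illustrative only: a compacted segment id such as "2.1" (cited in the
// commit message) no longer survives a round trip through Int.
object SegmentIdDemo {
  def main(args: Array[String]): Unit = {
    val loadedSegmentId = "2"       // id assigned by a normal load
    val compactedSegmentId = "2.1"  // id after compacting segment 2
    println(scala.util.Try(loadedSegmentId.toInt))    // Success(2)
    println(scala.util.Try(compactedSegmentId.toInt)) // Failure(NumberFormatException)
    // Hence the parameter is now a String, and call sites wrap their
    // integer counters with String.valueOf(loadCount).
  }
}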
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf2d193/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 46e83a5..9e13883 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -157,7 +157,7 @@ class NewCarbonDataLoadRDD[K, V](
           theSplit.index,
           null,
           null,
-          loadCount,
+          String.valueOf(loadCount),
           loadMetadataDetails)
         // Initialize to set carbon properties
         loader.initialize()
@@ -355,7 +355,7 @@ class NewDataFrameLoaderRDD[K, V](
           theSplit.index,
           null,
           null,
-          loadCount,
+          String.valueOf(loadCount),
           loadMetadataDetails)
         // Initialize to set carbon properties
         loader.initialize()

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf2d193/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
new file mode 100644
index 0000000..7dee88a
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.sql.Row
+
+import org.apache.carbondata.common.CarbonIterator
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.LoadMetadataDetails
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadExecutor
+import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
+
+/**
+ * Data load in case of update command without kettle.
+ */
+object UpdateDataLoad {
+
+  def DataLoadNoKettleForUpdate(segId: String,
+      index: Int,
+      iter: Iterator[Row],
+      carbonLoadModel: CarbonLoadModel,
+      loadMetadataDetails: LoadMetadataDetails): Unit = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    try {
+      val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
+      val serializer = SparkEnv.get.closureSerializer.newInstance()
+      var serializeBuffer: ByteBuffer = null
+      recordReaders += new NewRddIterator(iter,
+        carbonLoadModel,
+        TaskContext.get())
+
+      val loader = new SparkPartitionLoader(carbonLoadModel,
+        index,
+        null,
+        null,
+        segId,
+        loadMetadataDetails)
+      // Initialize to set carbon properties
+      loader.initialize()
+
+      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+      new DataLoadExecutor().execute(carbonLoadModel,
+        loader.storeLocation,
+        recordReaders.toArray)
+
+    } catch {
+      case e: BadRecordFoundException =>
+        loadMetadataDetails.setLoadStatus(
+          CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+        LOGGER.info("Bad Record Found")
+      case e: Exception =>
+        LOGGER.error(e)
+        throw e
+    }
+  }
+
+}
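For orientation, here is a sketch of how this new entry point is driven once per Spark partition. It mirrors the call site added in the CarbonDataRDDFactory change below; updateRdd, segId and carbonLoadModel are placeholders, not code from the commit:

// Hypothetical driver (illustrative): each partition feeds its rows to
// the no-kettle update load and reports the resulting load status.
updateRdd.mapPartitionsWithIndex { (index, iter) =>
  val loadMetadataDetails = new LoadMetadataDetails()
  UpdateDataLoad.DataLoadNoKettleForUpdate(segId, index, iter,
    carbonLoadModel, loadMetadataDetails)
  // SUCCESS, or PARTIAL_SUCCESS when a BadRecordFoundException was caught.
  Iterator(loadMetadataDetails.getLoadStatus)
}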
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf2d193/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7a7aa64..610f5fb 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -17,11 +17,13 @@
 package org.apache.carbondata.spark.rdd

+import java.nio.ByteBuffer
 import java.util
 import java.util.UUID
 import java.util.concurrent._

 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 import scala.util.Random
 import scala.util.control.Breaks._
@@ -30,36 +32,35 @@
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{SparkContext, SparkEnv, SparkException}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskContext}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, UpdateCoalescedRDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil

+import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.update.CarbonUpdateUtil
-import org.apache.carbondata.core.updatestatus.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.processing.csvreaderstep.RddInpututilsForUpdate
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.newflow.DataLoadExecutor
+import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException}
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionCallable, CompactionType}
-import org.apache.carbondata.spark.partition.api.Partition
+import org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil, LoadMetadataUtil}
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}

 /**
  * This is the factory class which can create different RDDs depending on user needs.
@@ -715,16 +716,28 @@ object CarbonDataRDDFactory {
             loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
             val rddIteratorKey = CarbonCommonConstants.RDDUTIL_UPDATE_KEY + UUID.randomUUID().toString
+            if (useKettle) {
+              try {
+                RddInpututilsForUpdate.put(rddIteratorKey,
+                  new RddIteratorForUpdate(iter, carbonLoadModel))
+                carbonLoadModel.setRddIteratorKey(rddIteratorKey)
+                CarbonDataLoadForUpdate.run(carbonLoadModel,
+                  index,
+                  storePath,
+                  kettleHomePath,
+                  segId,
+                  loadMetadataDetails,
+                  executionErrors)
+              } finally {
+                RddInpututilsForUpdate.remove(rddIteratorKey)
+              }
+            } else {
+              UpdateDataLoad.DataLoadNoKettleForUpdate(segId,
+                index,
+                iter,
+                carbonLoadModel,
+                loadMetadataDetails)
-            try {
-              RddInpututilsForUpdate.put(rddIteratorKey,
-                new RddIteratorForUpdate(iter, carbonLoadModel))
-              carbonLoadModel.setRddIteratorKey(rddIteratorKey)
-              CarbonDataLoadForUpdate
-                .run(carbonLoadModel, index, storePath, kettleHomePath,
-                  segId, loadMetadataDetails, executionErrors)
-            } finally {
-              RddInpututilsForUpdate.remove(rddIteratorKey)
             }
           } catch {
             case e: Exception =>
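Note the shape of the dispatch added above: only the kettle path parks the row iterator in the shared RddInpututilsForUpdate map, so only that path needs a try/finally to unregister it; the no-kettle path hands the iterator directly to UpdateDataLoad. A self-contained analogue of that register/cleanup pattern (illustrative names, nothing from the commit):

import scala.collection.concurrent.TrieMap

// Illustrative only: whatever is put into a shared registry must be
// removed again even if the load fails, hence the finally block.
object RegistryPatternDemo {
  val registry = TrieMap[String, Iterator[Int]]()  // stands in for RddInpututilsForUpdate

  def runWithRegistration(key: String, rows: Iterator[Int]): Unit =
    try {
      registry.put(key, rows)  // analogous to RddInpututilsForUpdate.put
      rows.foreach(_ => ())    // stands in for CarbonDataLoadForUpdate.run
    } finally {
      registry.remove(key)     // analogous to the finally block above
    }

  def main(args: Array[String]): Unit = {
    runWithRegistration("rddIteratorKey-demo", Iterator(1, 2, 3))
    println(registry.isEmpty)  // true: the entry was cleaned up
  }
}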
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf2d193/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index b4c6940..2eb859b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -102,6 +102,19 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("""drop table iud.dest33""").show
   }

+  test("update carbon table without alias in set columns with multiple loads") {
+    sql("""drop table iud.dest33""").show
+    sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""")
+    sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
+    checkAnswer(
+      sql("""select c3,c5 from iud.dest33 where c1='a'"""),
+      Seq(Row("MGM","Disco"),Row("MGM","Disco"))
+    )
+    sql("""drop table iud.dest33""").show
+  }
+
   test("update carbon table without alias in set three columns") {
     sql("""drop table iud.dest44""").show
     sql("""create table iud.dest44 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()