Repository: incubator-carbondata
Updated Branches:
  refs/heads/master b5c20a80a -> 1d5b5fcc4


Fixing the no-kettle data load issue for IUD.
The load count / segment id should be a String rather than an Int because, in
the compaction case, it can be a fractional value such as 2.1.
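
For context, a short illustrative sketch (not part of the patch) of why an
Int-typed segment id breaks after compaction. The segment ids below are
assumptions for illustration, following CarbonData's convention that
compacted segments get fractional ids such as "2.1":

    // Illustrative only: segment ids from plain loads vs. compaction.
    val loadSegmentIds = Seq("0", "1", "2") // ids from normal data loads
    val compactedSegmentId = "2.1"          // id of a compacted segment

    // An Int-typed loadCount cannot represent the compacted id:
    // "2.1".toInt  // throws java.lang.NumberFormatException

    // Keeping the id as a String handles both cases uniformly:
    def describe(segId: String): String = s"loading into segment $segId"
    (loadSegmentIds :+ compactedSegmentId).foreach(id => println(describe(id)))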


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2cf2d193
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2cf2d193
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2cf2d193

Branch: refs/heads/master
Commit: 2cf2d1936237144a23c0a4393584a15b7beb600c
Parents: b5c20a8
Author: ravikiran <ravikiran.sn...@gmail.com>
Authored: Mon Jan 9 18:58:13 2017 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Mon Jan 16 20:13:24 2017 +0800

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataLoadRDD.scala           |  6 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  4 +-
 .../carbondata/spark/rdd/UpdateDataLoad.scala   | 79 ++++++++++++++++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        | 49 +++++++-----
 .../iud/UpdateCarbonTableTestCase.scala         | 13 ++++
 5 files changed, 128 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf2d193/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index ff3a174..c8e3c67 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -83,7 +83,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     splitIndex: Int,
     storePath: String,
     kettleHomePath: String,
-    loadCount: Int,
+    loadCount: String,
     loadMetadataDetails: LoadMetadataDetails) {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -239,7 +239,7 @@ class DataFileLoaderRDD[K, V](
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         setModelAndBlocksInfo()
         val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
-          kettleHomePath, loadCount, loadMetadataDetails)
+          kettleHomePath, String.valueOf(loadCount), loadMetadataDetails)
         loader.initialize
         if (model.isRetentionRequest) {
           recreateAggregationTableForRetention
@@ -511,7 +511,7 @@ class DataFrameLoaderRDD[K, V](
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
         val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
-          kettleHomePath, loadCount, loadMetadataDetails)
+          kettleHomePath, String.valueOf(loadCount), loadMetadataDetails)
         loader.initialize
         val rddIteratorKey = UUID.randomUUID().toString
         try {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf2d193/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 46e83a5..9e13883 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -157,7 +157,7 @@ class NewCarbonDataLoadRDD[K, V](
           theSplit.index,
           null,
           null,
-          loadCount,
+          String.valueOf(loadCount),
           loadMetadataDetails)
         // Initialize to set carbon properties
         loader.initialize()
@@ -355,7 +355,7 @@ class NewDataFrameLoaderRDD[K, V](
           theSplit.index,
           null,
           null,
-          loadCount,
+          String.valueOf(loadCount),
           loadMetadataDetails)
         // Initialize to set carbon properties
         loader.initialize()

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf2d193/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
new file mode 100644
index 0000000..7dee88a
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.sql.Row
+
+import org.apache.carbondata.common.CarbonIterator
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.LoadMetadataDetails
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadExecutor
+import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
+
+/**
+ * Data load in case of update command without kettle.
+ */
+object UpdateDataLoad {
+
+  def DataLoadNoKettleForUpdate(segId: String,
+      index: Int,
+      iter: Iterator[Row],
+      carbonLoadModel: CarbonLoadModel,
+      loadMetadataDetails: LoadMetadataDetails): Unit = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    try {
+      val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
+      val serializer = SparkEnv.get.closureSerializer.newInstance()
+      var serializeBuffer: ByteBuffer = null
+      recordReaders += new NewRddIterator(iter,
+          carbonLoadModel,
+          TaskContext.get())
+
+      val loader = new SparkPartitionLoader(carbonLoadModel,
+        index,
+        null,
+        null,
+        segId,
+        loadMetadataDetails)
+      // Initialize to set carbon properties
+      loader.initialize()
+
+      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+      new DataLoadExecutor().execute(carbonLoadModel,
+        loader.storeLocation,
+        recordReaders.toArray)
+
+    } catch {
+      case e: BadRecordFoundException =>
+        loadMetadataDetails.setLoadStatus(
+          CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+        LOGGER.info("Bad Record Found")
+      case e: Exception =>
+        LOGGER.error(e)
+        throw e
+    }
+  }
+
+}
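
A minimal sketch (not part of the patch) of how this new helper is invoked
from the update flow; the segment id, partition index, row iterator, load
model, and load metadata details are assumed to be supplied by the
surrounding update task, as in the CarbonDataRDDFactory change below:

    // Hypothetical call site, mirroring the no-kettle branch added below.
    // The helper sets the load status itself (SUCCESS, or PARTIAL_SUCCESS
    // when bad records are found) and rethrows any other failure.
    UpdateDataLoad.DataLoadNoKettleForUpdate(segId,
      index,
      iter,
      carbonLoadModel,
      loadMetadataDetails)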

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf2d193/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7a7aa64..610f5fb 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -17,11 +17,13 @@
 
 package org.apache.carbondata.spark.rdd
 
+import java.nio.ByteBuffer
 import java.util
 import java.util.UUID
 import java.util.concurrent._
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 import scala.util.Random
 import scala.util.control.Breaks._
@@ -30,36 +32,35 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{SparkContext, SparkEnv, SparkException}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskContext}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, UpdateCoalescedRDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
+import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.update.CarbonUpdateUtil
-import org.apache.carbondata.core.updatestatus.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.processing.csvreaderstep.RddInpututilsForUpdate
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.newflow.DataLoadExecutor
+import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException}
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionCallable, CompactionType}
-import org.apache.carbondata.spark.partition.api.Partition
+import org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil, LoadMetadataUtil}
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}
 
 /**
  * This is the factory class which can create different RDDs depending on user needs.
@@ -715,16 +716,28 @@ object CarbonDataRDDFactory {
               loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
               val rddIteratorKey = CarbonCommonConstants.RDDUTIL_UPDATE_KEY +
                                    UUID.randomUUID().toString
+              if (useKettle) {
+                try {
+                  RddInpututilsForUpdate.put(rddIteratorKey,
+                    new RddIteratorForUpdate(iter, carbonLoadModel))
+                  carbonLoadModel.setRddIteratorKey(rddIteratorKey)
+                  CarbonDataLoadForUpdate.run(carbonLoadModel,
+                    index,
+                    storePath,
+                    kettleHomePath,
+                    segId,
+                    loadMetadataDetails,
+                    executionErrors)
+                } finally {
+                  RddInpututilsForUpdate.remove(rddIteratorKey)
+                }
+              } else {
+                UpdateDataLoad.DataLoadNoKettleForUpdate(segId,
+                  index,
+                  iter,
+                  carbonLoadModel,
+                  loadMetadataDetails)
 
-              try {
-                RddInpututilsForUpdate.put(rddIteratorKey,
-                  new RddIteratorForUpdate(iter, carbonLoadModel))
-                carbonLoadModel.setRddIteratorKey(rddIteratorKey)
-                CarbonDataLoadForUpdate
-                  .run(carbonLoadModel, index, storePath, kettleHomePath,
-                    segId, loadMetadataDetails, executionErrors)
-              } finally {
-                RddInpututilsForUpdate.remove(rddIteratorKey)
               }
             } catch {
               case e: Exception =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf2d193/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index b4c6940..2eb859b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -102,6 +102,19 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       sql("""drop table iud.dest33""").show
   }
 
+  test("update carbon table without alias in set columns with mulitple loads") 
{
+    sql("""drop table iud.dest33""").show
+    sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""").show()
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest33""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest33""")
+    sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55  from 
iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
+    checkAnswer(
+      sql("""select c3,c5 from iud.dest33 where c1='a'"""),
+      Seq(Row("MGM","Disco"),Row("MGM","Disco"))
+    )
+    sql("""drop table iud.dest33""").show
+  }
+
    test("update carbon table without alias in set three columns") {
      sql("""drop table iud.dest44""").show
      sql("""create table iud.dest44 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""").show()
