ajantha-bhat commented on a change in pull request #3856:
URL: https://github.com/apache/carbondata/pull/3856#discussion_r473876207
##
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##
@@ -106,18 +106,34 @@ case class CarbonMergeDataSetCommand(
// decide join type based on match conditions
val joinType = decideJoinType
+val joinColumn = mergeMatches.joinExpr.expr.asInstanceOf[EqualTo].left
+ .asInstanceOf[UnresolvedAttribute].nameParts.tail.head
+// repartition the the srsDs, if the target as bucketing and the bucketing column and join
Review comment:
```suggestion
// repartition the srcDs, if the target has bucketing and the bucketing column and join
```
##
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##
@@ -106,18 +106,34 @@ case class CarbonMergeDataSetCommand(
// decide join type based on match conditions
val joinType = decideJoinType
+val joinColumn = mergeMatches.joinExpr.expr.asInstanceOf[EqualTo].left
+ .asInstanceOf[UnresolvedAttribute].nameParts.tail.head
+// repartition the the srsDs, if the target as bucketing and the bucketing column and join
+// column are same
+val repartitionedSrsDs =
Review comment:
```suggestion
val repartitionedSrcDs =
```
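The condition the comment describes (repartition the source dataset only when the target table is bucketed on the join column) can be sketched as a small predicate. This is a minimal illustration, not the PR's code; `shouldRepartition`, `bucketColumns`, and `joinColumn` are hypothetical names:

```scala
// Hypothetical helper sketching the repartition decision discussed above:
// repartition srcDs only when the target table has bucketing AND one of its
// bucket columns matches the merge join column (compared case-insensitively,
// mirroring Spark's usual column resolution).
object RepartitionDecision {
  def shouldRepartition(bucketColumns: Seq[String], joinColumn: String): Boolean =
    bucketColumns.exists(_.equalsIgnoreCase(joinColumn))
}
```

When the predicate holds, the actual command would call something like `srcDs.repartition(numBuckets, col(joinColumn))` so the shuffle aligns with the target's bucket layout.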
##
File path:
integration/spark/src/main/spark2.3/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import com.databricks.spark.avro.{AvroReader, AvroWriter}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+
+object AvroFileFormatFactory {
Review comment:
same doubt as above, maybe just use databricks spark-avro for both 2.3 and 2.4
##
File path: integration/spark/pom.xml
##
@@ -153,6 +153,28 @@
+    <dependency>
+      <groupId>com.databricks</groupId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>
+      <version>4.0.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>
Review comment:
why can't spark 2.3 and 2.4 both use databricks spark-avro? I can understand that the other way around is not possible (for both to use spark-avro)
##
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
##
@@ -439,6 +449,11 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
def insertData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
var rows = Seq.empty[Row]
+val loadDataFrame = if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
+ Some(CommonLoadUtils.getDataFrameWithTupleID(Some(dataFrame)))
Review comment:
This InsertIntoCommand flow is not meant for the update flow yet, because an update will have an implicit column, and the schema rearrangement will fail. So I suggest throwing an unsupported-operation exception now when `updateModel.get.loadAsNewSegment` is `false`, and handling this requirement later.
Also, when `updateModel.get.loadAsNewSegment = true` (which is our current cdc history-data case), **this flow can be used** (it is just an insert; no actual update flow is involved). Only when `updateModel.get.loadAsNewSegment = false` can we not use this flow.
Since someone might use it because of the update-model support, I suggest throwing an exception at the beginning of this function when `updateModel.get.loadAsNewSegment = false`.