pratyakshsharma commented on a change in pull request #4227:
URL: https://github.com/apache/carbondata/pull/4227#discussion_r731965589



##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -98,8 +99,35 @@ case class CarbonMergeDataSetCommand(
       throw new UnsupportedOperationException(
         "Carbon table supposed to be present in merge dataset")
     }
+
+    val properties = CarbonProperties.getInstance()
+    val filterDupes = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+        
CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
+    if (operationType != null &&
+        
!MergeOperationType.withName(operationType.toUpperCase).equals(MergeOperationType.INSERT)
 &&
+        filterDupes) {
+      throw new MalformedCarbonCommandException("property 
CARBON_STREAMER_INSERT_DEDUPLICATE" +
+                                                " should only be set with 
operation type INSERT")
+    }
+    val isSchemaEnforcementEnabled = properties
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (operationType != null) {
+      if (isSchemaEnforcementEnabled) {
+        // call the util function to verify if incoming schema matches with 
target schema
+        CarbonMergeDataSetUtil.verifySourceAndTargetSchemas(targetDsOri, srcDS)
+      } else {
+        CarbonMergeDataSetUtil.handleSchemaEvolution(
+          targetDsOri, srcDS, sparkSession)
+      }
+    }
+
     // Target dataset must be backed by carbondata table.
-    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val tgtTable = relations.head.carbonRelation.carbonTable
+    val targetCarbonTable: CarbonTable = 
CarbonEnv.getCarbonTable(Option(tgtTable.getDatabaseName),

Review comment:
       this is added to handle the cases where target table schema needs to be 
evolved. If some new column gets added, we want the updated target schema to be 
used henceforth so that values for new column will be populated without any 
issues. We are calling the handleSchemaEvolution() method just before this line 
of code. Hope that makes it clear. @Indhumathi27 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to