pratyakshsharma commented on a change in pull request #4227: URL: https://github.com/apache/carbondata/pull/4227#discussion_r731965589
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala ########## @@ -98,8 +99,35 @@ case class CarbonMergeDataSetCommand( throw new UnsupportedOperationException( "Carbon table supposed to be present in merge dataset") } + + val properties = CarbonProperties.getInstance() + val filterDupes = properties + .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, + CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean + if (operationType != null && + !MergeOperationType.withName(operationType.toUpperCase).equals(MergeOperationType.INSERT) && + filterDupes) { + throw new MalformedCarbonCommandException("property CARBON_STREAMER_INSERT_DEDUPLICATE" + + " should only be set with operation type INSERT") + } + val isSchemaEnforcementEnabled = properties + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (operationType != null) { + if (isSchemaEnforcementEnabled) { + // call the util function to verify if incoming schema matches with target schema + CarbonMergeDataSetUtil.verifySourceAndTargetSchemas(targetDsOri, srcDS) + } else { + CarbonMergeDataSetUtil.handleSchemaEvolution( + targetDsOri, srcDS, sparkSession) + } + } + // Target dataset must be backed by carbondata table. - val targetCarbonTable = relations.head.carbonRelation.carbonTable + val tgtTable = relations.head.carbonRelation.carbonTable + val targetCarbonTable: CarbonTable = CarbonEnv.getCarbonTable(Option(tgtTable.getDatabaseName), Review comment: This is added to handle cases where the target table schema needs to be evolved. If a new column gets added, we want the updated target schema to be used from then on, so that values for the new column are populated without any issues. We call the handleSchemaEvolution() method just before this line of code. Hope that makes it clear. 
@Indhumathi27 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@carbondata.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org