Indhumathi27 commented on a change in pull request #4227:
URL: https://github.com/apache/carbondata/pull/4227#discussion_r728023802



##########
File path: common/src/main/java/org/apache/carbondata/common/exceptions/sql/CarbonSchemaException.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions.sql;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class CarbonSchemaException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  private final String msg;
+
+  public CarbonSchemaException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  public String getMsg() {

Review comment:
   ```suggestion
     public String getMessage() {
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -98,8 +99,35 @@ case class CarbonMergeDataSetCommand(
       throw new UnsupportedOperationException(
         "Carbon table supposed to be present in merge dataset")
     }
+
+    val properties = CarbonProperties.getInstance()
+    val filterDupes = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+        CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
+    if (operationType != null &&
+        !MergeOperationType.withName(operationType.toUpperCase).equals(MergeOperationType.INSERT) &&
+        filterDupes) {
+      throw new MalformedCarbonCommandException("property CARBON_STREAMER_INSERT_DEDUPLICATE" +
+                                                " should only be set with operation type INSERT")
+    }
+    val isSchemaEnforcementEnabled = properties
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (operationType != null) {
+      if (isSchemaEnforcementEnabled) {
+        // call the util function to verify if incoming schema matches with target schema
+        CarbonMergeDataSetUtil.verifySourceAndTargetSchemas(targetDsOri, srcDS)
+      } else {
+        CarbonMergeDataSetUtil.handleSchemaEvolution(
+          targetDsOri, srcDS, sparkSession)
+      }
+    }
+
     // Target dataset must be backed by carbondata table.
-    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val tgtTable = relations.head.carbonRelation.carbonTable
+    val targetCarbonTable: CarbonTable = CarbonEnv.getCarbonTable(Option(tgtTable.getDatabaseName),

Review comment:
       Why is this code required?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -98,8 +99,35 @@ case class CarbonMergeDataSetCommand(
       throw new UnsupportedOperationException(
         "Carbon table supposed to be present in merge dataset")
     }
+
+    val properties = CarbonProperties.getInstance()

Review comment:
       It looks like the validations added here are applicable only when operationType is not null. Please move these validations inside the `if (operationType != null)` check.
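   A rough sketch of the consolidated structure (illustrative only; all identifiers are taken from the quoted diff):

```scala
// Sketch: read the properties and run both validations only when operationType is set.
if (operationType != null) {
  val properties = CarbonProperties.getInstance()
  val filterDupes = properties
    .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
      CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
  // insert-deduplication only makes sense for the INSERT operation type
  if (!MergeOperationType.withName(operationType.toUpperCase)
        .equals(MergeOperationType.INSERT) && filterDupes) {
    throw new MalformedCarbonCommandException(
      "property CARBON_STREAMER_INSERT_DEDUPLICATE should only be set with operation type INSERT")
  }
  val isSchemaEnforcementEnabled = properties
    .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
      CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
  if (isSchemaEnforcementEnabled) {
    // verify that the incoming schema matches the target schema
    CarbonMergeDataSetUtil.verifySourceAndTargetSchemas(targetDsOri, srcDS)
  } else {
    CarbonMergeDataSetUtil.handleSchemaEvolution(targetDsOri, srcDS, sparkSession)
  }
}
```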

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {

Review comment:
       A new variable can be created for `sourceSchema.fields.map(_.name.toLowerCase)` and reused at line 516.
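   For example, a sketch only, reusing names from the quoted diff:

```scala
// Sketch: lower-case the source field names once and reuse the result for both
// the missing-field check and the later duplicate-name (case sensitivity) check.
val sourceFieldNames = sourceSchema.fields.map(_.name.toLowerCase)

targetSchema.fields.foreach { tgtField =>
  if (!sourceFieldNames.contains(tgtField.name.toLowerCase)) {
    throw new CarbonSchemaException(s"source schema does not contain field: ${tgtField.name}")
  }
}

// names that occur more than once after lower-casing differ only in case
val similarFields = sourceFieldNames
  .groupBy(identity)
  .collect { case (name, occurrences) if occurrences.length > 1 => name }
  .toList
```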

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")

Review comment:
       The exception message can be assigned to a new variable and reused for both the log and the exception. Please handle this in all applicable places.
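   For example (sketch):

```scala
// Sketch: build the message once and reuse it for both the log and the exception.
val errorMsg = s"source schema does not contain field: ${tgtField.name}"
LOGGER.error(errorMsg)
throw new CarbonSchemaException(errorMsg)
```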

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in 
case sensitivity: " +
+                   s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which 
differ" +
+                                                s" only in case sensitivity: ${
+                                                  similarFields.mkString(",")
+                                                }")
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)
+      else srcDs
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .map(_.name)
+      .filterNot(f => targetSchema.fields.map(_.name).contains(f))
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq,
+        sparkSession)
+    }
+
+    // check if any column got deleted from source
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val partitionInfo = targetCarbonTable.getPartitionInfo
+    val partitionColumns = if (partitionInfo != null) 
partitionInfo.getColumnSchemaList.asScala
+      .map(_.getColumnName).toList else List[String]()
+    val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+      .filterNot(f => {
+        sourceSchema.fields.map(_.name.toLowerCase).contains(f) ||

Review comment:
       `sourceSchema.fields.map(_.name.toLowerCase)` can be moved outside the loop into a new variable and reused.
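   For example (sketch):

```scala
// Sketch: hoist the lower-cased source field names out of the filterNot closure.
val sourceFieldNames = sourceSchema.fields.map(_.name.toLowerCase)
val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
  .filterNot(f => sourceFieldNames.contains(f) || partitionColumns.contains(f))
```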

##########
File path: pom.xml
##########
@@ -130,7 +130,7 @@
     <scala.version>2.11.8</scala.version>
     <hadoop.deps.scope>compile</hadoop.deps.scope>
     <spark.version>2.3.4</spark.version>
-    <spark.binary.version>2.3</spark.binary.version>
+    <spark.binary.version>2.4</spark.binary.version>

Review comment:
       Why this change? Please revert it.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in 
case sensitivity: " +
+                   s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which 
differ" +
+                                                s" only in case sensitivity: ${
+                                                  similarFields.mkString(",")
+                                                }")
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)
+      else srcDs
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .map(_.name)
+      .filterNot(f => targetSchema.fields.map(_.name).contains(f))
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq,
+        sparkSession)
+    }
+
+    // check if any column got deleted from source
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val partitionInfo = targetCarbonTable.getPartitionInfo
+    val partitionColumns = if (partitionInfo != null) 
partitionInfo.getColumnSchemaList.asScala
+      .map(_.getColumnName).toList else List[String]()
+    val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+      .filterNot(f => {
+        sourceSchema.fields.map(_.name.toLowerCase).contains(f) ||
+        partitionColumns.contains(f)
+      })
+    if (deletedColumns.nonEmpty) {
+      handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession)
+    }
+
+    val modifiedColumns = targetSchema.fields.filter(tgtField => {
+      val sourceField = sourceSchema.fields.find(f => 
f.name.equalsIgnoreCase(tgtField.name))
+      if (sourceField.isDefined) 
!sourceField.get.dataType.equals(tgtField.dataType) else false
+    })
+
+    if (modifiedColumns.nonEmpty) {
+      handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, 
sparkSession)
+    }
+  }
+
+  /**
+   * This method calls CarbonAlterTableAddColumnCommand for adding new columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToAdd new columns to be added
+   * @param sparkSession SparkSession
+   */
+  def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: 
Seq[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val fields = new CarbonSpark2SqlParser().getFields(colsToAdd)
+    val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = 
false,
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      fields.map(CarbonParserUtil.convertFieldNamesToLowercase),
+      Seq.empty,
+      scala.collection.mutable.Map.empty[String, String],
+      None,
+      isAlterFlow = true)
+    //    targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = 
f.getColName,
+    //      dataType = Some(f.getDataType.getName), name = 
Option(f.getColName),
+    //      children = None, ))
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      Map.empty[String, String],
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highCardinalityDims.getOrElse(Seq.empty))
+    
CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableDropColumnCommand for deleting columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToDrop columns to be dropped from carbondata table
+   * @param sparkSession SparkSession
+   */
+  def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop: 
List[String],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val alterTableDropColumnModel = AlterTableDropColumnModel(
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      colsToDrop.map(_.toLowerCase))
+    
CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for 
handling data type changes
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param modifiedCols columns with data type changes
+   * @param sparkSession SparkSession
+   */
+  def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols: 
List[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+
+    // need to call the command one by one for each modified column
+    modifiedCols.foreach(col => {
+      val values = col.dataType match {
+        case d: DecimalType => Some(List((d.precision, d.scale)))
+        case _ => None
+      }
+      val dataTypeInfo = CarbonParserUtil.parseColumn(col.name, col.dataType, 
values)
+
+      val alterTableColRenameAndDataTypeChangeModel =
+        AlterTableDataTypeChangeModel(
+          dataTypeInfo,
+          Option(targetCarbonTable.getDatabaseName.toLowerCase),
+          targetCarbonTable.getTableName.toLowerCase,
+          col.name.toLowerCase,
+          col.name.toLowerCase,
+          isColumnRename = false,
+          Option.empty)
+
+      CarbonAlterTableColRenameDataTypeChangeCommand(
+        alterTableColRenameAndDataTypeChangeModel
+      ).run(sparkSession)
+    })
+  }
+
+  def deduplicateBeforeWriting(
+      srcDs: Dataset[Row],
+      targetDs: Dataset[Row],
+      sparkSession: SparkSession,
+      srcAlias: String,
+      targetAlias: String,
+      keyColumn: String,
+      orderingField: String,
+      targetCarbonTable: CarbonTable): Dataset[Row] = {
+    val properties = CarbonProperties.getInstance()
+    val filterDupes = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+        
CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
+    val combineBeforeUpsert = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE,
+        
CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT).toBoolean
+    var dedupedDataset: Dataset[Row] = srcDs
+    if (combineBeforeUpsert) {
+      dedupedDataset = deduplicateAgainstIncomingDataset(srcDs, sparkSession, 
srcAlias, keyColumn,
+        orderingField, targetCarbonTable)
+    }
+    if (filterDupes) {
+      dedupedDataset = deduplicateAgainstExistingDataset(dedupedDataset, 
targetDs,
+        srcAlias, targetAlias, keyColumn)
+    }
+    dedupedDataset.show()
+    dedupedDataset
+  }
+
+  def deduplicateAgainstIncomingDataset(
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      srcAlias: String,
+      keyColumn: String,
+      orderingField: String,
+      table: CarbonTable): Dataset[Row] = {
+    if 
(orderingField.equals(CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT))
 {
+      return srcDs
+    }
+    val schema = srcDs.schema
+    val carbonKeyColumn = table.getColumnByName(keyColumn)
+    val keyColumnDataType = getCarbonDataType(keyColumn, srcDs)
+    val orderingFieldDataType = getCarbonDataType(orderingField, srcDs)
+    val isPrimitiveAndNotDate = 
DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) &&
+                                (orderingFieldDataType != DataTypes.DATE)
+    val comparator = getComparator(orderingFieldDataType)
+    val rdd = srcDs.rdd
+    val dedupedRDD: RDD[Row] = rdd.map{row =>
+      val index = row.fieldIndex(keyColumn)
+      val rowKey = getRowKey(row, index, carbonKeyColumn, 
isPrimitiveAndNotDate, keyColumnDataType)
+      (rowKey, row)
+    }.reduceByKey{(row1, row2) =>
+      val orderingValue1 = row1.getAs(orderingField).asInstanceOf[Any]
+      val orderingValue2 = row2.getAs(orderingField).asInstanceOf[Any]
+      if (orderingFieldDataType.equals(DataTypes.STRING)) {
+        if (orderingValue1 == null) {
+          row2
+        } else if (orderingValue2 == null) {
+          row1
+        } else {
+          if (ByteUtil.UnsafeComparer.INSTANCE
+                .compareTo(orderingValue1.toString
+                  
.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
+                  orderingValue2.toString
+                    
.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))) >= 0) {
+            row1
+          } else {
+            row2
+          }
+        }
+      } else {
+        if (comparator.compare(orderingValue1, orderingValue2) >= 0) {
+          row1
+        } else {
+          row2
+        }
+      }
+    }.map(_._2)
+    sparkSession.createDataFrame(dedupedRDD, schema).alias(srcAlias)
+  }
+
+  def getComparator(
+      orderingFieldDataType: CarbonDataType
+  ): SerializableComparator = {
+    val isPrimitiveAndNotDate = 
DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) &&
+                                (orderingFieldDataType != DataTypes.DATE)
+    if (isPrimitiveAndNotDate) {
+      Comparator.getComparator(orderingFieldDataType)
+    } else if (orderingFieldDataType == DataTypes.STRING) {
+      null
+    } else {
+      Comparator.getComparatorByDataTypeForMeasure(orderingFieldDataType)
+    }
+  }
+
+  def getRowKey(
+      row: Row,
+      index: Integer,
+      carbonKeyColumn: CarbonColumn,
+      isPrimitiveAndNotDate: Boolean,
+      keyColumnDataType: CarbonDataType
+  ): AnyRef = {
+    if (!row.isNullAt(index)) {
+      row.getAs(index).toString
+    } else {
+      val value: Long = 0
+      if (carbonKeyColumn.isDimension) {
+        if (isPrimitiveAndNotDate) {
+          CarbonCommonConstants.EMPTY_BYTE_ARRAY
+        } else {
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY
+        }
+      } else {
+        val nullValueForMeasure = if ((keyColumnDataType eq DataTypes.BOOLEAN) 
||

Review comment:
       This can be replaced with a `match` (case) expression.
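   A rough sketch of the intended shape; the branch results are placeholders, since the original if/else is truncated in the quoted diff:

```scala
// Sketch only: choose the null value for a measure key column with a match
// expression instead of chained if/else. Branch values are illustrative.
val nullValueForMeasure: Any = keyColumnDataType match {
  case DataTypes.BOOLEAN => false
  case DataTypes.SHORT | DataTypes.INT | DataTypes.LONG => 0L
  case DataTypes.DOUBLE => 0d
  case _ => null
}
```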

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in 
case sensitivity: " +
+                   s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which 
differ" +
+                                                s" only in case sensitivity: ${
+                                                  similarFields.mkString(",")
+                                                }")
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)
+      else srcDs
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .map(_.name)
+      .filterNot(f => targetSchema.fields.map(_.name).contains(f))
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq,
+        sparkSession)
+    }
+
+    // check if any column got deleted from source
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val partitionInfo = targetCarbonTable.getPartitionInfo
+    val partitionColumns = if (partitionInfo != null) 
partitionInfo.getColumnSchemaList.asScala
+      .map(_.getColumnName).toList else List[String]()
+    val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+      .filterNot(f => {
+        sourceSchema.fields.map(_.name.toLowerCase).contains(f) ||
+        partitionColumns.contains(f)
+      })
+    if (deletedColumns.nonEmpty) {
+      handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession)
+    }
+
+    val modifiedColumns = targetSchema.fields.filter(tgtField => {
+      val sourceField = sourceSchema.fields.find(f => 
f.name.equalsIgnoreCase(tgtField.name))
+      if (sourceField.isDefined) 
!sourceField.get.dataType.equals(tgtField.dataType) else false
+    })
+
+    if (modifiedColumns.nonEmpty) {
+      handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, 
sparkSession)
+    }
+  }
+
+  /**
+   * This method calls CarbonAlterTableAddColumnCommand for adding new columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToAdd new columns to be added
+   * @param sparkSession SparkSession
+   */
+  def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: 
Seq[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val fields = new CarbonSpark2SqlParser().getFields(colsToAdd)
+    val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = 
false,
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      fields.map(CarbonParserUtil.convertFieldNamesToLowercase),
+      Seq.empty,
+      scala.collection.mutable.Map.empty[String, String],
+      None,
+      isAlterFlow = true)
+    //    targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = 
f.getColName,
+    //      dataType = Some(f.getDataType.getName), name = 
Option(f.getColName),
+    //      children = None, ))
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      Map.empty[String, String],
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highCardinalityDims.getOrElse(Seq.empty))
+    
CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableDropColumnCommand for deleting columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToDrop columns to be dropped from carbondata table
+   * @param sparkSession SparkSession
+   */
+  def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop: 
List[String],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val alterTableDropColumnModel = AlterTableDropColumnModel(
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      colsToDrop.map(_.toLowerCase))
+    
CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for 
handling data type changes
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param modifiedCols columns with data type changes
+   * @param sparkSession SparkSession
+   */
+  def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols: 
List[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+
+    // need to call the command one by one for each modified column
+    modifiedCols.foreach(col => {
+      val values = col.dataType match {
+        case d: DecimalType => Some(List((d.precision, d.scale)))
+        case _ => None
+      }
+      val dataTypeInfo = CarbonParserUtil.parseColumn(col.name, col.dataType, 
values)
+
+      val alterTableColRenameAndDataTypeChangeModel =
+        AlterTableDataTypeChangeModel(
+          dataTypeInfo,
+          Option(targetCarbonTable.getDatabaseName.toLowerCase),
+          targetCarbonTable.getTableName.toLowerCase,
+          col.name.toLowerCase,
+          col.name.toLowerCase,
+          isColumnRename = false,
+          Option.empty)
+
+      CarbonAlterTableColRenameDataTypeChangeCommand(
+        alterTableColRenameAndDataTypeChangeModel
+      ).run(sparkSession)
+    })
+  }
+
+  def deduplicateBeforeWriting(
+      srcDs: Dataset[Row],
+      targetDs: Dataset[Row],
+      sparkSession: SparkSession,
+      srcAlias: String,
+      targetAlias: String,
+      keyColumn: String,
+      orderingField: String,
+      targetCarbonTable: CarbonTable): Dataset[Row] = {
+    val properties = CarbonProperties.getInstance()
+    val filterDupes = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+        
CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
+    val combineBeforeUpsert = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE,
+        
CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT).toBoolean
+    var dedupedDataset: Dataset[Row] = srcDs
+    if (combineBeforeUpsert) {
+      dedupedDataset = deduplicateAgainstIncomingDataset(srcDs, sparkSession, 
srcAlias, keyColumn,
+        orderingField, targetCarbonTable)
+    }
+    if (filterDupes) {
+      dedupedDataset = deduplicateAgainstExistingDataset(dedupedDataset, 
targetDs,
+        srcAlias, targetAlias, keyColumn)
+    }
+    dedupedDataset.show()
+    dedupedDataset
+  }
+
+  def deduplicateAgainstIncomingDataset(
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      srcAlias: String,
+      keyColumn: String,
+      orderingField: String,
+      table: CarbonTable): Dataset[Row] = {
+    if 
(orderingField.equals(CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT))
 {
+      return srcDs
+    }
+    val schema = srcDs.schema
+    val carbonKeyColumn = table.getColumnByName(keyColumn)
+    val keyColumnDataType = getCarbonDataType(keyColumn, srcDs)
+    val orderingFieldDataType = getCarbonDataType(orderingField, srcDs)
+    val isPrimitiveAndNotDate = 
DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) &&
+                                (orderingFieldDataType != DataTypes.DATE)
+    val comparator = getComparator(orderingFieldDataType)
+    val rdd = srcDs.rdd
+    val dedupedRDD: RDD[Row] = rdd.map{row =>
+      val index = row.fieldIndex(keyColumn)
+      val rowKey = getRowKey(row, index, carbonKeyColumn, 
isPrimitiveAndNotDate, keyColumnDataType)
+      (rowKey, row)
+    }.reduceByKey{(row1, row2) =>
+      val orderingValue1 = row1.getAs(orderingField).asInstanceOf[Any]
+      val orderingValue2 = row2.getAs(orderingField).asInstanceOf[Any]
+      if (orderingFieldDataType.equals(DataTypes.STRING)) {
+        if (orderingValue1 == null) {
+          row2
+        } else if (orderingValue2 == null) {
+          row1
+        } else {
+          if (ByteUtil.UnsafeComparer.INSTANCE
+                .compareTo(orderingValue1.toString
+                  
.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
+                  orderingValue2.toString
+                    
.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))) >= 0) {
+            row1
+          } else {
+            row2
+          }
+        }
+      } else {
+        if (comparator.compare(orderingValue1, orderingValue2) >= 0) {
+          row1
+        } else {
+          row2
+        }
+      }
+    }.map(_._2)
+    sparkSession.createDataFrame(dedupedRDD, schema).alias(srcAlias)
+  }
+
+  def getComparator(
+      orderingFieldDataType: CarbonDataType
+  ): SerializableComparator = {
+    val isPrimitiveAndNotDate = 
DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) &&

Review comment:
       1. I think this method itself is not needed; `Comparator.getComparator` can be called directly. For the string type, `ByteArraySerializableComparator` can be used.
   2. Please refactor the caller method as well.
   3. For the DATE type, this might throw an `IllegalArgumentException`. Please check and handle it.
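   A hedged sketch of the caller-side change (assuming, per point 1 above, that `ByteArraySerializableComparator` can be instantiated directly for strings):

```scala
// Sketch only: pick the comparator at the call site instead of via the helper method.
val comparator: SerializableComparator =
  if (orderingFieldDataType == DataTypes.STRING) {
    new ByteArraySerializableComparator() // assumption based on the review comment
  } else {
    // note: DATE may need special handling to avoid IllegalArgumentException
    Comparator.getComparator(orderingFieldDataType)
  }
```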

##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -1215,6 +1215,17 @@ private CarbonCommonConstants() {
 
  public static final String CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT_DEFAULT = "false";
 
+  /**
+   * This flag decides if table schema needs to change as per the incoming batch schema.

Review comment:
       The CDC-related properties can be moved/grouped together in one place.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {

Review comment:
       This check is not needed. The `sourceField` variable can be moved before this check, and the check can be replaced with `sourceField.isEmpty`.
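   For example (sketch):

```scala
// Sketch: look up the source field first, then branch on Option.isEmpty
// instead of a separate contains() check on the lower-cased names.
val sourceField = sourceSchema.fields.find(_.name.equalsIgnoreCase(tgtField.name))
if (sourceField.isEmpty) {
  val errorMsg = s"source schema does not contain field: ${tgtField.name}"
  LOGGER.error(errorMsg)
  throw new CarbonSchemaException(errorMsg)
}
if (!sourceField.get.dataType.equals(tgtField.dataType)) {
  val errorMsg = s"source schema has different data type for field: ${tgtField.name}"
  LOGGER.error(errorMsg)
  throw new CarbonSchemaException(errorMsg)
}
```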

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in 
case sensitivity: " +
+                   s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which 
differ" +
+                                                s" only in case sensitivity: ${
+                                                  similarFields.mkString(",")
+                                                }")
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)
+      else srcDs
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {

Review comment:
       The same code is present in the verifySourceAndTargetSchemas method. Can move this code to a common method and reuse it.
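   For example, a minimal sketch of how the shared per-field checks could be pulled into one private helper (the helper name and placement are hypothetical, and it assumes `org.apache.spark.sql.types.StructType` is imported):
   ```scala
  // Hypothetical helper; verifySourceAndTargetSchemas and
  // verifyBackwardsCompatibility could both delegate these checks to it.
  private def verifyFieldsExistAndTypesMatch(
      sourceSchema: StructType,
      targetSchema: StructType): Unit = {
    targetSchema.fields.foreach { tgtField =>
      // check if the field is missing in the source schema
      val sourceField = sourceSchema.fields.find(_.name.equalsIgnoreCase(tgtField.name))
      if (sourceField.isEmpty) {
        LOGGER.error(s"source schema does not contain field: ${ tgtField.name }")
        throw new CarbonSchemaException(
          s"source schema does not contain field: ${ tgtField.name }")
      }
      // check if the data type differs for an existing column
      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
        LOGGER.error(s"source schema has different data type for field: ${ tgtField.name }, " +
          s"source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }")
        throw new CarbonSchemaException(
          s"source schema has different data type for field: ${ tgtField.name }")
      }
    }
  }
   ```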

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -16,31 +16,41 @@
  */
 package org.apache.spark.sql.execution.command.mutation.merge
 
+import java.nio.charset.Charset
 import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import 
org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.EqualTo
 import org.apache.spark.sql.execution.CastExpressionOptimization
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, 
AlterTableDataTypeChangeModel, AlterTableDropColumnModel}
+import 
org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand,
 CarbonAlterTableColRenameDataTypeChangeCommand, 
CarbonAlterTableDropColumnCommand}
+import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.types.DateType
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.types.{DateType, DecimalType, StructField}
 
+import org.apache.carbondata.common.exceptions.sql.{CarbonSchemaException, 
MalformedCarbonCommandException}

Review comment:
       Please remove the unused import: MalformedCarbonCommandException.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")

Review comment:
       Can move this to the previous line.
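   i.e. something like (assuming it still fits the line-length limit):
   ```scala
        throw new CarbonSchemaException(s"source schema does not contain field: ${ tgtField.name }")
   ```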

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))

Review comment:
       tgtField.name.toLowerCase - converting to lowercase is not needed here, since the equalsIgnoreCase check already handles the case difference.
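   i.e.:
   ```scala
      val sourceField = sourceSchema.fields.find(f => f.name.equalsIgnoreCase(tgtField.name))
   ```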

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in 
case sensitivity: " +
+                   s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which 
differ" +
+                                                s" only in case sensitivity: ${
+                                                  similarFields.mkString(",")
+                                                }")
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)
+      else srcDs
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .map(_.name)
+      .filterNot(f => targetSchema.fields.map(_.name).contains(f))
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq,
+        sparkSession)
+    }
+
+    // check if any column got deleted from source
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val partitionInfo = targetCarbonTable.getPartitionInfo
+    val partitionColumns = if (partitionInfo != null) 
partitionInfo.getColumnSchemaList.asScala
+      .map(_.getColumnName).toList else List[String]()
+    val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+      .filterNot(f => {
+        sourceSchema.fields.map(_.name.toLowerCase).contains(f) ||
+        partitionColumns.contains(f)
+      })
+    if (deletedColumns.nonEmpty) {
+      handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession)
+    }
+
+    val modifiedColumns = targetSchema.fields.filter(tgtField => {
+      val sourceField = sourceSchema.fields.find(f => 
f.name.equalsIgnoreCase(tgtField.name))
+      if (sourceField.isDefined) 
!sourceField.get.dataType.equals(tgtField.dataType) else false
+    })
+
+    if (modifiedColumns.nonEmpty) {
+      handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, 
sparkSession)
+    }
+  }
+
+  /**
+   * This method calls CarbonAlterTableAddColumnCommand for adding new columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToAdd new columns to be added
+   * @param sparkSession SparkSession
+   */
+  def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: 
Seq[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val fields = new CarbonSpark2SqlParser().getFields(colsToAdd)
+    val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = 
false,

Review comment:
       The same code is available in DDLHelper.addColumns. Can move the common code to a new method and reuse it.
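   As a rough sketch (the helper name and signature are hypothetical; it only regroups the model-building code already shown in this diff):
   ```scala
  // Hypothetical helper that builds the add-columns model from the new fields;
  // handleAddColumnScenario (and potentially DDLHelper.addColumns) could reuse it.
  def buildAddColumnsModel(
      targetCarbonTable: CarbonTable,
      colsToAdd: Seq[StructField]): AlterTableAddColumnsModel = {
    val fields = new CarbonSpark2SqlParser().getFields(colsToAdd)
    val tableModel = CarbonParserUtil.prepareTableModel(
      ifNotExistPresent = false,
      CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
      targetCarbonTable.getTableName.toLowerCase,
      fields.map(CarbonParserUtil.convertFieldNamesToLowercase),
      Seq.empty,
      scala.collection.mutable.Map.empty[String, String],
      None,
      isAlterFlow = true)
    AlterTableAddColumnsModel(
      CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
      targetCarbonTable.getTableName.toLowerCase,
      Map.empty[String, String],
      tableModel.dimCols,
      tableModel.msrCols,
      tableModel.highCardinalityDims.getOrElse(Seq.empty))
  }
   ```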

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in 
case sensitivity: " +
+                   s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which 
differ" +
+                                                s" only in case sensitivity: ${
+                                                  similarFields.mkString(",")
+                                                }")
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)

Review comment:
       Please add braces and format the code here.
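   For example:
   ```scala
      val srcDsWithoutMeta = if (metaCols.length > 0) {
        srcDs.drop(metaCols: _*)
      } else {
        srcDs
      }
   ```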

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in 
case sensitivity: " +
+                   s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which 
differ" +
+                                                s" only in case sensitivity: ${
+                                                  similarFields.mkString(",")
+                                                }")
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)
+      else srcDs
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .map(_.name)
+      .filterNot(f => targetSchema.fields.map(_.name).contains(f))
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq,

Review comment:
       ```suggestion
           srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq,
   ```
   ```suggestion
           sourceSchema.fields.filter(f => addedColumns.contains(f.name)).toSeq,
   ```
   After this, can format the call to a single line.
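   e.g., one way to keep it within the line-length limit (the intermediate variable name is only illustrative):
   ```scala
    val addedFields = sourceSchema.fields.filter(f => addedColumns.contains(f.name)).toSeq
    handleAddColumnScenario(targetDs, addedFields, sparkSession)
   ```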

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in 
case sensitivity: " +
+                   s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which 
differ" +
+                                                s" only in case sensitivity: ${
+                                                  similarFields.mkString(",")
+                                                }")
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)
+      else srcDs
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .map(_.name)
+      .filterNot(f => targetSchema.fields.map(_.name).contains(f))
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq,
+        sparkSession)
+    }
+
+    // check if any column got deleted from source
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val partitionInfo = targetCarbonTable.getPartitionInfo
+    val partitionColumns = if (partitionInfo != null) 
partitionInfo.getColumnSchemaList.asScala
+      .map(_.getColumnName).toList else List[String]()
+    val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+      .filterNot(f => {
+        sourceSchema.fields.map(_.name.toLowerCase).contains(f) ||
+        partitionColumns.contains(f)
+      })
+    if (deletedColumns.nonEmpty) {
+      handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession)
+    }
+
+    val modifiedColumns = targetSchema.fields.filter(tgtField => {
+      val sourceField = sourceSchema.fields.find(f => 
f.name.equalsIgnoreCase(tgtField.name))
+      if (sourceField.isDefined) 
!sourceField.get.dataType.equals(tgtField.dataType) else false
+    })
+
+    if (modifiedColumns.nonEmpty) {
+      handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, 
sparkSession)
+    }
+  }
+
+  /**
+   * This method calls CarbonAlterTableAddColumnCommand for adding new columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToAdd new columns to be added
+   * @param sparkSession SparkSession
+   */
+  def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: 
Seq[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val fields = new CarbonSpark2SqlParser().getFields(colsToAdd)
+    val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = 
false,
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      fields.map(CarbonParserUtil.convertFieldNamesToLowercase),
+      Seq.empty,
+      scala.collection.mutable.Map.empty[String, String],
+      None,
+      isAlterFlow = true)
+    //    targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = 
f.getColName,
+    //      dataType = Some(f.getDataType.getName), name = 
Option(f.getColName),
+    //      children = None, ))
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      Map.empty[String, String],
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highCardinalityDims.getOrElse(Seq.empty))
+    
CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableDropColumnCommand for deleting columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToDrop columns to be dropped from carbondata table
+   * @param sparkSession SparkSession
+   */
+  def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop: 
List[String],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val alterTableDropColumnModel = AlterTableDropColumnModel(
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      colsToDrop.map(_.toLowerCase))
+    
CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for 
handling data type changes
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param modifiedCols columns with data type changes
+   * @param sparkSession SparkSession
+   */
+  def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols: 
List[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+
+    // need to call the command one by one for each modified column
+    modifiedCols.foreach(col => {
+      val values = col.dataType match {
+        case d: DecimalType => Some(List((d.precision, d.scale)))
+        case _ => None
+      }
+      val dataTypeInfo = CarbonParserUtil.parseColumn(col.name, col.dataType, 
values)
+
+      val alterTableColRenameAndDataTypeChangeModel =
+        AlterTableDataTypeChangeModel(
+          dataTypeInfo,
+          Option(targetCarbonTable.getDatabaseName.toLowerCase),
+          targetCarbonTable.getTableName.toLowerCase,
+          col.name.toLowerCase,
+          col.name.toLowerCase,
+          isColumnRename = false,
+          Option.empty)
+
+      CarbonAlterTableColRenameDataTypeChangeCommand(
+        alterTableColRenameAndDataTypeChangeModel
+      ).run(sparkSession)
+    })
+  }
+
+  def deduplicateBeforeWriting(
+      srcDs: Dataset[Row],
+      targetDs: Dataset[Row],
+      sparkSession: SparkSession,
+      srcAlias: String,
+      targetAlias: String,
+      keyColumn: String,
+      orderingField: String,
+      targetCarbonTable: CarbonTable): Dataset[Row] = {
+    val properties = CarbonProperties.getInstance()
+    val filterDupes = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+        
CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
+    val combineBeforeUpsert = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE,
+        
CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT).toBoolean
+    var dedupedDataset: Dataset[Row] = srcDs
+    if (combineBeforeUpsert) {
+      dedupedDataset = deduplicateAgainstIncomingDataset(srcDs, sparkSession, 
srcAlias, keyColumn,
+        orderingField, targetCarbonTable)
+    }
+    if (filterDupes) {
+      dedupedDataset = deduplicateAgainstExistingDataset(dedupedDataset, 
targetDs,
+        srcAlias, targetAlias, keyColumn)
+    }
+    dedupedDataset.show()
+    dedupedDataset
+  }
+
+  def deduplicateAgainstIncomingDataset(
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      srcAlias: String,
+      keyColumn: String,
+      orderingField: String,
+      table: CarbonTable): Dataset[Row] = {
+    if 
(orderingField.equals(CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT))
 {
+      return srcDs
+    }
+    val schema = srcDs.schema
+    val carbonKeyColumn = table.getColumnByName(keyColumn)
+    val keyColumnDataType = getCarbonDataType(keyColumn, srcDs)
+    val orderingFieldDataType = getCarbonDataType(orderingField, srcDs)
+    val isPrimitiveAndNotDate = 
DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) &&
+                                (orderingFieldDataType != DataTypes.DATE)
+    val comparator = getComparator(orderingFieldDataType)
+    val rdd = srcDs.rdd
+    val dedupedRDD: RDD[Row] = rdd.map{row =>

Review comment:
       Please format this code.
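   e.g., with the standard brace spacing:
   ```suggestion
    val dedupedRDD: RDD[Row] = rdd.map { row =>
   ```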

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in 
case sensitivity: " +
+                   s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which 
differ" +
+                                                s" only in case sensitivity: ${
+                                                  similarFields.mkString(",")
+                                                }")
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)
+      else srcDs
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .map(_.name)
+      .filterNot(f => targetSchema.fields.map(_.name).contains(f))
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq,
+        sparkSession)
+    }
+
+    // check if any column got deleted from source
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val partitionInfo = targetCarbonTable.getPartitionInfo
+    val partitionColumns = if (partitionInfo != null) 
partitionInfo.getColumnSchemaList.asScala
+      .map(_.getColumnName).toList else List[String]()
+    val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+      .filterNot(f => {
+        sourceSchema.fields.map(_.name.toLowerCase).contains(f) ||
+        partitionColumns.contains(f)
+      })
+    if (deletedColumns.nonEmpty) {
+      handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession)
+    }
+
+    val modifiedColumns = targetSchema.fields.filter(tgtField => {
+      val sourceField = sourceSchema.fields.find(f => 
f.name.equalsIgnoreCase(tgtField.name))
+      if (sourceField.isDefined) 
!sourceField.get.dataType.equals(tgtField.dataType) else false
+    })
+
+    if (modifiedColumns.nonEmpty) {
+      handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, 
sparkSession)
+    }
+  }
+
+  /**
+   * This method calls CarbonAlterTableAddColumnCommand for adding new columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToAdd new columns to be added
+   * @param sparkSession SparkSession
+   */
+  def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: 
Seq[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val fields = new CarbonSpark2SqlParser().getFields(colsToAdd)
+    val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = 
false,
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      fields.map(CarbonParserUtil.convertFieldNamesToLowercase),
+      Seq.empty,
+      scala.collection.mutable.Map.empty[String, String],
+      None,
+      isAlterFlow = true)
+    //    targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = 
f.getColName,
+    //      dataType = Some(f.getDataType.getName), name = 
Option(f.getColName),
+    //      children = None, ))
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      Map.empty[String, String],
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highCardinalityDims.getOrElse(Seq.empty))
+    
CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableDropColumnCommand for deleting columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToDrop columns to be dropped from carbondata table
+   * @param sparkSession SparkSession
+   */
+  def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop: 
List[String],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val alterTableDropColumnModel = AlterTableDropColumnModel(
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      colsToDrop.map(_.toLowerCase))
+    
CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for 
handling data type changes
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param modifiedCols columns with data type changes
+   * @param sparkSession SparkSession
+   */
+  def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols: 
List[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+
+    // need to call the command one by one for each modified column
+    modifiedCols.foreach(col => {
+      val values = col.dataType match {

Review comment:
       The same code is available in DDLHelper.changeColumn. Can move the common code to a new method and reuse it.
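   As a rough sketch (the helper name and signature are hypothetical; it only regroups the model-building code already shown in this diff):
   ```scala
  // Hypothetical helper that builds the data-type-change model for one column;
  // handleDataTypeChangeScenario (and potentially DDLHelper.changeColumn) could reuse it.
  def buildDataTypeChangeModel(
      targetCarbonTable: CarbonTable,
      col: StructField): AlterTableDataTypeChangeModel = {
    val values = col.dataType match {
      case d: DecimalType => Some(List((d.precision, d.scale)))
      case _ => None
    }
    val dataTypeInfo = CarbonParserUtil.parseColumn(col.name, col.dataType, values)
    AlterTableDataTypeChangeModel(
      dataTypeInfo,
      Option(targetCarbonTable.getDatabaseName.toLowerCase),
      targetCarbonTable.getTableName.toLowerCase,
      col.name.toLowerCase,
      col.name.toLowerCase,
      isColumnRename = false,
      Option.empty)
  }
   ```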
   

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
##########
@@ -847,20 +820,222 @@ class MergeTestCase extends QueryTest with 
BeforeAndAfterAll {
         Row("j", 2, "RUSSIA"), Row("k", 0, "INDIA")))
   }
 
-  test("test all the merge APIs UPDATE, DELETE, UPSERT and INSERT") {
+  def prepareTarget(
+      isPartitioned: Boolean = false,
+      partitionedColumn: String = null
+  ): Dataset[Row] = {
     sql("drop table if exists target")
-    val initframe = sqlContext.sparkSession.createDataFrame(Seq(
+    val initFrame = sqlContext.sparkSession.createDataFrame(Seq(
       Row("a", "0"),
       Row("b", "1"),
       Row("c", "2"),
       Row("d", "3")
     ).asJava, StructType(Seq(StructField("key", StringType), 
StructField("value", StringType))))
-    initframe.write
-      .format("carbondata")
-      .option("tableName", "target")
-      .mode(SaveMode.Overwrite)
-      .save()
-    val target = sqlContext.read.format("carbondata").option("tableName", 
"target").load()
+
+    if (isPartitioned) {
+      initFrame.write
+        .format("carbondata")
+        .option("tableName", "target")
+        .option("partitionColumns", partitionedColumn)
+        .mode(SaveMode.Overwrite)
+        .save()
+    } else {
+      initFrame.write
+        .format("carbondata")
+        .option("tableName", "target")
+        .mode(SaveMode.Overwrite)
+        .save()
+    }
+    sqlContext.read.format("carbondata").option("tableName", "target").load()
+  }
+
+  def prepareTargetWithThreeFields(
+      isPartitioned: Boolean = false,
+      partitionedColumn: String = null
+  ): Dataset[Row] = {
+    sql("drop table if exists target")
+    val initFrame = sqlContext.sparkSession.createDataFrame(Seq(
+      Row("a", 0, "CHINA"),
+      Row("b", 1, "INDIA"),
+      Row("c", 2, "INDIA"),
+      Row("d", 3, "US")
+    ).asJava,
+      StructType(Seq(StructField("key", StringType),
+        StructField("value", IntegerType),
+        StructField("country", StringType))))
+
+    if (isPartitioned) {
+      initFrame.write
+        .format("carbondata")
+        .option("tableName", "target")
+        .option("partitionColumns", partitionedColumn)
+        .mode(SaveMode.Overwrite)
+        .save()
+    } else {
+      initFrame.write
+        .format("carbondata")
+        .option("tableName", "target")
+        .mode(SaveMode.Overwrite)
+        .save()
+    }
+    sqlContext.read.format("carbondata").option("tableName", "target").load()
+  }
+
+  test("test schema enforcement") {
+    val target = prepareTarget()
+    var cdc = sqlContext.sparkSession.createDataFrame(Seq(
+      Row("a", "1", "ab"),
+      Row("d", "4", "de")
+    ).asJava, StructType(Seq(StructField("key", StringType),
+      StructField("value", StringType)
+      , StructField("new_value", StringType))))
+    val properties = CarbonProperties.getInstance()
+    properties.addProperty(
+      CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false"
+    )
+    properties.addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "true"
+    )
+    target.as("A").upsert(cdc.as("B"), "key").execute()
+    checkAnswer(sql("select * from target"),
+      Seq(Row("a", "1"), Row("b", "1"), Row("c", "2"), Row("d", "4")))
+
+    properties.addProperty(
+        CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "true"
+    )
+
+    val exceptionCaught1 = intercept[MalformedCarbonCommandException] {
+      cdc = sqlContext.sparkSession.createDataFrame(Seq(
+        Row("a", 1, "ab"),
+        Row("d", 4, "de")
+      ).asJava, StructType(Seq(StructField("key", StringType),
+        StructField("value", IntegerType)
+        , StructField("new_value", StringType))))
+      target.as("A").upsert(cdc.as("B"), "key").execute()
+    }
+    assert(exceptionCaught1.getMessage
+      .contains(
+        "property CARBON_STREAMER_INSERT_DEDUPLICATE should " +
+        "only be set with operation type INSERT"))
+
+    properties.addProperty(
+      CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false"
+    )
+    val exceptionCaught2 = intercept[CarbonSchemaException] {
+      cdc = sqlContext.sparkSession.createDataFrame(Seq(
+        Row("a", 1),
+        Row("d", 4)
+      ).asJava, StructType(Seq(StructField("key", StringType),
+        StructField("val", IntegerType))))
+      target.as("A").upsert(cdc.as("B"), "key").execute()
+    }
+    assert(exceptionCaught2.getMessage.contains("source schema does not 
contain field: value"))
+
+    val exceptionCaught3 = intercept[CarbonSchemaException] {
+      cdc = sqlContext.sparkSession.createDataFrame(Seq(
+        Row("a", 1),
+        Row("d", 4)
+      ).asJava, StructType(Seq(StructField("key", StringType),
+        StructField("value", LongType))))
+      target.as("A").upsert(cdc.as("B"), "key").execute()
+    }
+
+    assert(exceptionCaught3.getMsg.contains("source schema has different " +
+                                            "data type for field: value"))
+
+    val exceptionCaught4 = intercept[CarbonSchemaException] {
+      cdc = sqlContext.sparkSession.createDataFrame(Seq(
+        Row("a", "1", "A"),
+        Row("d", "4", "D")
+      ).asJava, StructType(Seq(StructField("key", StringType),
+        StructField("value", StringType), StructField("Key", StringType))))
+      target.as("A").upsert(cdc.as("B"), "key").execute()
+    }
+
+    assert(exceptionCaught4.getMsg.contains("source schema has similar fields 
which " +
+                                            "differ only in case sensitivity: 
key"))
+  }
+
+  test("test schema evolution") {
+    val properties = CarbonProperties.getInstance()
+    properties.addProperty(
+      CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false"
+    )
+    properties.addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "false"
+    )
+    properties.addProperty(
+      CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD, "value"
+    )
+    sql("drop table if exists target")
+    var target = prepareTargetWithThreeFields()
+    var cdc = sqlContext.sparkSession.createDataFrame(Seq(
+      Row("a", 1, "ab", "china"),
+      Row("d", 4, "de", "china"),
+      Row("d", 7, "updated_de", "china_pro")
+    ).asJava, StructType(Seq(StructField("key", StringType),
+      StructField("value", IntegerType)
+      , StructField("new_value", StringType),
+      StructField("country", StringType))))
+    target.as("A").upsert(cdc.as("B"), "key").execute()
+    checkAnswer(sql("select * from target"),
+      Seq(Row("a", 1, "china", "ab"), Row("b", 1, "INDIA", null),
+        Row("c", 2, "INDIA", null), Row("d", 7, "china_pro", "updated_de")))
+
+    target = sqlContext.read.format("carbondata").option("tableName", 
"target").load()
+
+    cdc = sqlContext.sparkSession.createDataFrame(Seq(
+      Row("a", 5),
+      Row("d", 5)
+    ).asJava, StructType(Seq(StructField("key", StringType),
+      StructField("value", IntegerType))))
+    target.as("A").upsert(cdc.as("B"), "key").execute()
+    checkAnswer(sql("select * from target"),
+      Seq(Row("a", 5), Row("b", 1),
+        Row("c", 2), Row("d", 5)))
+
+//    target = sqlContext.read.format("carbondata").option("tableName", 
"target").load()
+//    cdc = sqlContext.sparkSession.createDataFrame(Seq(
+//      Row("b", 50),
+//      Row("d", 50)

Review comment:
       Please remove this commented-out code if it is not needed.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in 
case sensitivity: " +
+                   s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which 
differ" +
+                                                s" only in case sensitivity: ${
+                                                  similarFields.mkString(",")
+                                                }")
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)
+      else srcDs
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .map(_.name)
+      .filterNot(f => targetSchema.fields.map(_.name).contains(f))
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq,
+        sparkSession)
+    }
+
+    // check if any column got deleted from source
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val partitionInfo = targetCarbonTable.getPartitionInfo
+    val partitionColumns = if (partitionInfo != null) 
partitionInfo.getColumnSchemaList.asScala
+      .map(_.getColumnName).toList else List[String]()
+    val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+      .filterNot(f => {
+        sourceSchema.fields.map(_.name.toLowerCase).contains(f) ||
+        partitionColumns.contains(f)
+      })
+    if (deletedColumns.nonEmpty) {
+      handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession)
+    }
+
+    val modifiedColumns = targetSchema.fields.filter(tgtField => {
+      val sourceField = sourceSchema.fields.find(f => 
f.name.equalsIgnoreCase(tgtField.name))
+      if (sourceField.isDefined) 
!sourceField.get.dataType.equals(tgtField.dataType) else false
+    })
+
+    if (modifiedColumns.nonEmpty) {
+      handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, 
sparkSession)
+    }
+  }
+
+  /**
+   * This method calls CarbonAlterTableAddColumnCommand for adding new columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToAdd new columns to be added
+   * @param sparkSession SparkSession
+   */
+  def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: 
Seq[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val fields = new CarbonSpark2SqlParser().getFields(colsToAdd)
+    val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = 
false,
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      fields.map(CarbonParserUtil.convertFieldNamesToLowercase),
+      Seq.empty,
+      scala.collection.mutable.Map.empty[String, String],
+      None,
+      isAlterFlow = true)
+    //    targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = 
f.getColName,

Review comment:
       Please remove this commented-out code.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                  s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a 
=> identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in 
case sensitivity: " +
+                   s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which 
differ" +
+                                                s" only in case sensitivity: ${
+                                                  similarFields.mkString(",")
+                                                }")
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)
+      else srcDs
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if 
(!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase))
 {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name 
}")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+                                                  s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ 
tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data 
type " +
+                                                  s"for field: ${ 
tgtField.name }")
+      }
+    })
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .map(_.name)
+      .filterNot(f => targetSchema.fields.map(_.name).contains(f))
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq,
+        sparkSession)
+    }
+
+    // check if any column got deleted from source
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val partitionInfo = targetCarbonTable.getPartitionInfo
+    val partitionColumns = if (partitionInfo != null) 
partitionInfo.getColumnSchemaList.asScala
+      .map(_.getColumnName).toList else List[String]()
+    val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+      .filterNot(f => {
+        sourceSchema.fields.map(_.name.toLowerCase).contains(f) ||
+        partitionColumns.contains(f)
+      })
+    if (deletedColumns.nonEmpty) {
+      handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession)
+    }
+
+    val modifiedColumns = targetSchema.fields.filter(tgtField => {
+      val sourceField = sourceSchema.fields.find(f => 
f.name.equalsIgnoreCase(tgtField.name))
+      if (sourceField.isDefined) 
!sourceField.get.dataType.equals(tgtField.dataType) else false
+    })
+
+    if (modifiedColumns.nonEmpty) {
+      handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, 
sparkSession)
+    }
+  }
+
+  /**
+   * This method calls CarbonAlterTableAddColumnCommand for adding new columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToAdd new columns to be added
+   * @param sparkSession SparkSession
+   */
+  def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: 
Seq[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val fields = new CarbonSpark2SqlParser().getFields(colsToAdd)
+    val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = 
false,
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      fields.map(CarbonParserUtil.convertFieldNamesToLowercase),
+      Seq.empty,
+      scala.collection.mutable.Map.empty[String, String],
+      None,
+      isAlterFlow = true)
+    //    targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = 
f.getColName,
+    //      dataType = Some(f.getDataType.getName), name = 
Option(f.getColName),
+    //      children = None, ))
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      Map.empty[String, String],
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highCardinalityDims.getOrElse(Seq.empty))
+    
CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableDropColumnCommand for deleting columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToDrop columns to be dropped from carbondata table
+   * @param sparkSession SparkSession
+   */
+  def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop: 
List[String],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val alterTableDropColumnModel = AlterTableDropColumnModel(
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      colsToDrop.map(_.toLowerCase))
+    
CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for 
handling data type changes
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param modifiedCols columns with data type changes
+   * @param sparkSession SparkSession
+   */
+  def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols: 
List[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+
+    // need to call the command one by one for each modified column
+    modifiedCols.foreach(col => {
+      val values = col.dataType match {
+        case d: DecimalType => Some(List((d.precision, d.scale)))
+        case _ => None
+      }
+      val dataTypeInfo = CarbonParserUtil.parseColumn(col.name, col.dataType, 
values)
+
+      val alterTableColRenameAndDataTypeChangeModel =
+        AlterTableDataTypeChangeModel(
+          dataTypeInfo,
+          Option(targetCarbonTable.getDatabaseName.toLowerCase),
+          targetCarbonTable.getTableName.toLowerCase,
+          col.name.toLowerCase,
+          col.name.toLowerCase,
+          isColumnRename = false,
+          Option.empty)
+
+      CarbonAlterTableColRenameDataTypeChangeCommand(
+        alterTableColRenameAndDataTypeChangeModel
+      ).run(sparkSession)
+    })
+  }
+
+  def deduplicateBeforeWriting(
+      srcDs: Dataset[Row],
+      targetDs: Dataset[Row],
+      sparkSession: SparkSession,
+      srcAlias: String,
+      targetAlias: String,
+      keyColumn: String,
+      orderingField: String,
+      targetCarbonTable: CarbonTable): Dataset[Row] = {
+    val properties = CarbonProperties.getInstance()
+    val filterDupes = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+        
CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
+    val combineBeforeUpsert = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE,
+        
CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT).toBoolean
+    var dedupedDataset: Dataset[Row] = srcDs
+    if (combineBeforeUpsert) {
+      dedupedDataset = deduplicateAgainstIncomingDataset(srcDs, sparkSession, 
srcAlias, keyColumn,
+        orderingField, targetCarbonTable)
+    }
+    if (filterDupes) {
+      dedupedDataset = deduplicateAgainstExistingDataset(dedupedDataset, 
targetDs,
+        srcAlias, targetAlias, keyColumn)
+    }
+    dedupedDataset.show()

Review comment:
       Please remove this debug `show()` call.
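       If a sample of the deduplicated rows is still useful for debugging, one hedged alternative (assuming the `LOGGER` already defined in this file) is to guard it behind debug logging instead of an unconditional `show()`:
   ```scala
   // Log a small sample only when debug logging is enabled, so normal merge
   // flows do not trigger an extra Spark action just for printing rows.
   if (LOGGER.isDebugEnabled) {
     LOGGER.debug(s"deduplicated dataset sample: ${dedupedDataset.take(20).mkString(", ")}")
   }
   ```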




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

