Indhumathi27 commented on a change in pull request #4227: URL: https://github.com/apache/carbondata/pull/4227#discussion_r728023802
########## File path: common/src/main/java/org/apache/carbondata/common/exceptions/sql/CarbonSchemaException.java ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.common.exceptions.sql; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +@InterfaceAudience.User +@InterfaceStability.Stable +public class CarbonSchemaException extends Exception { + + private static final long serialVersionUID = 1L; + + private final String msg; + + public CarbonSchemaException(String msg) { + super(msg); + this.msg = msg; + } + + public String getMsg() { Review comment: ```suggestion public String getMessage() { ``` ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala ########## @@ -98,8 +99,35 @@ case class CarbonMergeDataSetCommand( throw new UnsupportedOperationException( "Carbon table supposed to be present in merge dataset") } + + val properties = CarbonProperties.getInstance() + val filterDupes = properties + .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, + CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean + if (operationType != null && + !MergeOperationType.withName(operationType.toUpperCase).equals(MergeOperationType.INSERT) && + filterDupes) { + throw new MalformedCarbonCommandException("property CARBON_STREAMER_INSERT_DEDUPLICATE" + + " should only be set with operation type INSERT") + } + val isSchemaEnforcementEnabled = properties + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (operationType != null) { + if (isSchemaEnforcementEnabled) { + // call the util function to verify if incoming schema matches with target schema + CarbonMergeDataSetUtil.verifySourceAndTargetSchemas(targetDsOri, srcDS) + } else { + CarbonMergeDataSetUtil.handleSchemaEvolution( + targetDsOri, srcDS, sparkSession) + } + } + + // Target dataset must be backed by carbondata table. - val targetCarbonTable = relations.head.carbonRelation.carbonTable + val tgtTable = relations.head.carbonRelation.carbonTable + val targetCarbonTable: CarbonTable = CarbonEnv.getCarbonTable(Option(tgtTable.getDatabaseName), Review comment: why is this code required? 
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala ########## @@ -98,8 +99,35 @@ case class CarbonMergeDataSetCommand( throw new UnsupportedOperationException( "Carbon table supposed to be present in merge dataset") } + + val properties = CarbonProperties.getInstance() Review comment: It looks like the validations added here apply only when operationType is not null, so please move them inside the if (operationType != null) check.
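To make the requested restructuring concrete, here is a minimal sketch built only from the identifiers visible in the diff above, not the PR's actual code:

```scala
// Both property-driven validations run only once operationType is known to be non-null.
if (operationType != null) {
  val properties = CarbonProperties.getInstance()
  val filterDupes = properties.getProperty(
    CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
    CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
  if (!MergeOperationType.withName(operationType.toUpperCase)
    .equals(MergeOperationType.INSERT) && filterDupes) {
    throw new MalformedCarbonCommandException("property CARBON_STREAMER_INSERT_DEDUPLICATE" +
      " should only be set with operation type INSERT")
  }
  val isSchemaEnforcementEnabled = properties.getProperty(
    CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
    CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
  if (isSchemaEnforcementEnabled) {
    // verify that the incoming schema matches the target schema
    CarbonMergeDataSetUtil.verifySourceAndTargetSchemas(targetDsOri, srcDS)
  } else {
    CarbonMergeDataSetUtil.handleSchemaEvolution(targetDsOri, srcDS, sparkSession)
  }
}
```

This shape also folds the two separate null checks from the diff into a single guard.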
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + + // check if some additional column got added in source schema + if (sourceSchema.fields.length > targetSchema.fields.length) { + val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase) + .filterNot(srcField => { + targetSchema.fields.map(_.name.toLowerCase).contains(srcField) + }) + LOGGER.warn(s"source schema contains additional fields which are not present in " + + s"target schema: ${ additionalSourceFields.mkString(",") }") + } + + // check if source schema has fields whose names only differ in case sensitivity + val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map { Review comment: can create a new variable for sourceSchema.fields.map(_.name.toLowerCase) and reuse it in line:516 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") Review comment: can assign the exception message to a new variable and reuse it. Please handle this in all applicable places.
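A minimal sketch of that suggestion, assuming the log line and the exception should carry the same text (the errorMsg name is illustrative):

```scala
// Build the message once and hand it to both the logger and the exception.
val errorMsg = s"source schema does not contain field: ${tgtField.name}"
LOGGER.error(errorMsg)
throw new CarbonSchemaException(errorMsg)
```

The same pattern applies to the data-type mismatch and case-sensitivity errors later in the method.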
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + + // check if some additional column got added in source schema + if (sourceSchema.fields.length > targetSchema.fields.length) { + val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase) + .filterNot(srcField => { + targetSchema.fields.map(_.name.toLowerCase).contains(srcField) + }) + LOGGER.warn(s"source schema contains additional fields which are not present in " + + s"target schema: ${ additionalSourceFields.mkString(",") }") + } + + // check if source schema has fields whose names only differ in case sensitivity + val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map { + case (str, times) => (str, times.length) + }.toList.filter(e => e._2 > 1).map(_._1) + if (similarFields.nonEmpty) { + LOGGER.error(s"source schema has similar fields which differ only in case sensitivity: " + + s"${ similarFields.mkString(",") }") + throw new CarbonSchemaException(s"source schema has similar fields which differ" + + s" only in case sensitivity: ${ + similarFields.mkString(",") + }") + } + } + + /** + * This method takes care of handling schema evolution scenarios for CarbonStreamer class. + * Currently only addition of columns is supported. 
+ * @param targetDs target dataset whose schema needs to be modified, if applicable + * @param srcDs incoming dataset + * @param sparkSession SparkSession + */ + def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: Dataset[Row], + sparkSession: SparkSession): Unit = { + // read the property here + val isSchemaEnforcementEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (isSchemaEnforcementEnabled) { + verifySourceAndTargetSchemas(targetDs, srcDs) + } else { + // These meta columns should be removed before actually writing the data + val metaColumnsString = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "") + val metaCols = metaColumnsString.split(",").map(_.trim) + val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*) + else srcDs + handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, isStreamerInvolved = true) + } + } + + def verifyBackwardsCompatibility( + targetDs: Dataset[Row], + srcDs: Dataset[Row]): Unit = { + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + } + + /** + * The method takes care of following schema evolution cases: + * Addition of a new column in source schema which is not present in target + * Deletion of a column in source schema which is present in target + * Data type changes for an existing column. 
+ * The method does not take care of column renames and table renames + * @param targetDs existing target dataset + * @param srcDs incoming source dataset + * @return new target schema to write the incoming batch with + */ + def handleSchemaEvolution( + targetDs: Dataset[Row], + srcDs: Dataset[Row], + sparkSession: SparkSession, + isStreamerInvolved: Boolean = false): Unit = { + + if (isStreamerInvolved) { + verifyBackwardsCompatibility(targetDs, srcDs) + } + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + // check if any column got added in source + val addedColumns = sourceSchema.fields + .map(_.name) + .filterNot(f => targetSchema.fields.map(_.name).contains(f)) + if (addedColumns.nonEmpty) { + handleAddColumnScenario(targetDs, + srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq, + sparkSession) + } + + // check if any column got deleted from source + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val partitionInfo = targetCarbonTable.getPartitionInfo + val partitionColumns = if (partitionInfo != null) partitionInfo.getColumnSchemaList.asScala + .map(_.getColumnName).toList else List[String]() + val deletedColumns = targetSchema.fields.map(_.name.toLowerCase) + .filterNot(f => { + sourceSchema.fields.map(_.name.toLowerCase).contains(f) || Review comment: can move sourceSchema.fields.map(_.name.toLowerCase) outside the loop into a new variable and reuse it
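As a sketch, the hoisted form the reviewer is describing (sourceFieldNames is an illustrative name; the rest of the identifiers come from the diff above):

```scala
// Lower-case the source field names once, outside the closure, and reuse the result.
val sourceFieldNames = sourceSchema.fields.map(_.name.toLowerCase)
val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
  .filterNot(f => sourceFieldNames.contains(f) || partitionColumns.contains(f))
```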
########## File path: pom.xml ########## @@ -130,7 +130,7 @@ <scala.version>2.11.8</scala.version> <hadoop.deps.scope>compile</hadoop.deps.scope> <spark.version>2.3.4</spark.version> - <spark.binary.version>2.3</spark.binary.version> + <spark.binary.version>2.4</spark.binary.version> Review comment: why this change? Please revert. ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + + // check if some additional column got added in source schema + if (sourceSchema.fields.length > targetSchema.fields.length) { + val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase) + .filterNot(srcField => { + targetSchema.fields.map(_.name.toLowerCase).contains(srcField) + }) + LOGGER.warn(s"source schema contains additional fields which are not present in " + + s"target schema: ${ additionalSourceFields.mkString(",") }") + } + + // check if source schema has fields whose names only differ in case sensitivity + val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map { + case (str, times) => (str, times.length) + }.toList.filter(e => e._2 > 1).map(_._1) + if (similarFields.nonEmpty) { + LOGGER.error(s"source schema has similar fields which differ only in case sensitivity: " + + s"${ similarFields.mkString(",") }") + throw new CarbonSchemaException(s"source schema has similar fields which differ" + + s" only in case sensitivity: ${ + similarFields.mkString(",") + }") + } + } + + /** + * This method takes care of handling schema evolution scenarios for CarbonStreamer class. + * Currently only addition of columns is supported. 
+ * @param targetDs target dataset whose schema needs to be modified, if applicable + * @param srcDs incoming dataset + * @param sparkSession SparkSession + */ + def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: Dataset[Row], + sparkSession: SparkSession): Unit = { + // read the property here + val isSchemaEnforcementEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (isSchemaEnforcementEnabled) { + verifySourceAndTargetSchemas(targetDs, srcDs) + } else { + // These meta columns should be removed before actually writing the data + val metaColumnsString = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "") + val metaCols = metaColumnsString.split(",").map(_.trim) + val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*) + else srcDs + handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, isStreamerInvolved = true) + } + } + + def verifyBackwardsCompatibility( + targetDs: Dataset[Row], + srcDs: Dataset[Row]): Unit = { + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + } + + /** + * The method takes care of following schema evolution cases: + * Addition of a new column in source schema which is not present in target + * Deletion of a column in source schema which is present in target + * Data type changes for an existing column. 
+ * The method does not take care of column renames and table renames + * @param targetDs existing target dataset + * @param srcDs incoming source dataset + * @return new target schema to write the incoming batch with + */ + def handleSchemaEvolution( + targetDs: Dataset[Row], + srcDs: Dataset[Row], + sparkSession: SparkSession, + isStreamerInvolved: Boolean = false): Unit = { + + if (isStreamerInvolved) { + verifyBackwardsCompatibility(targetDs, srcDs) + } + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + // check if any column got added in source + val addedColumns = sourceSchema.fields + .map(_.name) + .filterNot(f => targetSchema.fields.map(_.name).contains(f)) + if (addedColumns.nonEmpty) { + handleAddColumnScenario(targetDs, + srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq, + sparkSession) + } + + // check if any column got deleted from source + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val partitionInfo = targetCarbonTable.getPartitionInfo + val partitionColumns = if (partitionInfo != null) partitionInfo.getColumnSchemaList.asScala + .map(_.getColumnName).toList else List[String]() + val deletedColumns = targetSchema.fields.map(_.name.toLowerCase) + .filterNot(f => { + sourceSchema.fields.map(_.name.toLowerCase).contains(f) || + partitionColumns.contains(f) + }) + if (deletedColumns.nonEmpty) { + handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession) + } + + val modifiedColumns = targetSchema.fields.filter(tgtField => { + val sourceField = sourceSchema.fields.find(f => f.name.equalsIgnoreCase(tgtField.name)) + if (sourceField.isDefined) !sourceField.get.dataType.equals(tgtField.dataType) else false + }) + + if (modifiedColumns.nonEmpty) { + handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, sparkSession) + } + } + + /** + * This method calls CarbonAlterTableAddColumnCommand for adding new columns + * @param targetDs target dataset whose schema needs to be modified + * @param colsToAdd new columns to be added + * @param sparkSession SparkSession + */ + def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: Seq[StructField], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val fields = new CarbonSpark2SqlParser().getFields(colsToAdd) + val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = false, + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + fields.map(CarbonParserUtil.convertFieldNamesToLowercase), + Seq.empty, + scala.collection.mutable.Map.empty[String, String], + None, + isAlterFlow = true) + // targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = f.getColName, + // dataType = Some(f.getDataType.getName), name = Option(f.getColName), + // children = None, )) + val alterTableAddColumnsModel = AlterTableAddColumnsModel( + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + Map.empty[String, String], + tableModel.dimCols, + tableModel.msrCols, + tableModel.highCardinalityDims.getOrElse(Seq.empty)) + CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel).run(sparkSession) + } + + /** + * This method calls CarbonAlterTableDropColumnCommand for deleting 
columns + * @param targetDs target dataset whose schema needs to be modified + * @param colsToDrop columns to be dropped from carbondata table + * @param sparkSession SparkSession + */ + def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop: List[String], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val alterTableDropColumnModel = AlterTableDropColumnModel( + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + colsToDrop.map(_.toLowerCase)) + CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession) + } + + /** + * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for handling data type changes + * @param targetDs target dataset whose schema needs to be modified + * @param modifiedCols columns with data type changes + * @param sparkSession SparkSession + */ + def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols: List[StructField], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + + // need to call the command one by one for each modified column + modifiedCols.foreach(col => { + val values = col.dataType match { + case d: DecimalType => Some(List((d.precision, d.scale))) + case _ => None + } + val dataTypeInfo = CarbonParserUtil.parseColumn(col.name, col.dataType, values) + + val alterTableColRenameAndDataTypeChangeModel = + AlterTableDataTypeChangeModel( + dataTypeInfo, + Option(targetCarbonTable.getDatabaseName.toLowerCase), + targetCarbonTable.getTableName.toLowerCase, + col.name.toLowerCase, + col.name.toLowerCase, + isColumnRename = false, + Option.empty) + + CarbonAlterTableColRenameDataTypeChangeCommand( + alterTableColRenameAndDataTypeChangeModel + ).run(sparkSession) + }) + } + + def deduplicateBeforeWriting( + srcDs: Dataset[Row], + targetDs: Dataset[Row], + sparkSession: SparkSession, + srcAlias: String, + targetAlias: String, + keyColumn: String, + orderingField: String, + targetCarbonTable: CarbonTable): Dataset[Row] = { + val properties = CarbonProperties.getInstance() + val filterDupes = properties + .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, + CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean + val combineBeforeUpsert = properties + .getProperty(CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE, + CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT).toBoolean + var dedupedDataset: Dataset[Row] = srcDs + if (combineBeforeUpsert) { + dedupedDataset = deduplicateAgainstIncomingDataset(srcDs, sparkSession, srcAlias, keyColumn, + orderingField, targetCarbonTable) + } + if (filterDupes) { + dedupedDataset = deduplicateAgainstExistingDataset(dedupedDataset, targetDs, + srcAlias, targetAlias, keyColumn) + } + dedupedDataset.show() + dedupedDataset + } + + def deduplicateAgainstIncomingDataset( + srcDs: Dataset[Row], + sparkSession: SparkSession, + srcAlias: String, + keyColumn: String, + orderingField: String, + table: CarbonTable): Dataset[Row] = { + if (orderingField.equals(CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT)) { + return srcDs + } + val schema = srcDs.schema + val carbonKeyColumn = table.getColumnByName(keyColumn) + val keyColumnDataType = 
getCarbonDataType(keyColumn, srcDs) + val orderingFieldDataType = getCarbonDataType(orderingField, srcDs) + val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) && + (orderingFieldDataType != DataTypes.DATE) + val comparator = getComparator(orderingFieldDataType) + val rdd = srcDs.rdd + val dedupedRDD: RDD[Row] = rdd.map{row => + val index = row.fieldIndex(keyColumn) + val rowKey = getRowKey(row, index, carbonKeyColumn, isPrimitiveAndNotDate, keyColumnDataType) + (rowKey, row) + }.reduceByKey{(row1, row2) => + val orderingValue1 = row1.getAs(orderingField).asInstanceOf[Any] + val orderingValue2 = row2.getAs(orderingField).asInstanceOf[Any] + if (orderingFieldDataType.equals(DataTypes.STRING)) { + if (orderingValue1 == null) { + row2 + } else if (orderingValue2 == null) { + row1 + } else { + if (ByteUtil.UnsafeComparer.INSTANCE + .compareTo(orderingValue1.toString + .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)), + orderingValue2.toString + .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))) >= 0) { + row1 + } else { + row2 + } + } + } else { + if (comparator.compare(orderingValue1, orderingValue2) >= 0) { + row1 + } else { + row2 + } + } + }.map(_._2) + sparkSession.createDataFrame(dedupedRDD, schema).alias(srcAlias) + } + + def getComparator( + orderingFieldDataType: CarbonDataType + ): SerializableComparator = { + val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) && + (orderingFieldDataType != DataTypes.DATE) + if (isPrimitiveAndNotDate) { + Comparator.getComparator(orderingFieldDataType) + } else if (orderingFieldDataType == DataTypes.STRING) { + null + } else { + Comparator.getComparatorByDataTypeForMeasure(orderingFieldDataType) + } + } + + def getRowKey( + row: Row, + index: Integer, + carbonKeyColumn: CarbonColumn, + isPrimitiveAndNotDate: Boolean, + keyColumnDataType: CarbonDataType + ): AnyRef = { + if (!row.isNullAt(index)) { + row.getAs(index).toString + } else { + val value: Long = 0 + if (carbonKeyColumn.isDimension) { + if (isPrimitiveAndNotDate) { + CarbonCommonConstants.EMPTY_BYTE_ARRAY + } else { + CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY + } + } else { + val nullValueForMeasure = if ((keyColumnDataType eq DataTypes.BOOLEAN) || Review comment: can replace this with a case match
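A sketch of the shape the reviewer is suggesting. The diff is cut off before the if/else bodies, so the branch results below are placeholders, not the PR's actual values:

```scala
// Placeholder results: carry the PR's real null-value-per-measure-type logic
// into the corresponding cases.
val nullValueForMeasure: Any = keyColumnDataType match {
  case DataTypes.BOOLEAN => false
  case DataTypes.SHORT => 0.toShort
  case DataTypes.INT => 0
  case DataTypes.LONG => 0L
  case _ => null
}
```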
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + + // check if some additional column got added in source schema + if (sourceSchema.fields.length > targetSchema.fields.length) { + val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase) + .filterNot(srcField => { + targetSchema.fields.map(_.name.toLowerCase).contains(srcField) + }) + LOGGER.warn(s"source schema contains additional fields which are not present in " + + s"target schema: ${ additionalSourceFields.mkString(",") }") + } + + // check if source schema has fields whose names only differ in case sensitivity + val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map { + case (str, times) => (str, times.length) + }.toList.filter(e => e._2 > 1).map(_._1) + if (similarFields.nonEmpty) { + LOGGER.error(s"source schema has similar fields which differ only in case sensitivity: " + + s"${ similarFields.mkString(",") }") + throw new CarbonSchemaException(s"source schema has similar fields which differ" + + s" only in case sensitivity: ${ + similarFields.mkString(",") + }") + } + } + + /** + * This method takes care of handling schema evolution scenarios for CarbonStreamer class. + * Currently only addition of columns is supported. 
+ * @param targetDs target dataset whose schema needs to be modified, if applicable + * @param srcDs incoming dataset + * @param sparkSession SparkSession + */ + def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: Dataset[Row], + sparkSession: SparkSession): Unit = { + // read the property here + val isSchemaEnforcementEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (isSchemaEnforcementEnabled) { + verifySourceAndTargetSchemas(targetDs, srcDs) + } else { + // These meta columns should be removed before actually writing the data + val metaColumnsString = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "") + val metaCols = metaColumnsString.split(",").map(_.trim) + val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*) + else srcDs + handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, isStreamerInvolved = true) + } + } + + def verifyBackwardsCompatibility( + targetDs: Dataset[Row], + srcDs: Dataset[Row]): Unit = { + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + } + + /** + * The method takes care of following schema evolution cases: + * Addition of a new column in source schema which is not present in target + * Deletion of a column in source schema which is present in target + * Data type changes for an existing column. 
+ * The method does not take care of column renames and table renames + * @param targetDs existing target dataset + * @param srcDs incoming source dataset + * @return new target schema to write the incoming batch with + */ + def handleSchemaEvolution( + targetDs: Dataset[Row], + srcDs: Dataset[Row], + sparkSession: SparkSession, + isStreamerInvolved: Boolean = false): Unit = { + + if (isStreamerInvolved) { + verifyBackwardsCompatibility(targetDs, srcDs) + } + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + // check if any column got added in source + val addedColumns = sourceSchema.fields + .map(_.name) + .filterNot(f => targetSchema.fields.map(_.name).contains(f)) + if (addedColumns.nonEmpty) { + handleAddColumnScenario(targetDs, + srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq, + sparkSession) + } + + // check if any column got deleted from source + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val partitionInfo = targetCarbonTable.getPartitionInfo + val partitionColumns = if (partitionInfo != null) partitionInfo.getColumnSchemaList.asScala + .map(_.getColumnName).toList else List[String]() + val deletedColumns = targetSchema.fields.map(_.name.toLowerCase) + .filterNot(f => { + sourceSchema.fields.map(_.name.toLowerCase).contains(f) || + partitionColumns.contains(f) + }) + if (deletedColumns.nonEmpty) { + handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession) + } + + val modifiedColumns = targetSchema.fields.filter(tgtField => { + val sourceField = sourceSchema.fields.find(f => f.name.equalsIgnoreCase(tgtField.name)) + if (sourceField.isDefined) !sourceField.get.dataType.equals(tgtField.dataType) else false + }) + + if (modifiedColumns.nonEmpty) { + handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, sparkSession) + } + } + + /** + * This method calls CarbonAlterTableAddColumnCommand for adding new columns + * @param targetDs target dataset whose schema needs to be modified + * @param colsToAdd new columns to be added + * @param sparkSession SparkSession + */ + def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: Seq[StructField], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val fields = new CarbonSpark2SqlParser().getFields(colsToAdd) + val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = false, + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + fields.map(CarbonParserUtil.convertFieldNamesToLowercase), + Seq.empty, + scala.collection.mutable.Map.empty[String, String], + None, + isAlterFlow = true) + // targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = f.getColName, + // dataType = Some(f.getDataType.getName), name = Option(f.getColName), + // children = None, )) + val alterTableAddColumnsModel = AlterTableAddColumnsModel( + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + Map.empty[String, String], + tableModel.dimCols, + tableModel.msrCols, + tableModel.highCardinalityDims.getOrElse(Seq.empty)) + CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel).run(sparkSession) + } + + /** + * This method calls CarbonAlterTableDropColumnCommand for deleting 
columns + * @param targetDs target dataset whose schema needs to be modified + * @param colsToDrop columns to be dropped from carbondata table + * @param sparkSession SparkSession + */ + def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop: List[String], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val alterTableDropColumnModel = AlterTableDropColumnModel( + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + colsToDrop.map(_.toLowerCase)) + CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession) + } + + /** + * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for handling data type changes + * @param targetDs target dataset whose schema needs to be modified + * @param modifiedCols columns with data type changes + * @param sparkSession SparkSession + */ + def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols: List[StructField], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + + // need to call the command one by one for each modified column + modifiedCols.foreach(col => { + val values = col.dataType match { + case d: DecimalType => Some(List((d.precision, d.scale))) + case _ => None + } + val dataTypeInfo = CarbonParserUtil.parseColumn(col.name, col.dataType, values) + + val alterTableColRenameAndDataTypeChangeModel = + AlterTableDataTypeChangeModel( + dataTypeInfo, + Option(targetCarbonTable.getDatabaseName.toLowerCase), + targetCarbonTable.getTableName.toLowerCase, + col.name.toLowerCase, + col.name.toLowerCase, + isColumnRename = false, + Option.empty) + + CarbonAlterTableColRenameDataTypeChangeCommand( + alterTableColRenameAndDataTypeChangeModel + ).run(sparkSession) + }) + } + + def deduplicateBeforeWriting( + srcDs: Dataset[Row], + targetDs: Dataset[Row], + sparkSession: SparkSession, + srcAlias: String, + targetAlias: String, + keyColumn: String, + orderingField: String, + targetCarbonTable: CarbonTable): Dataset[Row] = { + val properties = CarbonProperties.getInstance() + val filterDupes = properties + .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, + CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean + val combineBeforeUpsert = properties + .getProperty(CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE, + CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT).toBoolean + var dedupedDataset: Dataset[Row] = srcDs + if (combineBeforeUpsert) { + dedupedDataset = deduplicateAgainstIncomingDataset(srcDs, sparkSession, srcAlias, keyColumn, + orderingField, targetCarbonTable) + } + if (filterDupes) { + dedupedDataset = deduplicateAgainstExistingDataset(dedupedDataset, targetDs, + srcAlias, targetAlias, keyColumn) + } + dedupedDataset.show() + dedupedDataset + } + + def deduplicateAgainstIncomingDataset( + srcDs: Dataset[Row], + sparkSession: SparkSession, + srcAlias: String, + keyColumn: String, + orderingField: String, + table: CarbonTable): Dataset[Row] = { + if (orderingField.equals(CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT)) { + return srcDs + } + val schema = srcDs.schema + val carbonKeyColumn = table.getColumnByName(keyColumn) + val keyColumnDataType = 
getCarbonDataType(keyColumn, srcDs) + val orderingFieldDataType = getCarbonDataType(orderingField, srcDs) + val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) && + (orderingFieldDataType != DataTypes.DATE) + val comparator = getComparator(orderingFieldDataType) + val rdd = srcDs.rdd + val dedupedRDD: RDD[Row] = rdd.map{row => + val index = row.fieldIndex(keyColumn) + val rowKey = getRowKey(row, index, carbonKeyColumn, isPrimitiveAndNotDate, keyColumnDataType) + (rowKey, row) + }.reduceByKey{(row1, row2) => + val orderingValue1 = row1.getAs(orderingField).asInstanceOf[Any] + val orderingValue2 = row2.getAs(orderingField).asInstanceOf[Any] + if (orderingFieldDataType.equals(DataTypes.STRING)) { + if (orderingValue1 == null) { + row2 + } else if (orderingValue2 == null) { + row1 + } else { + if (ByteUtil.UnsafeComparer.INSTANCE + .compareTo(orderingValue1.toString + .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)), + orderingValue2.toString + .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))) >= 0) { + row1 + } else { + row2 + } + } + } else { + if (comparator.compare(orderingValue1, orderingValue2) >= 0) { + row1 + } else { + row2 + } + } + }.map(_._2) + sparkSession.createDataFrame(dedupedRDD, schema).alias(srcAlias) + } + + def getComparator( + orderingFieldDataType: CarbonDataType + ): SerializableComparator = { + val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) && Review comment: 1. I think this method itself is not needed; Comparator.getComparator can be called directly. For the string type, ByteArraySerializableComparator can be used. 2. Please refactor the caller method as well. 3. For the DATE type, this might throw an IllegalArgumentException. Please check and handle it.
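A hedged sketch of points 1 and 3 — inlining the comparator choice at the call site in deduplicateAgainstIncomingDataset, using the ByteArraySerializableComparator the reviewer names for strings, and keeping DATE away from Comparator.getComparator. The class and method names follow the reviewer's wording and the diff; the no-arg constructor and the DATE fallback are assumptions:

```scala
// Inline replacement for getComparator at its single call site (sketch).
val comparator: SerializableComparator = orderingFieldDataType match {
  // point 1: compare strings via the byte-array comparator instead of returning null
  case DataTypes.STRING => new ByteArraySerializableComparator
  // point 3: DATE would make Comparator.getComparator throw IllegalArgumentException,
  // so route it through the measure-comparator path (assumption: DATE is int-encoded)
  case DataTypes.DATE => Comparator.getComparatorByDataTypeForMeasure(DataTypes.INT)
  case dt if DataTypeUtil.isPrimitiveColumn(dt) => Comparator.getComparator(dt)
  case dt => Comparator.getComparatorByDataTypeForMeasure(dt)
}
```

With the string case handled by a real comparator, the special-cased ByteUtil.UnsafeComparer branch inside reduceByKey could also collapse into the generic comparator path (point 2).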
########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -1215,6 +1215,17 @@ private CarbonCommonConstants() { public static final String CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT_DEFAULT = "false"; + + /** + * This flag decides if table schema needs to change as per the incoming batch schema. Review comment: can move/group the CDC-related properties in the same place ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { Review comment: This check is not needed. Can move the sourceField variable before this check and replace it with sourceField.isEmpty. ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + + // check if some additional column got added in source schema + if (sourceSchema.fields.length > targetSchema.fields.length) { + val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase) + .filterNot(srcField => { + targetSchema.fields.map(_.name.toLowerCase).contains(srcField) + }) + LOGGER.warn(s"source schema contains additional fields which are not present in " + + s"target schema: ${ additionalSourceFields.mkString(",") }") + } + + // check if source schema has fields whose names only differ in case sensitivity + val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map { + case (str, times) => (str, times.length) + }.toList.filter(e => e._2 > 1).map(_._1) + if (similarFields.nonEmpty) { + LOGGER.error(s"source schema has similar fields which differ only in case sensitivity: " + + s"${ similarFields.mkString(",") }") + throw new CarbonSchemaException(s"source schema has similar fields which differ" + + s" only in case sensitivity: ${ + similarFields.mkString(",") + }") + } + } + + /** + * This method takes care of handling schema evolution scenarios for CarbonStreamer class. + * Currently only addition of columns is supported. 
+ * @param targetDs target dataset whose schema needs to be modified, if applicable + * @param srcDs incoming dataset + * @param sparkSession SparkSession + */ + def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: Dataset[Row], + sparkSession: SparkSession): Unit = { + // read the property here + val isSchemaEnforcementEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (isSchemaEnforcementEnabled) { + verifySourceAndTargetSchemas(targetDs, srcDs) + } else { + // These meta columns should be removed before actually writing the data + val metaColumnsString = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "") + val metaCols = metaColumnsString.split(",").map(_.trim) + val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*) + else srcDs + handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, isStreamerInvolved = true) + } + } + + def verifyBackwardsCompatibility( + targetDs: Dataset[Row], + srcDs: Dataset[Row]): Unit = { + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { Review comment: The same code is present in the verifySourceAndTargetSchemas method. Can move this code to a common method and reuse it.
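One way to de-duplicate the two methods, as a sketch that also folds in the earlier suggestions (resolve sourceField once, use isEmpty). The helper name is illustrative, and org.apache.spark.sql.types.StructType is assumed to be imported:

```scala
// Shared per-field checks for verifySourceAndTargetSchemas and
// verifyBackwardsCompatibility (illustrative helper name).
private def verifyFieldsExistAndMatch(sourceSchema: StructType,
    targetSchema: StructType): Unit = {
  targetSchema.fields.foreach { tgtField =>
    val sourceField = sourceSchema.fields.find(_.name.equalsIgnoreCase(tgtField.name))
    if (sourceField.isEmpty) {
      throw new CarbonSchemaException(s"source schema does not contain field: ${tgtField.name}")
    }
    if (!sourceField.get.dataType.equals(tgtField.dataType)) {
      throw new CarbonSchemaException(
        s"source schema has different data type for field: ${tgtField.name}")
    }
  }
}

def verifyBackwardsCompatibility(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = {
  verifyFieldsExistAndMatch(srcDs.schema, targetDs.schema)
}
```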
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -16,31 +16,41 @@ */ package org.apache.spark.sql.execution.command.mutation.merge import java.nio.charset.Charset import java.util import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.rdd.RDD import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil +import org.apache.spark.sql.catalyst.{CarbonParserUtil, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.EqualTo import org.apache.spark.sql.execution.CastExpressionOptimization +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel} +import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableColRenameDataTypeChangeCommand, CarbonAlterTableDropColumnCommand} +import org.apache.spark.sql.functions.expr import org.apache.spark.sql.optimizer.CarbonFilters -import org.apache.spark.sql.types.DateType +import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.sql.types.{DateType, DecimalType, StructField} +import org.apache.carbondata.common.exceptions.sql.{CarbonSchemaException, MalformedCarbonCommandException} Review comment: please remove the unused import - MalformedCarbonCommandException ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") Review comment: can move this to the previous line ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. 
+ * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + + // check if some additional column got added in source schema + if (sourceSchema.fields.length > targetSchema.fields.length) { + val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase) + .filterNot(srcField => { + targetSchema.fields.map(_.name.toLowerCase).contains(srcField) + }) + LOGGER.warn(s"source schema contains additional fields which are not present in " + + s"target schema: ${ additionalSourceFields.mkString(",") }") + } + + // check if source schema has fields whose names only differ in case sensitivity + val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map { + case (str, times) => (str, times.length) + }.toList.filter(e => e._2 > 1).map(_._1) + if (similarFields.nonEmpty) { + LOGGER.error(s"source schema has similar fields which differ only in case sensitivity: " + + s"${ similarFields.mkString(",") }") + throw new CarbonSchemaException(s"source schema has similar fields which differ" + + s" only in case sensitivity: ${ + similarFields.mkString(",") + }") + } + } + + /** + * This method takes care of handling schema evolution scenarios for CarbonStreamer class. + * Currently only addition of columns is supported. 
+ * @param targetDs target dataset whose schema needs to be modified, if applicable + * @param srcDs incoming dataset + * @param sparkSession SparkSession + */ + def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: Dataset[Row], + sparkSession: SparkSession): Unit = { + // read the property here + val isSchemaEnforcementEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (isSchemaEnforcementEnabled) { + verifySourceAndTargetSchemas(targetDs, srcDs) + } else { + // These meta columns should be removed before actually writing the data + val metaColumnsString = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "") + val metaCols = metaColumnsString.split(",").map(_.trim) + val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*) + else srcDs + handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, isStreamerInvolved = true) + } + } + + def verifyBackwardsCompatibility( + targetDs: Dataset[Row], + srcDs: Dataset[Row]): Unit = { + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + } + + /** + * The method takes care of following schema evolution cases: + * Addition of a new column in source schema which is not present in target + * Deletion of a column in source schema which is present in target + * Data type changes for an existing column. 
+ * The method does not take care of column renames and table renames + * @param targetDs existing target dataset + * @param srcDs incoming source dataset + * @return new target schema to write the incoming batch with + */ + def handleSchemaEvolution( + targetDs: Dataset[Row], + srcDs: Dataset[Row], + sparkSession: SparkSession, + isStreamerInvolved: Boolean = false): Unit = { + + if (isStreamerInvolved) { + verifyBackwardsCompatibility(targetDs, srcDs) + } + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + // check if any column got added in source + val addedColumns = sourceSchema.fields + .map(_.name) + .filterNot(f => targetSchema.fields.map(_.name).contains(f)) + if (addedColumns.nonEmpty) { + handleAddColumnScenario(targetDs, + srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq, + sparkSession) + } + + // check if any column got deleted from source + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val partitionInfo = targetCarbonTable.getPartitionInfo + val partitionColumns = if (partitionInfo != null) partitionInfo.getColumnSchemaList.asScala + .map(_.getColumnName).toList else List[String]() + val deletedColumns = targetSchema.fields.map(_.name.toLowerCase) + .filterNot(f => { + sourceSchema.fields.map(_.name.toLowerCase).contains(f) || + partitionColumns.contains(f) + }) + if (deletedColumns.nonEmpty) { + handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession) + } + + val modifiedColumns = targetSchema.fields.filter(tgtField => { + val sourceField = sourceSchema.fields.find(f => f.name.equalsIgnoreCase(tgtField.name)) + if (sourceField.isDefined) !sourceField.get.dataType.equals(tgtField.dataType) else false + }) + + if (modifiedColumns.nonEmpty) { + handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, sparkSession) + } + } + + /** + * This method calls CarbonAlterTableAddColumnCommand for adding new columns + * @param targetDs target dataset whose schema needs to be modified + * @param colsToAdd new columns to be added + * @param sparkSession SparkSession + */ + def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: Seq[StructField], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val fields = new CarbonSpark2SqlParser().getFields(colsToAdd) + val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = false, Review comment: same code is available in DDLHelper.addColumns. can move common code to new method and reuse ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. 
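Regarding the DDLHelper.addColumns overlap flagged in the comment above, one way the shared piece could look (the helper name buildAddColumnsModel and its signature are illustrative; the body simply mirrors the calls the PR already makes, and Field is the type produced by CarbonSpark2SqlParser().getFields):

```scala
// Sketch of a shared builder that DDLHelper.addColumns and
// handleAddColumnScenario could both delegate to; name is hypothetical.
def buildAddColumnsModel(fields: Seq[Field], dbName: String,
    tableName: String): AlterTableAddColumnsModel = {
  val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = false,
    CarbonParserUtil.convertDbNameToLowerCase(Option(dbName)),
    tableName.toLowerCase,
    fields.map(CarbonParserUtil.convertFieldNamesToLowercase),
    Seq.empty,
    scala.collection.mutable.Map.empty[String, String],
    None,
    isAlterFlow = true)
  AlterTableAddColumnsModel(
    CarbonParserUtil.convertDbNameToLowerCase(Option(dbName)),
    tableName.toLowerCase,
    Map.empty[String, String],
    tableModel.dimCols,
    tableModel.msrCols,
    tableModel.highCardinalityDims.getOrElse(Seq.empty))
}
```

handleAddColumnScenario would then reduce to building the model via this helper and running CarbonAlterTableAddColumnCommand on it.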
+ * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + + // check if some additional column got added in source schema + if (sourceSchema.fields.length > targetSchema.fields.length) { + val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase) + .filterNot(srcField => { + targetSchema.fields.map(_.name.toLowerCase).contains(srcField) + }) + LOGGER.warn(s"source schema contains additional fields which are not present in " + + s"target schema: ${ additionalSourceFields.mkString(",") }") + } + + // check if source schema has fields whose names only differ in case sensitivity + val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map { + case (str, times) => (str, times.length) + }.toList.filter(e => e._2 > 1).map(_._1) + if (similarFields.nonEmpty) { + LOGGER.error(s"source schema has similar fields which differ only in case sensitivity: " + + s"${ similarFields.mkString(",") }") + throw new CarbonSchemaException(s"source schema has similar fields which differ" + + s" only in case sensitivity: ${ + similarFields.mkString(",") + }") + } + } + + /** + * This method takes care of handling schema evolution scenarios for CarbonStreamer class. + * Currently only addition of columns is supported. 
+ * @param targetDs target dataset whose schema needs to be modified, if applicable + * @param srcDs incoming dataset + * @param sparkSession SparkSession + */ + def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: Dataset[Row], + sparkSession: SparkSession): Unit = { + // read the property here + val isSchemaEnforcementEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (isSchemaEnforcementEnabled) { + verifySourceAndTargetSchemas(targetDs, srcDs) + } else { + // These meta columns should be removed before actually writing the data + val metaColumnsString = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "") + val metaCols = metaColumnsString.split(",").map(_.trim) + val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*) Review comment: please add braces and format the code here ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. 
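For the brace/formatting comment above, the conditional would read:

```scala
val srcDsWithoutMeta = if (metaCols.length > 0) {
  srcDs.drop(metaCols: _*)
} else {
  srcDs
}
```

As an aside, "".split(",") returns Array(""), so metaCols.length is never 0 even when the property is unset; dropping a nonexistent column is a no-op in Spark, so behavior is unaffected, but guarding on metaColumnsString.isEmpty would make the intent clearer.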
Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + + // check if some additional column got added in source schema + if (sourceSchema.fields.length > targetSchema.fields.length) { + val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase) + .filterNot(srcField => { + targetSchema.fields.map(_.name.toLowerCase).contains(srcField) + }) + LOGGER.warn(s"source schema contains additional fields which are not present in " + + s"target schema: ${ additionalSourceFields.mkString(",") }") + } + + // check if source schema has fields whose names only differ in case sensitivity + val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map { + case (str, times) => (str, times.length) + }.toList.filter(e => e._2 > 1).map(_._1) + if (similarFields.nonEmpty) { + LOGGER.error(s"source schema has similar fields which differ only in case sensitivity: " + + s"${ similarFields.mkString(",") }") + throw new CarbonSchemaException(s"source schema has similar fields which differ" + + s" only in case sensitivity: ${ + similarFields.mkString(",") + }") + } + } + + /** + * This method takes care of handling schema evolution scenarios for CarbonStreamer class. + * Currently only addition of columns is supported. 
+ * @param targetDs target dataset whose schema needs to be modified, if applicable + * @param srcDs incoming dataset + * @param sparkSession SparkSession + */ + def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: Dataset[Row], + sparkSession: SparkSession): Unit = { + // read the property here + val isSchemaEnforcementEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (isSchemaEnforcementEnabled) { + verifySourceAndTargetSchemas(targetDs, srcDs) + } else { + // These meta columns should be removed before actually writing the data + val metaColumnsString = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "") + val metaCols = metaColumnsString.split(",").map(_.trim) + val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*) + else srcDs + handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, isStreamerInvolved = true) + } + } + + def verifyBackwardsCompatibility( + targetDs: Dataset[Row], + srcDs: Dataset[Row]): Unit = { + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + } + + /** + * The method takes care of following schema evolution cases: + * Addition of a new column in source schema which is not present in target + * Deletion of a column in source schema which is present in target + * Data type changes for an existing column. 
+ * The method does not take care of column renames and table renames + * @param targetDs existing target dataset + * @param srcDs incoming source dataset + * @return new target schema to write the incoming batch with + */ + def handleSchemaEvolution( + targetDs: Dataset[Row], + srcDs: Dataset[Row], + sparkSession: SparkSession, + isStreamerInvolved: Boolean = false): Unit = { + + if (isStreamerInvolved) { + verifyBackwardsCompatibility(targetDs, srcDs) + } + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + // check if any column got added in source + val addedColumns = sourceSchema.fields + .map(_.name) + .filterNot(f => targetSchema.fields.map(_.name).contains(f)) + if (addedColumns.nonEmpty) { + handleAddColumnScenario(targetDs, + srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq, Review comment: ```suggestion srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq, ``` ```suggestion sourceSchema.fields.filter(f => addedColumns.contains(f.name)).toSeq, ``` after this, can format to single line ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. 
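Applying the suggestion above (reusing the sourceSchema val instead of re-reading srcDs.schema), the block collapses to:

```scala
if (addedColumns.nonEmpty) {
  handleAddColumnScenario(targetDs,
    sourceSchema.fields.filter(f => addedColumns.contains(f.name)).toSeq, sparkSession)
}
```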
Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + + // check if some additional column got added in source schema + if (sourceSchema.fields.length > targetSchema.fields.length) { + val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase) + .filterNot(srcField => { + targetSchema.fields.map(_.name.toLowerCase).contains(srcField) + }) + LOGGER.warn(s"source schema contains additional fields which are not present in " + + s"target schema: ${ additionalSourceFields.mkString(",") }") + } + + // check if source schema has fields whose names only differ in case sensitivity + val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map { + case (str, times) => (str, times.length) + }.toList.filter(e => e._2 > 1).map(_._1) + if (similarFields.nonEmpty) { + LOGGER.error(s"source schema has similar fields which differ only in case sensitivity: " + + s"${ similarFields.mkString(",") }") + throw new CarbonSchemaException(s"source schema has similar fields which differ" + + s" only in case sensitivity: ${ + similarFields.mkString(",") + }") + } + } + + /** + * This method takes care of handling schema evolution scenarios for CarbonStreamer class. + * Currently only addition of columns is supported. 
+ * @param targetDs target dataset whose schema needs to be modified, if applicable + * @param srcDs incoming dataset + * @param sparkSession SparkSession + */ + def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: Dataset[Row], + sparkSession: SparkSession): Unit = { + // read the property here + val isSchemaEnforcementEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (isSchemaEnforcementEnabled) { + verifySourceAndTargetSchemas(targetDs, srcDs) + } else { + // These meta columns should be removed before actually writing the data + val metaColumnsString = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "") + val metaCols = metaColumnsString.split(",").map(_.trim) + val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*) + else srcDs + handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, isStreamerInvolved = true) + } + } + + def verifyBackwardsCompatibility( + targetDs: Dataset[Row], + srcDs: Dataset[Row]): Unit = { + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + } + + /** + * The method takes care of following schema evolution cases: + * Addition of a new column in source schema which is not present in target + * Deletion of a column in source schema which is present in target + * Data type changes for an existing column. 
+ * The method does not take care of column renames and table renames + * @param targetDs existing target dataset + * @param srcDs incoming source dataset + * @return new target schema to write the incoming batch with + */ + def handleSchemaEvolution( + targetDs: Dataset[Row], + srcDs: Dataset[Row], + sparkSession: SparkSession, + isStreamerInvolved: Boolean = false): Unit = { + + if (isStreamerInvolved) { + verifyBackwardsCompatibility(targetDs, srcDs) + } + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + // check if any column got added in source + val addedColumns = sourceSchema.fields + .map(_.name) + .filterNot(f => targetSchema.fields.map(_.name).contains(f)) + if (addedColumns.nonEmpty) { + handleAddColumnScenario(targetDs, + srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq, + sparkSession) + } + + // check if any column got deleted from source + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val partitionInfo = targetCarbonTable.getPartitionInfo + val partitionColumns = if (partitionInfo != null) partitionInfo.getColumnSchemaList.asScala + .map(_.getColumnName).toList else List[String]() + val deletedColumns = targetSchema.fields.map(_.name.toLowerCase) + .filterNot(f => { + sourceSchema.fields.map(_.name.toLowerCase).contains(f) || + partitionColumns.contains(f) + }) + if (deletedColumns.nonEmpty) { + handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession) + } + + val modifiedColumns = targetSchema.fields.filter(tgtField => { + val sourceField = sourceSchema.fields.find(f => f.name.equalsIgnoreCase(tgtField.name)) + if (sourceField.isDefined) !sourceField.get.dataType.equals(tgtField.dataType) else false + }) + + if (modifiedColumns.nonEmpty) { + handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, sparkSession) + } + } + + /** + * This method calls CarbonAlterTableAddColumnCommand for adding new columns + * @param targetDs target dataset whose schema needs to be modified + * @param colsToAdd new columns to be added + * @param sparkSession SparkSession + */ + def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: Seq[StructField], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val fields = new CarbonSpark2SqlParser().getFields(colsToAdd) + val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = false, + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + fields.map(CarbonParserUtil.convertFieldNamesToLowercase), + Seq.empty, + scala.collection.mutable.Map.empty[String, String], + None, + isAlterFlow = true) + // targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = f.getColName, + // dataType = Some(f.getDataType.getName), name = Option(f.getColName), + // children = None, )) + val alterTableAddColumnsModel = AlterTableAddColumnsModel( + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + Map.empty[String, String], + tableModel.dimCols, + tableModel.msrCols, + tableModel.highCardinalityDims.getOrElse(Seq.empty)) + CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel).run(sparkSession) + } + + /** + * This method calls CarbonAlterTableDropColumnCommand for deleting 
columns + * @param targetDs target dataset whose schema needs to be modified + * @param colsToDrop columns to be dropped from carbondata table + * @param sparkSession SparkSession + */ + def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop: List[String], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val alterTableDropColumnModel = AlterTableDropColumnModel( + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + colsToDrop.map(_.toLowerCase)) + CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession) + } + + /** + * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for handling data type changes + * @param targetDs target dataset whose schema needs to be modified + * @param modifiedCols columns with data type changes + * @param sparkSession SparkSession + */ + def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols: List[StructField], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + + // need to call the command one by one for each modified column + modifiedCols.foreach(col => { + val values = col.dataType match { + case d: DecimalType => Some(List((d.precision, d.scale))) + case _ => None + } + val dataTypeInfo = CarbonParserUtil.parseColumn(col.name, col.dataType, values) + + val alterTableColRenameAndDataTypeChangeModel = + AlterTableDataTypeChangeModel( + dataTypeInfo, + Option(targetCarbonTable.getDatabaseName.toLowerCase), + targetCarbonTable.getTableName.toLowerCase, + col.name.toLowerCase, + col.name.toLowerCase, + isColumnRename = false, + Option.empty) + + CarbonAlterTableColRenameDataTypeChangeCommand( + alterTableColRenameAndDataTypeChangeModel + ).run(sparkSession) + }) + } + + def deduplicateBeforeWriting( + srcDs: Dataset[Row], + targetDs: Dataset[Row], + sparkSession: SparkSession, + srcAlias: String, + targetAlias: String, + keyColumn: String, + orderingField: String, + targetCarbonTable: CarbonTable): Dataset[Row] = { + val properties = CarbonProperties.getInstance() + val filterDupes = properties + .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, + CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean + val combineBeforeUpsert = properties + .getProperty(CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE, + CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT).toBoolean + var dedupedDataset: Dataset[Row] = srcDs + if (combineBeforeUpsert) { + dedupedDataset = deduplicateAgainstIncomingDataset(srcDs, sparkSession, srcAlias, keyColumn, + orderingField, targetCarbonTable) + } + if (filterDupes) { + dedupedDataset = deduplicateAgainstExistingDataset(dedupedDataset, targetDs, + srcAlias, targetAlias, keyColumn) + } + dedupedDataset.show() + dedupedDataset + } + + def deduplicateAgainstIncomingDataset( + srcDs: Dataset[Row], + sparkSession: SparkSession, + srcAlias: String, + keyColumn: String, + orderingField: String, + table: CarbonTable): Dataset[Row] = { + if (orderingField.equals(CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT)) { + return srcDs + } + val schema = srcDs.schema + val carbonKeyColumn = table.getColumnByName(keyColumn) + val keyColumnDataType = 
getCarbonDataType(keyColumn, srcDs) + val orderingFieldDataType = getCarbonDataType(orderingField, srcDs) + val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) && + (orderingFieldDataType != DataTypes.DATE) + val comparator = getComparator(orderingFieldDataType) + val rdd = srcDs.rdd + val dedupedRDD: RDD[Row] = rdd.map{row => Review comment: please format this code ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + + // check if some additional column got added in source schema + if (sourceSchema.fields.length > targetSchema.fields.length) { + val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase) + .filterNot(srcField => { + targetSchema.fields.map(_.name.toLowerCase).contains(srcField) + }) + LOGGER.warn(s"source schema contains additional fields which are not present in " + + s"target schema: ${ additionalSourceFields.mkString(",") }") + } + + // check if source schema has fields whose names only differ in case sensitivity + val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map { + case (str, times) => (str, times.length) + }.toList.filter(e => e._2 > 1).map(_._1) + if (similarFields.nonEmpty) { + LOGGER.error(s"source schema has similar fields which differ only in case sensitivity: " + + s"${ similarFields.mkString(",") }") + throw new CarbonSchemaException(s"source schema has similar fields which differ" + + s" only in case sensitivity: ${ + similarFields.mkString(",") + }") + } + } + + /** + * This method takes care of handling schema evolution scenarios for CarbonStreamer class. 
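On the formatting comment above, the conventional Scala layout is a space before the brace with the lambda body indented. A sketch of the shape only; the body is elided in the quoted diff, so a placeholder stands in for it here:

```scala
val dedupedRDD: RDD[Row] = rdd.map { row =>
  // ... existing per-row key extraction and ordering logic, unchanged ...
  row // placeholder result; the real body is elided in the quoted diff
}
```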
+ * Currently only addition of columns is supported. + * @param targetDs target dataset whose schema needs to be modified, if applicable + * @param srcDs incoming dataset + * @param sparkSession SparkSession + */ + def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: Dataset[Row], + sparkSession: SparkSession): Unit = { + // read the property here + val isSchemaEnforcementEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (isSchemaEnforcementEnabled) { + verifySourceAndTargetSchemas(targetDs, srcDs) + } else { + // These meta columns should be removed before actually writing the data + val metaColumnsString = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "") + val metaCols = metaColumnsString.split(",").map(_.trim) + val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*) + else srcDs + handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, isStreamerInvolved = true) + } + } + + def verifyBackwardsCompatibility( + targetDs: Dataset[Row], + srcDs: Dataset[Row]): Unit = { + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + } + + /** + * The method takes care of following schema evolution cases: + * Addition of a new column in source schema which is not present in target + * Deletion of a column in source schema which is present in target + * Data type changes for an existing column. 
+ * The method does not take care of column renames and table renames + * @param targetDs existing target dataset + * @param srcDs incoming source dataset + * @return new target schema to write the incoming batch with + */ + def handleSchemaEvolution( + targetDs: Dataset[Row], + srcDs: Dataset[Row], + sparkSession: SparkSession, + isStreamerInvolved: Boolean = false): Unit = { + + if (isStreamerInvolved) { + verifyBackwardsCompatibility(targetDs, srcDs) + } + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + // check if any column got added in source + val addedColumns = sourceSchema.fields + .map(_.name) + .filterNot(f => targetSchema.fields.map(_.name).contains(f)) + if (addedColumns.nonEmpty) { + handleAddColumnScenario(targetDs, + srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq, + sparkSession) + } + + // check if any column got deleted from source + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val partitionInfo = targetCarbonTable.getPartitionInfo + val partitionColumns = if (partitionInfo != null) partitionInfo.getColumnSchemaList.asScala + .map(_.getColumnName).toList else List[String]() + val deletedColumns = targetSchema.fields.map(_.name.toLowerCase) + .filterNot(f => { + sourceSchema.fields.map(_.name.toLowerCase).contains(f) || + partitionColumns.contains(f) + }) + if (deletedColumns.nonEmpty) { + handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession) + } + + val modifiedColumns = targetSchema.fields.filter(tgtField => { + val sourceField = sourceSchema.fields.find(f => f.name.equalsIgnoreCase(tgtField.name)) + if (sourceField.isDefined) !sourceField.get.dataType.equals(tgtField.dataType) else false + }) + + if (modifiedColumns.nonEmpty) { + handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, sparkSession) + } + } + + /** + * This method calls CarbonAlterTableAddColumnCommand for adding new columns + * @param targetDs target dataset whose schema needs to be modified + * @param colsToAdd new columns to be added + * @param sparkSession SparkSession + */ + def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: Seq[StructField], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val fields = new CarbonSpark2SqlParser().getFields(colsToAdd) + val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = false, + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + fields.map(CarbonParserUtil.convertFieldNamesToLowercase), + Seq.empty, + scala.collection.mutable.Map.empty[String, String], + None, + isAlterFlow = true) + // targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = f.getColName, + // dataType = Some(f.getDataType.getName), name = Option(f.getColName), + // children = None, )) + val alterTableAddColumnsModel = AlterTableAddColumnsModel( + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + Map.empty[String, String], + tableModel.dimCols, + tableModel.msrCols, + tableModel.highCardinalityDims.getOrElse(Seq.empty)) + CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel).run(sparkSession) + } + + /** + * This method calls CarbonAlterTableDropColumnCommand for deleting 
columns + * @param targetDs target dataset whose schema needs to be modified + * @param colsToDrop columns to be dropped from carbondata table + * @param sparkSession SparkSession + */ + def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop: List[String], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val alterTableDropColumnModel = AlterTableDropColumnModel( + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + colsToDrop.map(_.toLowerCase)) + CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession) + } + + /** + * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for handling data type changes + * @param targetDs target dataset whose schema needs to be modified + * @param modifiedCols columns with data type changes + * @param sparkSession SparkSession + */ + def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols: List[StructField], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + + // need to call the command one by one for each modified column + modifiedCols.foreach(col => { + val values = col.dataType match { Review comment: same code is available in DDLHelper.changeColumn. can move common code to new method and reuse ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala ########## @@ -847,20 +820,222 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll { Row("j", 2, "RUSSIA"), Row("k", 0, "INDIA"))) } - test("test all the merge APIs UPDATE, DELETE, UPSERT and INSERT") { + def prepareTarget( + isPartitioned: Boolean = false, + partitionedColumn: String = null + ): Dataset[Row] = { sql("drop table if exists target") - val initframe = sqlContext.sparkSession.createDataFrame(Seq( + val initFrame = sqlContext.sparkSession.createDataFrame(Seq( Row("a", "0"), Row("b", "1"), Row("c", "2"), Row("d", "3") ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType)))) - initframe.write - .format("carbondata") - .option("tableName", "target") - .mode(SaveMode.Overwrite) - .save() - val target = sqlContext.read.format("carbondata").option("tableName", "target").load() + + if (isPartitioned) { + initFrame.write + .format("carbondata") + .option("tableName", "target") + .option("partitionColumns", partitionedColumn) + .mode(SaveMode.Overwrite) + .save() + } else { + initFrame.write + .format("carbondata") + .option("tableName", "target") + .mode(SaveMode.Overwrite) + .save() + } + sqlContext.read.format("carbondata").option("tableName", "target").load() + } + + def prepareTargetWithThreeFields( + isPartitioned: Boolean = false, + partitionedColumn: String = null + ): Dataset[Row] = { + sql("drop table if exists target") + val initFrame = sqlContext.sparkSession.createDataFrame(Seq( + Row("a", 0, "CHINA"), + Row("b", 1, "INDIA"), + Row("c", 2, "INDIA"), + Row("d", 3, "US") + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("value", IntegerType), + StructField("country", StringType)))) + + if (isPartitioned) { + initFrame.write + .format("carbondata") + .option("tableName", "target") + .option("partitionColumns", partitionedColumn) + 
.mode(SaveMode.Overwrite) + .save() + } else { + initFrame.write + .format("carbondata") + .option("tableName", "target") + .mode(SaveMode.Overwrite) + .save() + } + sqlContext.read.format("carbondata").option("tableName", "target").load() + } + + test("test schema enforcement") { + val target = prepareTarget() + var cdc = sqlContext.sparkSession.createDataFrame(Seq( + Row("a", "1", "ab"), + Row("d", "4", "de") + ).asJava, StructType(Seq(StructField("key", StringType), + StructField("value", StringType) + , StructField("new_value", StringType)))) + val properties = CarbonProperties.getInstance() + properties.addProperty( + CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false" + ) + properties.addProperty( + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "true" + ) + target.as("A").upsert(cdc.as("B"), "key").execute() + checkAnswer(sql("select * from target"), + Seq(Row("a", "1"), Row("b", "1"), Row("c", "2"), Row("d", "4"))) + + properties.addProperty( + CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "true" + ) + + val exceptionCaught1 = intercept[MalformedCarbonCommandException] { + cdc = sqlContext.sparkSession.createDataFrame(Seq( + Row("a", 1, "ab"), + Row("d", 4, "de") + ).asJava, StructType(Seq(StructField("key", StringType), + StructField("value", IntegerType) + , StructField("new_value", StringType)))) + target.as("A").upsert(cdc.as("B"), "key").execute() + } + assert(exceptionCaught1.getMessage + .contains( + "property CARBON_STREAMER_INSERT_DEDUPLICATE should " + + "only be set with operation type INSERT")) + + properties.addProperty( + CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false" + ) + val exceptionCaught2 = intercept[CarbonSchemaException] { + cdc = sqlContext.sparkSession.createDataFrame(Seq( + Row("a", 1), + Row("d", 4) + ).asJava, StructType(Seq(StructField("key", StringType), + StructField("val", IntegerType)))) + target.as("A").upsert(cdc.as("B"), "key").execute() + } + assert(exceptionCaught2.getMessage.contains("source schema does not contain field: value")) + + val exceptionCaught3 = intercept[CarbonSchemaException] { + cdc = sqlContext.sparkSession.createDataFrame(Seq( + Row("a", 1), + Row("d", 4) + ).asJava, StructType(Seq(StructField("key", StringType), + StructField("value", LongType)))) + target.as("A").upsert(cdc.as("B"), "key").execute() + } + + assert(exceptionCaught3.getMsg.contains("source schema has different " + + "data type for field: value")) + + val exceptionCaught4 = intercept[CarbonSchemaException] { + cdc = sqlContext.sparkSession.createDataFrame(Seq( + Row("a", "1", "A"), + Row("d", "4", "D") + ).asJava, StructType(Seq(StructField("key", StringType), + StructField("value", StringType), StructField("Key", StringType)))) + target.as("A").upsert(cdc.as("B"), "key").execute() + } + + assert(exceptionCaught4.getMsg.contains("source schema has similar fields which " + + "differ only in case sensitivity: key")) + } + + test("test schema evolution") { + val properties = CarbonProperties.getInstance() + properties.addProperty( + CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false" + ) + properties.addProperty( + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "false" + ) + properties.addProperty( + CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD, "value" + ) + sql("drop table if exists target") + var target = prepareTargetWithThreeFields() + var cdc = sqlContext.sparkSession.createDataFrame(Seq( + Row("a", 1, "ab", "china"), + Row("d", 4, "de", "china"), + Row("d", 7, 
"updated_de", "china_pro") + ).asJava, StructType(Seq(StructField("key", StringType), + StructField("value", IntegerType) + , StructField("new_value", StringType), + StructField("country", StringType)))) + target.as("A").upsert(cdc.as("B"), "key").execute() + checkAnswer(sql("select * from target"), + Seq(Row("a", 1, "china", "ab"), Row("b", 1, "INDIA", null), + Row("c", 2, "INDIA", null), Row("d", 7, "china_pro", "updated_de"))) + + target = sqlContext.read.format("carbondata").option("tableName", "target").load() + + cdc = sqlContext.sparkSession.createDataFrame(Seq( + Row("a", 5), + Row("d", 5) + ).asJava, StructType(Seq(StructField("key", StringType), + StructField("value", IntegerType)))) + target.as("A").upsert(cdc.as("B"), "key").execute() + checkAnswer(sql("select * from target"), + Seq(Row("a", 5), Row("b", 1), + Row("c", 2), Row("d", 5))) + +// target = sqlContext.read.format("carbondata").option("tableName", "target").load() +// cdc = sqlContext.sparkSession.createDataFrame(Seq( +// Row("b", 50), +// Row("d", 50) Review comment: please remove this code if not needed ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. + * If data type of some column differs in source and target schemas, exception is thrown. + * If source schema has multiple columns whose names differ only in case sensitivity, exception + * is thrown. + * @param targetDs target carbondata table + * @param srcDs source/incoming data + */ + def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = { + LOGGER.info("schema enforcement is enabled. 
Source and target schemas will be verified") + // get the source and target dataset schema + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + + // check if some additional column got added in source schema + if (sourceSchema.fields.length > targetSchema.fields.length) { + val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase) + .filterNot(srcField => { + targetSchema.fields.map(_.name.toLowerCase).contains(srcField) + }) + LOGGER.warn(s"source schema contains additional fields which are not present in " + + s"target schema: ${ additionalSourceFields.mkString(",") }") + } + + // check if source schema has fields whose names only differ in case sensitivity + val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map { + case (str, times) => (str, times.length) + }.toList.filter(e => e._2 > 1).map(_._1) + if (similarFields.nonEmpty) { + LOGGER.error(s"source schema has similar fields which differ only in case sensitivity: " + + s"${ similarFields.mkString(",") }") + throw new CarbonSchemaException(s"source schema has similar fields which differ" + + s" only in case sensitivity: ${ + similarFields.mkString(",") + }") + } + } + + /** + * This method takes care of handling schema evolution scenarios for CarbonStreamer class. + * Currently only addition of columns is supported. 
+ * @param targetDs target dataset whose schema needs to be modified, if applicable + * @param srcDs incoming dataset + * @param sparkSession SparkSession + */ + def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: Dataset[Row], + sparkSession: SparkSession): Unit = { + // read the property here + val isSchemaEnforcementEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, + CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean + if (isSchemaEnforcementEnabled) { + verifySourceAndTargetSchemas(targetDs, srcDs) + } else { + // These meta columns should be removed before actually writing the data + val metaColumnsString = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "") + val metaCols = metaColumnsString.split(",").map(_.trim) + val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*) + else srcDs + handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, isStreamerInvolved = true) + } + } + + def verifyBackwardsCompatibility( + targetDs: Dataset[Row], + srcDs: Dataset[Row]): Unit = { + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + targetSchema.fields.foreach(tgtField => { + // check if some field is missing in source schema + if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) { + LOGGER.error(s"source schema does not contain field: ${ tgtField.name }") + throw new CarbonSchemaException(s"source schema does not contain " + + s"field: ${ tgtField.name }") + } + + // check if data type got modified for some column + val sourceField = sourceSchema.fields + .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase)) + if (!sourceField.get.dataType.equals(tgtField.dataType)) { + LOGGER.error(s"source schema has different data type for field: ${ + tgtField.name + }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }") + throw new CarbonSchemaException(s"source schema has different data type " + + s"for field: ${ tgtField.name }") + } + }) + } + + /** + * The method takes care of following schema evolution cases: + * Addition of a new column in source schema which is not present in target + * Deletion of a column in source schema which is present in target + * Data type changes for an existing column. 
+ * The method does not take care of column renames and table renames + * @param targetDs existing target dataset + * @param srcDs incoming source dataset + * @return new target schema to write the incoming batch with + */ + def handleSchemaEvolution( + targetDs: Dataset[Row], + srcDs: Dataset[Row], + sparkSession: SparkSession, + isStreamerInvolved: Boolean = false): Unit = { + + if (isStreamerInvolved) { + verifyBackwardsCompatibility(targetDs, srcDs) + } + val sourceSchema = srcDs.schema + val targetSchema = targetDs.schema + + // check if any column got added in source + val addedColumns = sourceSchema.fields + .map(_.name) + .filterNot(f => targetSchema.fields.map(_.name).contains(f)) + if (addedColumns.nonEmpty) { + handleAddColumnScenario(targetDs, + srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq, + sparkSession) + } + + // check if any column got deleted from source + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val partitionInfo = targetCarbonTable.getPartitionInfo + val partitionColumns = if (partitionInfo != null) partitionInfo.getColumnSchemaList.asScala + .map(_.getColumnName).toList else List[String]() + val deletedColumns = targetSchema.fields.map(_.name.toLowerCase) + .filterNot(f => { + sourceSchema.fields.map(_.name.toLowerCase).contains(f) || + partitionColumns.contains(f) + }) + if (deletedColumns.nonEmpty) { + handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession) + } + + val modifiedColumns = targetSchema.fields.filter(tgtField => { + val sourceField = sourceSchema.fields.find(f => f.name.equalsIgnoreCase(tgtField.name)) + if (sourceField.isDefined) !sourceField.get.dataType.equals(tgtField.dataType) else false + }) + + if (modifiedColumns.nonEmpty) { + handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, sparkSession) + } + } + + /** + * This method calls CarbonAlterTableAddColumnCommand for adding new columns + * @param targetDs target dataset whose schema needs to be modified + * @param colsToAdd new columns to be added + * @param sparkSession SparkSession + */ + def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: Seq[StructField], + sparkSession: SparkSession): Unit = { + val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan) + val targetCarbonTable = relations.head.carbonRelation.carbonTable + val fields = new CarbonSpark2SqlParser().getFields(colsToAdd) + val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = false, + CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)), + targetCarbonTable.getTableName.toLowerCase, + fields.map(CarbonParserUtil.convertFieldNamesToLowercase), + Seq.empty, + scala.collection.mutable.Map.empty[String, String], + None, + isAlterFlow = true) + // targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = f.getColName, Review comment: remove this code ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala ########## @@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil { columnMinMaxInBlocklet.asScala } } + + /** + * This method verifies source and target schemas for the following: + * If additional columns are present in source schema as compared to target, simply ignore them. + * If some columns are missing in source schema as compared to target schema, exception is thrown. 
##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +474,413 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, simply ignore them.
+   * If some columns are missing in source schema as compared to target schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, exception is thrown.
+   * If source schema has multiple columns whose names differ only in case sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name }")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+          s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data type " +
+          s"for field: ${ tgtField.name }")
+      }
+    })
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = sourceSchema.fields.map(_.name.toLowerCase)
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      LOGGER.warn(s"source schema contains additional fields which are not present in " +
+        s"target schema: ${ additionalSourceFields.mkString(",") }")
+    }
+
+    // check if source schema has fields whose names only differ in case sensitivity
+    val similarFields = sourceSchema.fields.map(_.name.toLowerCase).groupBy(a => identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      LOGGER.error(s"source schema has similar fields which differ only in case sensitivity: " +
+        s"${ similarFields.mkString(",") }")
+      throw new CarbonSchemaException(s"source schema has similar fields which differ" +
+        s" only in case sensitivity: ${
+          similarFields.mkString(",")
+        }")
+    }
+  }
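The duplicate-name check above recomputes the lowercased name list and wraps `identity` in a redundant lambda; hoisting the list once would also let the additional-fields check reuse it. A compact standalone equivalent (object and method names are mine):

```scala
object DuplicateNameCheckSketch {
  // Returns the column names that occur more than once when case is ignored.
  def findCaseInsensitiveDuplicates(columnNames: Seq[String]): Seq[String] = {
    val lowerNames = columnNames.map(_.toLowerCase) // computed once, reused
    lowerNames.groupBy(identity).collect { case (name, hits) if hits.size > 1 => name }.toSeq
  }
}
```

For example, `findCaseInsensitiveDuplicates(Seq("id", "Name", "name"))` returns `Seq("name")`.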
+
+  /**
+   * This method takes care of handling schema evolution scenarios for CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) srcDs.drop(metaCols: _*)
+      else srcDs
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      // check if some field is missing in source schema
+      if (!sourceSchema.fields.map(_.name.toLowerCase).contains(tgtField.name.toLowerCase)) {
+        LOGGER.error(s"source schema does not contain field: ${ tgtField.name }")
+        throw new CarbonSchemaException(s"source schema does not contain " +
+          s"field: ${ tgtField.name }")
+      }
+
+      // check if data type got modified for some column
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        LOGGER.error(s"source schema has different data type for field: ${
+          tgtField.name
+        }, source type: ${ sourceField.get.dataType }, target type: ${ tgtField.dataType }")
+        throw new CarbonSchemaException(s"source schema has different data type " +
+          s"for field: ${ tgtField.name }")
+      }
+    })
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .map(_.name)
+      .filterNot(f => targetSchema.fields.map(_.name).contains(f))
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        srcDs.schema.fields.filter(f => addedColumns.contains(f.name)).toSeq,
+        sparkSession)
+    }
+
+    // check if any column got deleted from source
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val partitionInfo = targetCarbonTable.getPartitionInfo
+    val partitionColumns = if (partitionInfo != null) partitionInfo.getColumnSchemaList.asScala
+      .map(_.getColumnName).toList else List[String]()
+    val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+      .filterNot(f => {
+        sourceSchema.fields.map(_.name.toLowerCase).contains(f) ||
+        partitionColumns.contains(f)
+      })
+    if (deletedColumns.nonEmpty) {
+      handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession)
+    }
+
+    val modifiedColumns = targetSchema.fields.filter(tgtField => {
+      val sourceField = sourceSchema.fields.find(f => f.name.equalsIgnoreCase(tgtField.name))
+      if (sourceField.isDefined) !sourceField.get.dataType.equals(tgtField.dataType) else false
+    })
+
+    if (modifiedColumns.nonEmpty) {
+      handleDataTypeChangeScenario(targetDs, modifiedColumns.toList, sparkSession)
+    }
+  }
+
+  /**
+   * This method calls CarbonAlterTableAddColumnCommand for adding new columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToAdd new columns to be added
+   * @param sparkSession SparkSession
+   */
+  def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: Seq[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val fields = new CarbonSpark2SqlParser().getFields(colsToAdd)
+    val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent = false,
+      CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      fields.map(CarbonParserUtil.convertFieldNamesToLowercase),
+      Seq.empty,
+      scala.collection.mutable.Map.empty[String, String],
+      None,
+      isAlterFlow = true)
+    // targetCarbonTable.getAllDimensions.asScala.map(f => Field(column = f.getColName,
+    // dataType = Some(f.getDataType.getName), name = Option(f.getColName),
+    // children = None, ))
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      Map.empty[String, String],
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highCardinalityDims.getOrElse(Seq.empty))
+    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel).run(sparkSession)
+  }
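For orientation: `CarbonAlterTableAddColumnCommand` is the command behind CarbonData's ALTER TABLE DDL, so the add-column path above should behave like issuing the equivalent SQL. A hedged sketch of that SQL route — the database, table, and column identifiers below are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

object AddColumnsSqlSketch {
  // Drives the same add-column flow through SQL instead of building
  // AlterTableAddColumnsModel by hand; identifiers are illustrative only.
  def addColumn(spark: SparkSession): Unit = {
    spark.sql("ALTER TABLE target_db.target_table ADD COLUMNS (new_col STRING)")
  }
}
```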
+
+  /**
+   * This method calls CarbonAlterTableDropColumnCommand for deleting columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToDrop columns to be dropped from carbondata table
+   * @param sparkSession SparkSession
+   */
+  def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop: List[String],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val alterTableDropColumnModel = AlterTableDropColumnModel(
+      CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      colsToDrop.map(_.toLowerCase))
+    CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for handling data type changes
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param modifiedCols columns with data type changes
+   * @param sparkSession SparkSession
+   */
+  def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols: List[StructField],
+      sparkSession: SparkSession): Unit = {
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+
+    // need to call the command one by one for each modified column
+    modifiedCols.foreach(col => {
+      val values = col.dataType match {
+        case d: DecimalType => Some(List((d.precision, d.scale)))
+        case _ => None
+      }
+      val dataTypeInfo = CarbonParserUtil.parseColumn(col.name, col.dataType, values)
+
+      val alterTableColRenameAndDataTypeChangeModel =
+        AlterTableDataTypeChangeModel(
+          dataTypeInfo,
+          Option(targetCarbonTable.getDatabaseName.toLowerCase),
+          targetCarbonTable.getTableName.toLowerCase,
+          col.name.toLowerCase,
+          col.name.toLowerCase,
+          isColumnRename = false,
+          Option.empty)
+
+      CarbonAlterTableColRenameDataTypeChangeCommand(
+        alterTableColRenameAndDataTypeChangeModel
+      ).run(sparkSession)
+    })
+  }
+
+  def deduplicateBeforeWriting(
+      srcDs: Dataset[Row],
+      targetDs: Dataset[Row],
+      sparkSession: SparkSession,
+      srcAlias: String,
+      targetAlias: String,
+      keyColumn: String,
+      orderingField: String,
+      targetCarbonTable: CarbonTable): Dataset[Row] = {
+    val properties = CarbonProperties.getInstance()
+    val filterDupes = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+        CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
+    val combineBeforeUpsert = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE,
+        CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT).toBoolean
+    var dedupedDataset: Dataset[Row] = srcDs
+    if (combineBeforeUpsert) {
+      dedupedDataset = deduplicateAgainstIncomingDataset(srcDs, sparkSession, srcAlias, keyColumn,
+        orderingField, targetCarbonTable)
+    }
+    if (filterDupes) {
+      dedupedDataset = deduplicateAgainstExistingDataset(dedupedDataset, targetDs,
+        srcAlias, targetAlias, keyColumn)
+    }
+    dedupedDataset.show()

Review comment: Please remove this `dedupedDataset.show()` debug call before merging.
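`deduplicateAgainstIncomingDataset` is not quoted in this hunk, but keep-latest-per-key deduplication in Spark is typically a window over the key column ordered by the ordering field. A standalone sketch under that assumption — the names and the window approach are mine, not necessarily what the patch implements:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object DedupSketch {
  // Keeps, per key, only the row with the highest ordering-field value.
  def keepLatestPerKey(ds: DataFrame, keyColumn: String, orderingField: String): DataFrame = {
    val byKeyLatestFirst = Window.partitionBy(col(keyColumn)).orderBy(col(orderingField).desc)
    ds.withColumn("_row_num", row_number().over(byKeyLatestFirst))
      .filter(col("_row_num") === 1)
      .drop("_row_num")
  }
}
```

`row_number` (rather than `rank`) guarantees exactly one surviving row per key even when ordering-field values tie.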