This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 42799c0956f  [HUDI-6438] Config parameter 'MAKE_NEW_COLUMNS_NULLABLE' to allow for marking a newly created column as nullable. (#9262)
42799c0956f is described below

commit 42799c0956f626bc47318ddd91c626b1e58a0fc8
Author: Amrish Lal <amrish.k....@gmail.com>
AuthorDate: Mon Jul 24 22:27:50 2023 -0700

    [HUDI-6438] Config parameter 'MAKE_NEW_COLUMNS_NULLABLE' to allow for marking a newly created column as nullable. (#9262)

    - Adds a config parameter 'hoodie.datasource.write.new.columns.nullable' which, when set to true,
      marks newly added columns as nullable. By default 'hoodie.datasource.write.new.columns.nullable'
      is set to false to maintain existing behavior.
---
 .../hudi/common/config/HoodieCommonConfig.java     |  8 ++++
 .../schema/utils/AvroSchemaEvolutionUtils.java     | 12 ++++--
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  2 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  6 +--
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  1 +
 .../apache/hudi/functional/TestCOWDataSource.scala | 48 +++++++++++++++++++++-
 6 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 4ff1b89ee9b..7c696b4c1d3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -62,6 +62,14 @@ public class HoodieCommonConfig extends HoodieConfig {
       + "This enables us, to always extend the table's schema during evolution and never lose the data (when, for "
       + "ex, existing column is being dropped in a new batch)");
 
+  public static final ConfigProperty<Boolean> MAKE_NEW_COLUMNS_NULLABLE = ConfigProperty
+      .key("hoodie.datasource.write.new.columns.nullable")
+      .defaultValue(false)
+      .markAdvanced()
+      .withDocumentation("When a non-nullable column is added to the datasource during a write operation, the write "
+          + "operation will fail the schema compatibility check. Setting this option to true makes the newly added "
+          + "column nullable so that the write operation can complete successfully.");
+
   public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
       .key("hoodie.common.spillable.diskmap.type")
       .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
index 2dab3d009b4..13c1f0e2277 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
@@ -23,9 +23,11 @@ import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.action.TableChanges;
 
 import java.util.List;
+import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE;
 import static org.apache.hudi.common.util.CollectionUtils.reduce;
 import static org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter.convert;
 
@@ -116,9 +118,10 @@ public class AvroSchemaEvolutionUtils {
    *
    * @param sourceSchema source schema that needs reconciliation
    * @param targetSchema target schema that source schema will be reconciled against
+   * @param opts config options
    * @return schema (based off {@code source} one) that has nullability constraints reconciled
    */
-  public static Schema reconcileNullability(Schema sourceSchema, Schema targetSchema) {
+  public static Schema reconcileNullability(Schema sourceSchema, Schema targetSchema, Map<String, String> opts) {
     if (sourceSchema.getFields().isEmpty() || targetSchema.getFields().isEmpty()) {
       return sourceSchema;
     }
@@ -129,9 +132,10 @@ public class AvroSchemaEvolutionUtils {
     List<String> colNamesSourceSchema = sourceInternalSchema.getAllColsFullName();
     List<String> colNamesTargetSchema = targetInternalSchema.getAllColsFullName();
     List<String> candidateUpdateCols = colNamesSourceSchema.stream()
-        .filter(f -> colNamesTargetSchema.contains(f)
-            && sourceInternalSchema.findField(f).isOptional() != targetInternalSchema.findField(f).isOptional())
-        .collect(Collectors.toList());
+        .filter(f -> (("true".equals(opts.get(MAKE_NEW_COLUMNS_NULLABLE.key())) && !colNamesTargetSchema.contains(f))
+            || colNamesTargetSchema.contains(f) && sourceInternalSchema.findField(f).isOptional() != targetInternalSchema.findField(f).isOptional())
+        ).collect(Collectors.toList());
 
     if (candidateUpdateCols.isEmpty()) {
       return sourceSchema;
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 6fb84932c13..1c0545b4212 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -526,6 +526,8 @@ object DataSourceWriteOptions {
 
   val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA
 
+  val MAKE_NEW_COLUMNS_NULLABLE: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE
+
   val SQL_WRITE_OPERATION: ConfigProperty[String] = ConfigProperty
     .key("hoodie.sql.write.operation")
     .defaultValue("insert")
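For orientation, a minimal usage sketch of the new flag from the Spark datasource write path. The table name, base path, and column names below are hypothetical (not taken from this patch); the remaining option keys are existing Hudi datasource configs.

    import org.apache.spark.sql.SaveMode

    // This batch adds 'add_col', which the table does not yet have; without the
    // new flag, this write would fail the schema compatibility check.
    val batch = spark.sql("select '2' as event_id, '2' as ts, 'bar' as add_col")
    batch.write.format("hudi")
      .option("hoodie.table.name", "events")                          // hypothetical table name
      .option("hoodie.datasource.write.recordkey.field", "event_id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.new.columns.nullable", "true") // the flag added by this patch
      .mode(SaveMode.Append)
      .save("/tmp/hudi/events")                                       // hypothetical base path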
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index ca1359578d1..fcee3fdab49 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -464,7 +464,7 @@ object HoodieSparkSqlWriter {
       SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
 
     val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
-      canonicalizeSchema(sourceSchema, latestTableSchema)
+      canonicalizeSchema(sourceSchema, latestTableSchema, opts)
     } else {
       sourceSchema
     }
@@ -652,8 +652,8 @@ object HoodieSparkSqlWriter {
    *
    * TODO support casing reconciliation
    */
-  private def canonicalizeSchema(sourceSchema: Schema, latestTableSchema: Schema): Schema = {
-    reconcileNullability(sourceSchema, latestTableSchema)
+  private def canonicalizeSchema(sourceSchema: Schema, latestTableSchema: Schema, opts: Map[String, String]): Schema = {
+    reconcileNullability(sourceSchema, latestTableSchema, opts)
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 405e761635a..5ee56642e31 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -80,6 +80,7 @@ object HoodieWriterUtils {
     hoodieConfig.setDefaultValue(ASYNC_CLUSTERING_ENABLE)
     hoodieConfig.setDefaultValue(ENABLE_ROW_WRITER)
     hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
+    hoodieConfig.setDefaultValue(MAKE_NEW_COLUMNS_NULLABLE)
     hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
     hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
     Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
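To make the plumbing above concrete, a hedged sketch of calling the evolved utility directly with the new opts parameter. The Avro schemas are hypothetical stand-ins, and the expected-output comment reflects the intent of the change rather than verified behavior.

    import org.apache.avro.Schema
    import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability
    import scala.collection.JavaConverters._

    val target = new Schema.Parser().parse(
      """{"type":"record","name":"rec","fields":[{"name":"event_id","type":"string"}]}""")
    val source = new Schema.Parser().parse(
      """{"type":"record","name":"rec","fields":[
        |  {"name":"event_id","type":"string"}, {"name":"add_col","type":"string"}]}""".stripMargin)

    // With the flag set, 'add_col' (present only in the source schema) becomes a
    // candidate update column and should come back as a nullable union.
    val opts = Map("hoodie.datasource.write.new.columns.nullable" -> "true").asJava
    val reconciled = reconcileNullability(source, target, opts)
    println(reconciled.getField("add_col").schema()) // expected: ["null","string"]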
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bd1bdbf3e2e..bfbf42535d2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
@@ -39,6 +39,7 @@ import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.functional.CommonOptionUtils._
 import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
+import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.metrics.Metrics
@@ -1538,7 +1539,52 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(2, result.count())
     assertEquals(0, result.filter(result("id") === 1).count())
   }
+
+  /** Test case to verify the MAKE_NEW_COLUMNS_NULLABLE config parameter. */
+  @Test
+  def testSchemaEvolutionWithNewColumn(): Unit = {
+    val df1 = spark.sql("select '1' as event_id, '2' as ts, '3' as version, 'foo' as event_date")
+    var hudiOptions = Map[String, String](
+      HoodieWriteConfig.TBL_NAME.key() -> "test_hudi_merger",
+      KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> "event_id",
+      KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> "version",
+      DataSourceWriteOptions.OPERATION.key() -> "insert",
+      HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key() -> "ts",
+      HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> "org.apache.hudi.keygen.ComplexKeyGenerator",
+      KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key() -> "true",
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key() -> "false",
+      HoodieWriteConfig.RECORD_MERGER_IMPLS.key() -> "org.apache.hudi.HoodieSparkRecordMerger"
+    )
+    df1.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(basePath)
+
+    // Try adding a string column. This write is expected to throw a 'schema not compatible'
+    // exception since the 'MAKE_NEW_COLUMNS_NULLABLE' parameter is 'false' by default.
+    val df2 = spark.sql("select '2' as event_id, '2' as ts, '3' as version, 'foo' as event_date, 'bar' as add_col")
+    try {
+      df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath)
+      fail("Write succeeded, but was expected to fail.")
+    } catch {
+      case ex: org.apache.hudi.exception.HoodieInsertException =>
+        assertTrue(ex.getMessage.equals("Failed insert schema compatibility check"))
+      case ex: Exception =>
+        fail(ex)
+    }
+
+    // Try adding the string column again. This write is expected to succeed now that the
+    // 'MAKE_NEW_COLUMNS_NULLABLE' parameter has been set to 'true'.
+    hudiOptions = hudiOptions + (HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE.key() -> "true")
+    try {
+      df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath)
+    } catch {
+      case ex: Exception =>
+        fail(ex)
+    }
+  }
 }
+
 object TestCOWDataSource {
   def convertColumnsToNullable(df: DataFrame, cols: String*): DataFrame = {
     cols.foldLeft(df) { (df, c) =>
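Following the new test, one way to sanity-check the evolved table is to read it back and inspect the schema. A hedged sketch reusing the test's basePath and add_col (the exact assertions are illustrative, not part of the patch):

    val evolved = spark.read.format("hudi").load(basePath)
    // The column added under MAKE_NEW_COLUMNS_NULLABLE=true should be present and
    // nullable, with a null value for the first batch's row that never carried it.
    assertTrue(evolved.schema("add_col").nullable)
    assertEquals(1, evolved.filter(evolved("add_col").isNull).count())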