This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 42799c0956f [HUDI-6438] Config parameter 'MAKE_NEW_COLUMNS_NULLABLE' to allow for marking a newly created column as nullable. (#9262)
42799c0956f is described below

commit 42799c0956f626bc47318ddd91c626b1e58a0fc8
Author: Amrish Lal <amrish.k....@gmail.com>
AuthorDate: Mon Jul 24 22:27:50 2023 -0700

    [HUDI-6438] Config parameter 'MAKE_NEW_COLUMNS_NULLABLE' to allow for marking a newly created column as nullable. (#9262)
    
    - Adds a config parameter 'hoodie.datasource.write.new.columns.nullable'
      which, when set to true, marks a newly added column as nullable. By
      default the parameter is set to false to maintain the existing behavior.
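
    A minimal usage sketch (Scala), assuming a SparkSession 'spark', a DataFrame 'df'
    that carries a new non-nullable column, and an existing Hudi table at 'basePath'
    (these names are illustrative, not part of this patch):

        df.write.format("hudi")
          .option("hoodie.datasource.write.new.columns.nullable", "true")
          // ... plus the table's usual write configs (table name, record key, etc.)
          .mode("append")
          .save(basePath)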
---
 .../hudi/common/config/HoodieCommonConfig.java     |  8 ++++
 .../schema/utils/AvroSchemaEvolutionUtils.java     | 12 ++++--
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  2 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  6 +--
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  1 +
 .../apache/hudi/functional/TestCOWDataSource.scala | 48 +++++++++++++++++++++-
 6 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 4ff1b89ee9b..7c696b4c1d3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -62,6 +62,14 @@ public class HoodieCommonConfig extends HoodieConfig {
           + "This enables us, to always extend the table's schema during 
evolution and never lose the data (when, for "
           + "ex, existing column is being dropped in a new batch)");
 
+  public static final ConfigProperty<Boolean> MAKE_NEW_COLUMNS_NULLABLE = ConfigProperty
+      .key("hoodie.datasource.write.new.columns.nullable")
+      .defaultValue(false)
+      .markAdvanced()
+      .withDocumentation("When a non-nullable column is added to the datasource during a write operation, the write "
+          + "operation will fail the schema compatibility check. Setting this option to true will make the newly "
+          + "added column nullable so that the write operation can complete successfully.");
+
   public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
       .key("hoodie.common.spillable.diskmap.type")
       .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
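
The new property follows Hudi's standard ConfigProperty builder pattern (key,
default value, advanced marker, documentation). A hedged sketch of how writer-side
code can read it from a plain options map, mirroring the opts.get(...) usage in the
next file ('writeOpts' is an illustrative name, not part of this patch):

    val writeOpts: Map[String, String] =
      Map("hoodie.datasource.write.new.columns.nullable" -> "true")
    // Fall back to the documented default of "false" when the key is absent.
    val makeNewColumnsNullable: Boolean =
      writeOpts.getOrElse("hoodie.datasource.write.new.columns.nullable", "false").toBoolean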
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
index 2dab3d009b4..13c1f0e2277 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
@@ -23,9 +23,11 @@ import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.action.TableChanges;
 
 import java.util.List;
+import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE;
 import static org.apache.hudi.common.util.CollectionUtils.reduce;
 import static org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter.convert;
 
@@ -116,9 +118,10 @@ public class AvroSchemaEvolutionUtils {
    *
    * @param sourceSchema source schema that needs reconciliation
    * @param targetSchema target schema that source schema will be reconciled against
+   * @param opts config options
    * @return schema (based off {@code source} one) that has nullability constraints reconciled
    */
-  public static Schema reconcileNullability(Schema sourceSchema, Schema targetSchema) {
+  public static Schema reconcileNullability(Schema sourceSchema, Schema targetSchema, Map<String, String> opts) {
     if (sourceSchema.getFields().isEmpty() || targetSchema.getFields().isEmpty()) {
       return sourceSchema;
     }
@@ -129,9 +132,10 @@ public class AvroSchemaEvolutionUtils {
     List<String> colNamesSourceSchema = sourceInternalSchema.getAllColsFullName();
     List<String> colNamesTargetSchema = targetInternalSchema.getAllColsFullName();
     List<String> candidateUpdateCols = colNamesSourceSchema.stream()
-        .filter(f -> colNamesTargetSchema.contains(f)
-            && sourceInternalSchema.findField(f).isOptional() != targetInternalSchema.findField(f).isOptional())
-        .collect(Collectors.toList());
+        .filter(f -> (("true".equals(opts.get(MAKE_NEW_COLUMNS_NULLABLE.key())) && !colNamesTargetSchema.contains(f))
+                || colNamesTargetSchema.contains(f) && sourceInternalSchema.findField(f).isOptional() != targetInternalSchema.findField(f).isOptional()
+            )
+        ).collect(Collectors.toList());
 
     if (candidateUpdateCols.isEmpty()) {
       return sourceSchema;
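
The rewritten filter keeps a column as a candidate for a nullability update in two
cases: it is brand new (present only in the source schema) and the new flag is set,
or it exists in both schemas with differing nullability (the pre-existing behavior).
A self-contained Scala sketch of that selection logic, modeling each schema as a map
from column name to nullability (all names here are illustrative, not Hudi API):

    object CandidateColumnsSketch {
      // Returns the columns whose nullability should be reconciled.
      def candidateUpdateCols(sourceNullable: Map[String, Boolean],
                              targetNullable: Map[String, Boolean],
                              makeNewColumnsNullable: Boolean): List[String] =
        sourceNullable.keys.filter { f =>
          // Case 1: newly added column, opted in via the new config flag.
          val isNewColumn = makeNewColumnsNullable && !targetNullable.contains(f)
          // Case 2: column exists on both sides but nullability differs.
          val nullabilityDiffers = targetNullable.contains(f) &&
            sourceNullable(f) != targetNullable(f)
          isNewColumn || nullabilityDiffers
        }.toList

      def main(args: Array[String]): Unit = {
        val source = Map("event_id" -> false, "add_col" -> false)
        val target = Map("event_id" -> false)
        println(candidateUpdateCols(source, target, makeNewColumnsNullable = false)) // List()
        println(candidateUpdateCols(source, target, makeNewColumnsNullable = true))  // List(add_col)
      }
    }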
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 6fb84932c13..1c0545b4212 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -526,6 +526,8 @@ object DataSourceWriteOptions {
 
   val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA
 
+  val MAKE_NEW_COLUMNS_NULLABLE: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE
+
   val SQL_WRITE_OPERATION: ConfigProperty[String] = ConfigProperty
     .key("hoodie.sql.write.operation")
     .defaultValue("insert")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index ca1359578d1..fcee3fdab49 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -464,7 +464,7 @@ object HoodieSparkSqlWriter {
           SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
 
         val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
-          canonicalizeSchema(sourceSchema, latestTableSchema)
+          canonicalizeSchema(sourceSchema, latestTableSchema, opts)
         } else {
           sourceSchema
         }
@@ -652,8 +652,8 @@ object HoodieSparkSqlWriter {
    *
    * TODO support casing reconciliation
    */
-  private def canonicalizeSchema(sourceSchema: Schema, latestTableSchema: Schema): Schema = {
-    reconcileNullability(sourceSchema, latestTableSchema)
+  private def canonicalizeSchema(sourceSchema: Schema, latestTableSchema: Schema, opts : Map[String, String]): Schema = {
+    reconcileNullability(sourceSchema, latestTableSchema, opts)
   }
 
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 405e761635a..5ee56642e31 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -80,6 +80,7 @@ object HoodieWriterUtils {
     hoodieConfig.setDefaultValue(ASYNC_CLUSTERING_ENABLE)
     hoodieConfig.setDefaultValue(ENABLE_ROW_WRITER)
     hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
+    hoodieConfig.setDefaultValue(MAKE_NEW_COLUMNS_NULLABLE)
     hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
     hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
     Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bd1bdbf3e2e..bfbf42535d2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
@@ -39,6 +39,7 @@ import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.functional.CommonOptionUtils._
 import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
+import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.metrics.Metrics
@@ -1538,7 +1539,52 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(2, result.count())
     assertEquals(0, result.filter(result("id") === 1).count())
   }
+
+  /** Test case to verify MAKE_NEW_COLUMNS_NULLABLE config parameter. */
+  @Test
+  def testSchemaEvolutionWithNewColumn(): Unit = {
+    val df1 = spark.sql("select '1' as event_id, '2' as ts, '3' as version, 'foo' as event_date")
+    var hudiOptions = Map[String, String](
+      HoodieWriteConfig.TBL_NAME.key() -> "test_hudi_merger",
+      KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> "event_id",
+      KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> "version",
+      DataSourceWriteOptions.OPERATION.key() -> "insert",
+      HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key() -> "ts",
+      HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> "org.apache.hudi.keygen.ComplexKeyGenerator",
+      KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key() -> "true",
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key() -> "false",
+      HoodieWriteConfig.RECORD_MERGER_IMPLS.key() -> "org.apache.hudi.HoodieSparkRecordMerger"
+    )
+    df1.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(basePath)
+
+    // Try adding a string column. This operation is expected to throw 'schema not compatible' exception since
+    // 'MAKE_NEW_COLUMNS_NULLABLE' parameter is 'false' by default.
+    val df2 = spark.sql("select '2' as event_id, '2' as ts, '3' as version, 'foo' as event_date, 'bar' as add_col")
+    try {
+      (df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath))
+      fail("Option succeeded, but was expected to fail.")
+    } catch {
+      case ex: org.apache.hudi.exception.HoodieInsertException => {
+        assertTrue(ex.getMessage.equals("Failed insert schema compatibility check"))
+      }
+      case ex: Exception => {
+        fail(ex)
+      }
+    }
+
+    // Try adding the string column again. This operation is expected to succeed since 'MAKE_NEW_COLUMNS_NULLABLE'
+    // parameter has been set to 'true'.
+    hudiOptions = hudiOptions + (HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE.key() -> "true")
+    try {
+      (df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath))
+    } catch {
+      case ex: Exception => {
+        fail(ex)
+      }
+    }
+  }
 }
+
 object TestCOWDataSource {
   def convertColumnsToNullable(df: DataFrame, cols: String*): DataFrame = {
     cols.foldLeft(df) { (df, c) =>
