This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b56eae4b8c0 feat(utilities): add option to make all schema columns 
nullable for backwards compatibility (#17777)
2b56eae4b8c0 is described below

commit 2b56eae4b8c02775a734f17c2af946021c33df6e
Author: Prashant Wason <[email protected]>
AuthorDate: Mon Mar 30 20:02:22 2026 -0700

    feat(utilities): add option to make all schema columns nullable for 
backwards compatibility (#17777)
    
    This PR adds an option to ensure all columns in the schema are nullable
    when using HoodieStreamer with row-based sources like SQLSource or
    SQLFileBasedSource.
    
    When new columns are added via SQL queries, the schema must remain
    backwards compatible: a column added to an existing table has to be
    nullable because existing records have no value for it. This change
    provides a configuration option that automatically makes all columns
    nullable, ensuring smooth schema evolution.
    
    Summary and Changelog:
    What users gain: Users can now set
    hoodie.streamer.schema.make.columns.nullable=true to automatically make
    all columns in the incoming schema nullable, preventing schema
    compatibility issues during schema evolution.
    
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../org/apache/hudi/utilities/UtilHelpers.java     |  14 ++
 .../utilities/config/HoodieStreamerConfig.java     |  12 ++
 .../apache/hudi/utilities/sources/RowSource.java   |   5 +-
 .../apache/hudi/utilities/streamer/StreamSync.java |   8 +-
 .../org/apache/hudi/utilities/TestUtilHelpers.java | 141 +++++++++++++++++++++
 5 files changed, 175 insertions(+), 5 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index f0fd604a790c..9d04641f71b4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -84,6 +84,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry;
 import org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper;
@@ -117,8 +118,10 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+import static 
org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_MAKE_COLUMNS_NULLABLE;
 
 /**
  * Bunch of helper methods.
@@ -639,6 +642,17 @@ public class UtilHelpers {
     }
   }
 
+  /**
+   * Returns the schema of {@code dataset}, relaxed so that every column is nullable when configured to do so.
+   * @param dataset dataset whose schema is read; only {@code schema()} is invoked on it
+   * @param props   properties consulted for {@code SCHEMA_MAKE_COLUMNS_NULLABLE} (or its alternative key)
+   * @return {@code dataset.schema().asNullable()} when the flag resolves to true, otherwise the schema as-is
+   */
+  public static StructType extractSchemaFromDataset(Dataset<?> dataset, TypedProperties props) {
+    StructType originalSchema = dataset.schema();
+    return getBooleanWithAltKeys(props, SCHEMA_MAKE_COLUMNS_NULLABLE) ? originalSchema.asNullable() : originalSchema;
+  }
+
   @FunctionalInterface
   public interface CheckedSupplier<T> {
     T get() throws Throwable;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
index e50e7fa06124..13040cc5d04c 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
@@ -139,4 +139,16 @@ public class HoodieStreamerConfig extends HoodieConfig {
       .markAdvanced()
       .sinceVersion("0.15.0")
       .withDocumentation("When enabled, the dataframe generated from reading 
source data is wrapped with an exception handler to explicitly surface 
exceptions.");
+
+  public static final ConfigProperty<Boolean> SCHEMA_MAKE_COLUMNS_NULLABLE = 
ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "schema.make.columns.nullable")
+      .defaultValue(false) // opt-in: the dataset schema is used as-is unless this is set to true
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + 
"schema.make.columns.nullable")
+      .markAdvanced()
+      .withDocumentation("When set to true, all columns in the incoming 
dataset schema are made nullable. "
+          + "This applies to every column in the incoming batch, not just 
newly added ones. "
+          + "Hudi's schema evolution supports widening nullability 
(non-nullable to nullable), so enabling this "
+          + "config on an existing table with non-nullable columns is safe - 
existing data retains its values "
+          + "while the schema is updated to allow nulls. "
+          + "This is useful for maintaining backwards compatibility when new 
columns are added via SQL queries.");
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
index 9f48f0a8ed38..aa71ab94398c 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
@@ -34,6 +34,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
 
 import static 
org.apache.hudi.utilities.config.HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS;
 
@@ -61,8 +62,8 @@ public abstract class RowSource extends Source<Dataset<Row>> {
     Pair<Option<Dataset<Row>>, Checkpoint> res = 
fetchNextBatch(lastCheckpoint, sourceLimit);
     return res.getKey().map(dsr -> {
       Dataset<Row> sanitizedRows = 
SanitizationUtils.sanitizeColumnNamesForAvro(dsr, props);
-      SchemaProvider rowSchemaProvider =
-          UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(), 
props, sparkContext);
+      StructType datasetSchema = 
UtilHelpers.extractSchemaFromDataset(sanitizedRows, props);
+      SchemaProvider rowSchemaProvider = 
UtilHelpers.createRowBasedSchemaProvider(datasetSchema, props, sparkContext);
       Dataset<Row> wrappedDf = 
HoodieSparkUtils.maybeWrapDataFrameWithException(sanitizedRows, 
HoodieReadFromSourceException.class.getName(),
           "Failed to read from row source", 
ConfigUtils.getBooleanWithAltKeys(props, ROW_THROW_EXPLICIT_EXCEPTIONS));
       return new InputBatch<>(Option.of(wrappedDf), res.getValue(), 
rowSchemaProvider);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 33433bb30612..600891c85dff 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -127,6 +127,7 @@ import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -734,9 +735,10 @@ public class StreamSync implements Serializable, Closeable 
{
       } else {
         // Deduce proper target (writer's) schema for the input dataset, 
reconciling its
         // schema w/ the table's one
-        HoodieSchema incomingSchema = transformed.map(df ->
-                
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema(), 
getRecordQualifiedName(cfg.targetTableName)))
-            
.orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetHoodieSchema);
+        HoodieSchema incomingSchema = transformed.map(df -> {
+          StructType structType = UtilHelpers.extractSchemaFromDataset(df, 
props);
+          return 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, 
getRecordQualifiedName(cfg.targetTableName));
+        
}).orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetHoodieSchema);
         schemaProvider = getDeducedSchemaProvider(incomingSchema, 
dataAndCheckpoint.getSchemaProvider(), metaClient);
 
         if (canUseRowWriter(schemaProvider.getTargetHoodieSchema())) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
index f8fb4fd29c2e..86448cfb91f5 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
@@ -24,20 +24,32 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.utilities.config.HoodieStreamerConfig;
 import org.apache.hudi.utilities.sources.AvroKafkaSource;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.helpers.SchemaTestProvider;
 import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.HoodieStreamerMetrics;
 
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static 
org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -93,4 +105,133 @@ public class TestUtilHelpers {
     // We expect two constructors to complain about this error.
     assertEquals(2, e.getSuppressed().length);
   }
+
+  @Test
+  void testExtractSchemaFromDatasetWithNullableDisabled() throws IOException {
+    SparkSession spark = SparkSession
+        .builder()
+        .config(getSparkConfForTest(TestUtilHelpers.class.getName()))
+        .getOrCreate();
+
+    try {
+      JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+      // Spark infers JSON fields as nullable; this case only verifies the schema is passed through untouched
+      String testData = "{\"id\": 1, \"name\": \"test\"}";
+      JavaRDD<String> testRdd = jsc.parallelize(Collections.singletonList(testData), 1);
+      Dataset<Row> ds = spark.read().json(testRdd);
+
+      // Without the config, should return original schema
+      TypedProperties props = new TypedProperties();
+      StructType resultSchema = UtilHelpers.extractSchemaFromDataset(ds, props);
+
+      assertEquals(ds.schema(), resultSchema);
+    } finally {
+      spark.close();
+    }
+  }
+
+  @Test
+  void testExtractSchemaFromDatasetWithNullableEnabled() throws IOException {
+    SparkSession spark = SparkSession
+        .builder()
+        .config(getSparkConfForTest(TestUtilHelpers.class.getName()))
+        .getOrCreate();
+
+    try {
+      // JSON-inferred fields are already nullable, so declare the column non-nullable to make the check meaningful
+      StructType nonNullableSchema = new StructType(new StructField[]{
+          new StructField("id", DataTypes.IntegerType, false, Metadata.empty())
+      });
+      Dataset<Row> ds = spark.createDataFrame(
+          Collections.singletonList(org.apache.spark.sql.RowFactory.create(1)), nonNullableSchema);
+      assertFalse(ds.schema().fields()[0].nullable(), "Fixture column should start out non-nullable");
+      // Enable nullable config
+      TypedProperties props = new TypedProperties();
+      props.setProperty(HoodieStreamerConfig.SCHEMA_MAKE_COLUMNS_NULLABLE.key(), "true");
+
+      StructType resultSchema = UtilHelpers.extractSchemaFromDataset(ds, props);
+
+      // All fields should be nullable
+      for (StructField field : resultSchema.fields()) {
+        assertTrue(field.nullable(), "Field " + field.name() + " should be nullable");
+      }
+    } finally {
+      spark.close();
+    }
+  }
+
+  @Test
+  void testExtractSchemaFromDatasetWithMixedNullability() throws IOException {
+    // Mimics the scenario where an existing table has non-nullable columns 
(e.g., colA, colB, colC)
+    // and the user enables the nullable config. All columns should become 
nullable.
+    SparkSession spark = SparkSession
+        .builder()
+        .config(getSparkConfForTest(TestUtilHelpers.class.getName()))
+        .getOrCreate();
+
+    try {
+      // Create a schema with mixed nullability: colA (nullable), colB 
(non-nullable), colC (non-nullable)
+      StructType mixedSchema = new StructType(new StructField[]{
+          new StructField("colA", DataTypes.StringType, true, 
Metadata.empty()),
+          new StructField("colB", DataTypes.IntegerType, false, 
Metadata.empty()),
+          new StructField("colC", DataTypes.LongType, false, Metadata.empty())
+      });
+
+      Dataset<Row> ds = spark.createDataFrame(
+          Arrays.asList(org.apache.spark.sql.RowFactory.create("val1", 1, 
100L)),
+          mixedSchema);
+
+      // Sanity-check the fixture before exercising the helper
+      assertTrue(ds.schema().fields()[0].nullable(), "colA should be 
nullable");
+      assertFalse(ds.schema().fields()[1].nullable(), "colB should be 
non-nullable");
+      assertFalse(ds.schema().fields()[2].nullable(), "colC should be 
non-nullable");
+
+      // Config unset: default is false, so flags come back as declared
+      TypedProperties propsDisabled = new TypedProperties();
+      StructType resultDisabled = UtilHelpers.extractSchemaFromDataset(ds, 
propsDisabled);
+      assertFalse(resultDisabled.fields()[1].nullable(), "colB should remain 
non-nullable when config is disabled");
+      assertFalse(resultDisabled.fields()[2].nullable(), "colC should remain 
non-nullable when config is disabled");
+
+      // Config enabled: every column, colA included, must report nullable
+      TypedProperties propsEnabled = new TypedProperties();
+      
propsEnabled.setProperty(HoodieStreamerConfig.SCHEMA_MAKE_COLUMNS_NULLABLE.key(),
 "true");
+      StructType resultEnabled = UtilHelpers.extractSchemaFromDataset(ds, 
propsEnabled);
+      for (StructField field : resultEnabled.fields()) {
+        assertTrue(field.nullable(), "Field " + field.name() + " should be 
nullable when config is enabled");
+      }
+    } finally {
+      spark.close();
+    }
+  }
+
+  @Test
+  void testExtractSchemaFromDatasetWithAlternativeKey() throws IOException {
+    SparkSession spark = SparkSession
+        .builder()
+        .config(getSparkConfForTest(TestUtilHelpers.class.getName()))
+        .getOrCreate();
+
+    try {
+      // JSON-inferred fields are already nullable, so declare the column non-nullable to prove the key is honoured
+      StructType nonNullableSchema = new StructType(new StructField[]{
+          new StructField("id", DataTypes.IntegerType, false, Metadata.empty())
+      });
+      Dataset<Row> ds = spark.createDataFrame(
+          Collections.singletonList(org.apache.spark.sql.RowFactory.create(1)), nonNullableSchema);
+      assertFalse(ds.schema().fields()[0].nullable(), "Fixture column should start out non-nullable");
+      // Use the alternative (deprecated deltastreamer) key
+      TypedProperties props = new TypedProperties();
+      props.setProperty("hoodie.deltastreamer.schema.make.columns.nullable", "true");
+
+      StructType resultSchema = UtilHelpers.extractSchemaFromDataset(ds, props);
+
+      // All fields should be nullable when using alternative key
+      for (StructField field : resultSchema.fields()) {
+        assertTrue(field.nullable(), "Field " + field.name() + " should be nullable with alternative key");
+      }
+    } finally {
+      spark.close();
+    }
+  }
 }

Reply via email to