This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2b56eae4b8c0 feat(utilities): add option to make all schema columns
nullable for backwards compatibility (#17777)
2b56eae4b8c0 is described below
commit 2b56eae4b8c02775a734f17c2af946021c33df6e
Author: Prashant Wason <[email protected]>
AuthorDate: Mon Mar 30 20:02:22 2026 -0700
feat(utilities): add option to make all schema columns nullable for
backwards compatibility (#17777)
This PR adds an option to ensure all columns in the schema are nullable
when using HoodieStreamer with row-based sources like SQLSource or
SQLFileBasedSource.
When new columns are added via SQL queries, the schema must be backwards
compatible. New columns added to a table must be nullable because existing
records don't have values for them. This change provides a configuration option
to automatically make all columns nullable, ensuring smooth schema evolution.
Summary and Changelog:
What users gain: Users can now set
hoodie.streamer.schema.make.columns.nullable=true to automatically make all
columns in the incoming schema nullable, preventing schema compatibility issues
during schema evolution.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../org/apache/hudi/utilities/UtilHelpers.java | 14 ++
.../utilities/config/HoodieStreamerConfig.java | 12 ++
.../apache/hudi/utilities/sources/RowSource.java | 5 +-
.../apache/hudi/utilities/streamer/StreamSync.java | 8 +-
.../org/apache/hudi/utilities/TestUtilHelpers.java | 141 +++++++++++++++++++++
5 files changed, 175 insertions(+), 5 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index f0fd604a790c..9d04641f71b4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -84,6 +84,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry;
import org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper;
@@ -117,8 +118,10 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+import static org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_MAKE_COLUMNS_NULLABLE;
/**
* Bunch of helper methods.
@@ -639,6 +642,17 @@ public class UtilHelpers {
}
}
+ /**
+ * Extract and return the schema to use from a Dataset.
+ */
+  public static StructType extractSchemaFromDataset(Dataset dataset, TypedProperties props) {
+ StructType originalSchema = dataset.schema();
+
+ // Should we make all columns nullable?
+ final boolean allColsNullable = getBooleanWithAltKeys(props,
SCHEMA_MAKE_COLUMNS_NULLABLE);
+ return allColsNullable ? originalSchema.asNullable() : originalSchema;
+ }
+
@FunctionalInterface
public interface CheckedSupplier<T> {
T get() throws Throwable;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
index e50e7fa06124..13040cc5d04c 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java
@@ -139,4 +139,16 @@ public class HoodieStreamerConfig extends HoodieConfig {
.markAdvanced()
.sinceVersion("0.15.0")
.withDocumentation("When enabled, the dataframe generated from reading
source data is wrapped with an exception handler to explicitly surface
exceptions.");
+
+  public static final ConfigProperty<Boolean> SCHEMA_MAKE_COLUMNS_NULLABLE = ConfigProperty
+ .key(STREAMER_CONFIG_PREFIX + "schema.make.columns.nullable")
+ .defaultValue(false)
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "schema.make.columns.nullable")
+ .markAdvanced()
+      .withDocumentation("When set to true, all columns in the incoming dataset schema are made nullable. "
+          + "This applies to every column in the incoming batch, not just newly added ones. "
+          + "Hudi's schema evolution supports widening nullability (non-nullable to nullable), so enabling this "
+          + "config on an existing table with non-nullable columns is safe - existing data retains its values "
+          + "while the schema is updated to allow nulls. "
+          + "This is useful for maintaining backwards compatibility when new columns are added via SQL queries.");
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
index 9f48f0a8ed38..aa71ab94398c 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
@@ -34,6 +34,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
import static org.apache.hudi.utilities.config.HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS;
@@ -61,8 +62,8 @@ public abstract class RowSource extends Source<Dataset<Row>> {
Pair<Option<Dataset<Row>>, Checkpoint> res =
fetchNextBatch(lastCheckpoint, sourceLimit);
return res.getKey().map(dsr -> {
Dataset<Row> sanitizedRows =
SanitizationUtils.sanitizeColumnNamesForAvro(dsr, props);
- SchemaProvider rowSchemaProvider =
- UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(),
props, sparkContext);
+      StructType datasetSchema = UtilHelpers.extractSchemaFromDataset(sanitizedRows, props);
+      SchemaProvider rowSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(datasetSchema, props, sparkContext);
Dataset<Row> wrappedDf =
HoodieSparkUtils.maybeWrapDataFrameWithException(sanitizedRows,
HoodieReadFromSourceException.class.getName(),
"Failed to read from row source",
ConfigUtils.getBooleanWithAltKeys(props, ROW_THROW_EXPLICIT_EXCEPTIONS));
return new InputBatch<>(Option.of(wrappedDf), res.getValue(),
rowSchemaProvider);
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 33433bb30612..600891c85dff 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -127,6 +127,7 @@ import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -734,9 +735,10 @@ public class StreamSync implements Serializable, Closeable
{
} else {
// Deduce proper target (writer's) schema for the input dataset,
reconciling its
// schema w/ the table's one
- HoodieSchema incomingSchema = transformed.map(df ->
-
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema(),
getRecordQualifiedName(cfg.targetTableName)))
-
.orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetHoodieSchema);
+      HoodieSchema incomingSchema = transformed.map(df -> {
+        StructType structType = UtilHelpers.extractSchemaFromDataset(df, props);
+        return HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, getRecordQualifiedName(cfg.targetTableName));
+      }).orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetHoodieSchema);
schemaProvider = getDeducedSchemaProvider(incomingSchema,
dataAndCheckpoint.getSchemaProvider(), metaClient);
if (canUseRowWriter(schemaProvider.getTargetHoodieSchema())) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
index f8fb4fd29c2e..86448cfb91f5 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
@@ -24,20 +24,32 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.SchemaTestProvider;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.HoodieStreamerMetrics;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static
org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -93,4 +105,133 @@ public class TestUtilHelpers {
// We expect two constructors to complain about this error.
assertEquals(2, e.getSuppressed().length);
}
+
+ @Test
+ void testExtractSchemaFromDatasetWithNullableDisabled() throws IOException {
+ SparkSession spark = SparkSession
+ .builder()
+ .config(getSparkConfForTest(TestUtilHelpers.class.getName()))
+ .getOrCreate();
+
+ try {
+ JavaSparkContext jsc =
JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ // Create test data with non-nullable fields (integer type defaults to
non-nullable in JSON)
+ String testData = "{\"id\": 1, \"name\": \"test\"}";
+ JavaRDD<String> testRdd =
jsc.parallelize(Collections.singletonList(testData), 1);
+ Dataset<Row> ds = spark.read().json(testRdd);
+
+ // Without the config, should return original schema
+ TypedProperties props = new TypedProperties();
+ StructType resultSchema = UtilHelpers.extractSchemaFromDataset(ds,
props);
+
+ assertEquals(ds.schema(), resultSchema);
+ } finally {
+ spark.close();
+ }
+ }
+
+ @Test
+ void testExtractSchemaFromDatasetWithNullableEnabled() throws IOException {
+ SparkSession spark = SparkSession
+ .builder()
+ .config(getSparkConfForTest(TestUtilHelpers.class.getName()))
+ .getOrCreate();
+
+ try {
+ JavaSparkContext jsc =
JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ // Create test data
+ String testData = "{\"id\": 1, \"name\": \"test\"}";
+ JavaRDD<String> testRdd =
jsc.parallelize(Collections.singletonList(testData), 1);
+ Dataset<Row> ds = spark.read().json(testRdd);
+
+ // Enable nullable config
+ TypedProperties props = new TypedProperties();
+
props.setProperty(HoodieStreamerConfig.SCHEMA_MAKE_COLUMNS_NULLABLE.key(),
"true");
+
+ StructType resultSchema = UtilHelpers.extractSchemaFromDataset(ds,
props);
+
+ // All fields should be nullable
+ for (StructField field : resultSchema.fields()) {
+ assertTrue(field.nullable(), "Field " + field.name() + " should be
nullable");
+ }
+ } finally {
+ spark.close();
+ }
+ }
+
+ @Test
+ void testExtractSchemaFromDatasetWithMixedNullability() throws IOException {
+ // Mimics the scenario where an existing table has non-nullable columns
(e.g., colA, colB, colC)
+ // and the user enables the nullable config. All columns should become
nullable.
+ SparkSession spark = SparkSession
+ .builder()
+ .config(getSparkConfForTest(TestUtilHelpers.class.getName()))
+ .getOrCreate();
+
+ try {
+ // Create a schema with mixed nullability: colA (nullable), colB
(non-nullable), colC (non-nullable)
+ StructType mixedSchema = new StructType(new StructField[]{
+ new StructField("colA", DataTypes.StringType, true,
Metadata.empty()),
+ new StructField("colB", DataTypes.IntegerType, false,
Metadata.empty()),
+ new StructField("colC", DataTypes.LongType, false, Metadata.empty())
+ });
+
+ Dataset<Row> ds = spark.createDataFrame(
+ Arrays.asList(org.apache.spark.sql.RowFactory.create("val1", 1,
100L)),
+ mixedSchema);
+
+ // Verify the original schema has mixed nullability
+ assertTrue(ds.schema().fields()[0].nullable(), "colA should be
nullable");
+ assertFalse(ds.schema().fields()[1].nullable(), "colB should be
non-nullable");
+ assertFalse(ds.schema().fields()[2].nullable(), "colC should be
non-nullable");
+
+ // Without the config, schema should remain unchanged
+ TypedProperties propsDisabled = new TypedProperties();
+ StructType resultDisabled = UtilHelpers.extractSchemaFromDataset(ds,
propsDisabled);
+ assertFalse(resultDisabled.fields()[1].nullable(), "colB should remain
non-nullable when config is disabled");
+ assertFalse(resultDisabled.fields()[2].nullable(), "colC should remain
non-nullable when config is disabled");
+
+ // With the config enabled, all columns should become nullable
+ TypedProperties propsEnabled = new TypedProperties();
+
propsEnabled.setProperty(HoodieStreamerConfig.SCHEMA_MAKE_COLUMNS_NULLABLE.key(),
"true");
+ StructType resultEnabled = UtilHelpers.extractSchemaFromDataset(ds,
propsEnabled);
+ for (StructField field : resultEnabled.fields()) {
+ assertTrue(field.nullable(), "Field " + field.name() + " should be
nullable when config is enabled");
+ }
+ } finally {
+ spark.close();
+ }
+ }
+
+ @Test
+ void testExtractSchemaFromDatasetWithAlternativeKey() throws IOException {
+ SparkSession spark = SparkSession
+ .builder()
+ .config(getSparkConfForTest(TestUtilHelpers.class.getName()))
+ .getOrCreate();
+
+ try {
+ JavaSparkContext jsc =
JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ // Create test data
+ String testData = "{\"id\": 1, \"name\": \"test\"}";
+ JavaRDD<String> testRdd =
jsc.parallelize(Collections.singletonList(testData), 1);
+ Dataset<Row> ds = spark.read().json(testRdd);
+
+ // Use the alternative (deprecated deltastreamer) key
+ TypedProperties props = new TypedProperties();
+ props.setProperty("hoodie.deltastreamer.schema.make.columns.nullable",
"true");
+
+ StructType resultSchema = UtilHelpers.extractSchemaFromDataset(ds,
props);
+
+ // All fields should be nullable when using alternative key
+ for (StructField field : resultSchema.fields()) {
+ assertTrue(field.nullable(), "Field " + field.name() + " should be
nullable with alternative key");
+ }
+ } finally {
+ spark.close();
+ }
+ }
}