This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ae9866a16c75 fix: Enable schema merging for incremental and dfs sources (#18385)
ae9866a16c75 is described below

commit ae9866a16c759f3426d6f505d86a5e0148e27cdb
Author: Lin Liu <[email protected]>
AuthorDate: Fri May 15 15:08:24 2026 -0700

    fix: Enable schema merging for incremental and dfs sources (#18385)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/utilities/config/CloudSourceConfig.java   |  13 +++
 ...FSSourceConfig.java => ORCDFSSourceConfig.java} |  24 ++---
 .../utilities/config/ParquetDFSSourceConfig.java   |  15 ++-
 .../hudi/utilities/sources/ORCDFSSource.java       |   6 +-
 .../hudi/utilities/sources/ParquetDFSSource.java   |   4 +-
 .../helpers/CloudObjectsSelectorCommon.java        |  31 ++++++-
 .../hudi/utilities/sources/TestAvroDFSSource.java  | 103 +++++++++++++++++++++
 .../helpers/TestCloudObjectsSelectorCommon.java    |  62 +++++++++++++
 8 files changed, 239 insertions(+), 19 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index ec13e502fe31..6ffd5ad5c7ae 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -181,4 +181,17 @@ public class CloudSourceConfig extends HoodieConfig {
       .markAdvanced()
       .sinceVersion("1.0.0")
       .withDocumentation("Boolean value to allow coalesce alias columns with 
actual columns while reading from source");
+
+  public static final ConfigProperty<Boolean> CLOUD_INCREMENTAL_MERGE_SCHEMA = 
ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema.enable")
+      .defaultValue(true)
+      .markAdvanced()
+      .sinceVersion("1.2.0")
+      .withDocumentation("For Parquet and ORC data files in S3/GCS incremental 
ingestion, merge schemas across all "
+          + "files in each read (Spark mergeSchema). Default true so 
mixed-schema batches during initial "
+          + "ingest/bootstrap produce a valid unified schema. Set false to 
restore prior behavior. "
+          + SPARK_DATASOURCE_OPTIONS.key() + " is applied after this flag and 
can override mergeSchema. "
+          + "Note: the per-read mergeSchema option is honored by Spark's 
native Parquet reader and by Spark's "
+          + "native ORC reader (Spark 3.0+, default ORC impl since Spark 2.4). 
On older runtimes the option is "
+          + "silently ignored.");
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java
similarity index 62%
copy from hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
copy to hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java
index a8906c9f70b0..b181d0257331 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java
@@ -26,24 +26,26 @@ import org.apache.hudi.common.config.HoodieConfig;
 
 import javax.annotation.concurrent.Immutable;
 
-import static 
org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
 import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
 
 /**
- * Parquet DFS Source Configs
+ * ORC DFS Source Configs
  */
 @Immutable
-@ConfigClassProperty(name = "Parquet DFS Source Configs",
+@ConfigClassProperty(name = "ORC DFS Source Configs",
         groupName = ConfigGroups.Names.HUDI_STREAMER,
         subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE,
-        description = "Configurations controlling the behavior of Parquet DFS 
source in Hudi Streamer.")
-public class ParquetDFSSourceConfig extends HoodieConfig {
+        description = "Configurations controlling the behavior of ORC DFS 
source in Hudi Streamer.")
+public class ORCDFSSourceConfig extends HoodieConfig {
 
-  public static final ConfigProperty<Boolean> PARQUET_DFS_MERGE_SCHEMA = 
ConfigProperty
-      .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable")
-      .defaultValue(false)
-      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + 
"source.parquet.dfs.merge_schema.enable")
+  public static final ConfigProperty<Boolean> ORC_DFS_MERGE_SCHEMA = 
ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.orc.dfs.merge.schema.enable")
+      .defaultValue(true)
       .markAdvanced()
-      .sinceVersion("0.15.0")
-      .withDocumentation("Merge schema across parquet files within a single 
write");
+      .sinceVersion("1.2.0")
+      .withDocumentation("Whether to merge schema across ORC files within a 
single read. "
+          + "Defaults to true: heterogeneous-schema source files (e.g. during 
bootstrap or "
+          + "evolving producers) get a unioned schema instead of silently 
dropping columns "
+          + "that exist only in some files. Requires spark.sql.orc.impl=native 
(default since "
+          + "Spark 2.4); the option is silently ignored under 
spark.sql.orc.impl=hive.");
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
index a8906c9f70b0..b35842d66282 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
@@ -40,10 +40,17 @@ import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
 public class ParquetDFSSourceConfig extends HoodieConfig {
 
   public static final ConfigProperty<Boolean> PARQUET_DFS_MERGE_SCHEMA = 
ConfigProperty
-      .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable")
-      .defaultValue(false)
-      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + 
"source.parquet.dfs.merge_schema.enable")
+      .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge.schema.enable")
+      .defaultValue(true)
+      .withAlternatives(
+          // Back-compat aliases for the previous underscore-style keys (since 
0.15.0).
+          STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable",
+          DELTA_STREAMER_CONFIG_PREFIX + 
"source.parquet.dfs.merge_schema.enable")
       .markAdvanced()
       .sinceVersion("0.15.0")
-      .withDocumentation("Merge schema across parquet files within a single 
write");
+      .withDocumentation("Whether to merge schema across parquet files within 
a single read. "
+          + "Defaults to true: heterogeneous-schema source files (e.g. during 
bootstrap or "
+          + "evolving producers) get a unioned schema instead of silently 
dropping columns "
+          + "that exist only in some files. Set to false to restore the 
previous reader "
+          + "behavior (single file's schema wins).");
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
index 46357ee5a35c..ace9a0590bcb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.config.ORCDFSSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
 
@@ -30,6 +31,8 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+
 /**
  * DFS Source that reads ORC data.
  */
@@ -53,6 +56,7 @@ public class ORCDFSSource extends RowSource {
   }
 
   private Dataset<Row> fromFiles(String pathStr) {
-    return sparkSession.read().orc(pathStr.split(","));
+    boolean mergeSchemaEnabled = getBooleanWithAltKeys(this.props, 
ORCDFSSourceConfig.ORC_DFS_MERGE_SCHEMA);
+    return sparkSession.read().option("mergeSchema", 
mergeSchemaEnabled).orc(pathStr.split(","));
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
index 10ec52f6ef78..3ad5270ab259 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -56,7 +56,7 @@ public class ParquetDFSSource extends RowSource {
   }
 
   private Dataset<Row> fromFiles(String pathStr) {
-    boolean mergeSchemaOption = getBooleanWithAltKeys(this.props, 
ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA);
-    return sparkSession.read().option("mergeSchema", 
mergeSchemaOption).parquet(pathStr.split(","));
+    boolean mergeSchemaEnabled = getBooleanWithAltKeys(this.props, 
ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA);
+    return sparkSession.read().option("mergeSchema", 
mergeSchemaEnabled).parquet(pathStr.split(","));
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 5cb52dcbaeaa..a136bb5290bf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -71,6 +71,7 @@ import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static 
org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_INCREMENTAL_MERGE_SCHEMA;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR;
@@ -281,7 +282,7 @@ public class CloudObjectsSelectorCommon {
     if (isNullOrEmpty(cloudObjectMetadata)) {
       return Option.empty();
     }
-    DataFrameReader reader = spark.read().format(fileFormat);
+    DataFrameReader reader = 
applyMergeSchemaOption(spark.read().format(fileFormat), fileFormat);
     String datasourceOpts = getStringWithAltKeys(properties, 
CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
 
     StructType rowSchema = null;
@@ -546,6 +547,34 @@ public class CloudObjectsSelectorCommon {
     return Option.empty();
   }
 
+  /**
+   * Enables Spark {@code mergeSchema} for cloud object batches of Parquet or 
ORC files when configured, so
+   * heterogeneous files in one sync round share a merged struct type. Applied 
before user
+   * {@link CloudSourceConfig#SPARK_DATASOURCE_OPTIONS} so explicit reader 
options can override.
+   *
+   * <p>Spark's native Parquet reader honors {@code mergeSchema} on all 
supported versions. Spark's native ORC
+   * reader honors it on Spark 3.0+ (the native ORC impl is the default since 
Spark 2.4); on older runtimes the
+   * option is silently ignored, which is harmless.
+   */
+  private DataFrameReader applyMergeSchemaOption(DataFrameReader reader, 
String fileFormat) {
+    if (!isParquetOrOrcFileFormat(fileFormat)) {
+      return reader;
+    }
+    if (!getBooleanWithAltKeys(properties, CLOUD_INCREMENTAL_MERGE_SCHEMA)) {
+      return reader;
+    }
+    return reader.option("mergeSchema", "true");
+  }
+
+  // Package-private for unit testing — see TestCloudObjectsSelectorCommon.
+  static boolean isParquetOrOrcFileFormat(String fileFormat) {
+    if (fileFormat == null) {
+      return false;
+    }
+    String f = fileFormat.trim();
+    return "parquet".equalsIgnoreCase(f) || "orc".equalsIgnoreCase(f);
+  }
+
   public enum Type {
     S3,
     GCS
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
index 89d522675ba1..aa15ca18ec6d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -20,14 +20,28 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 /**
  * Basic tests for {@link TestAvroDFSSource}.
  */
@@ -54,4 +68,93 @@ public class TestAvroDFSSource extends AbstractDFSSourceTestBase {
   protected void writeNewDataToFile(List<HoodieRecord> records, Path path) 
throws IOException {
     Helpers.saveAvroToDFS(Helpers.toGenericRecords(records), path);
   }
+
+  /**
+   * Regression test: when a single batch contains files with additive schema 
evolution
+   * (one file has the base schema, another has the same fields plus an extra 
field with a
+   * default), reading via {@link AvroDFSSource} configured with the wider 
reader schema must
+   * (a) not fail, (b) return all records from both files, and (c) materialize 
the wider field
+   * as the default for records from the narrow file and as the written value 
for records from
+   * the wider file. Locks in Avro reader/writer schema-resolution behavior.
+   */
+  @Test
+  public void testAdditiveSchemaEvolutionAcrossFiles() throws Exception {
+    // Use a unique subdirectory because basePath is static and shared with
+    // the parent testReadingFromSource, which writes 10000+ records into 
dfsRoot
+    // and would otherwise pollute this test's read.
+    String additiveRoot = basePath + "/avroFilesAdditive";
+    fs.mkdirs(new Path(additiveRoot));
+
+    Schema narrowSchema = HoodieTestDataGenerator.AVRO_SCHEMA;
+    Schema widerSchema = addStringFieldWithDefault(narrowSchema, 
"additive_field", "DEFAULT");
+
+    // File A: narrow writer schema, no additive_field.
+    int narrowCount = 30;
+    List<GenericRecord> narrowRecords = Helpers.toGenericRecords(
+        dataGenerator.generateInserts("000", narrowCount));
+    Path pathA = new Path(additiveRoot, "narrow" + fileSuffix);
+    Helpers.saveAvroToDFS(narrowRecords, pathA, narrowSchema);
+
+    // File B: wider writer schema, additive_field set to a known value.
+    int wideCount = 20;
+    List<GenericRecord> wideRecords = new ArrayList<>();
+    for (GenericRecord narrow : Helpers.toGenericRecords(
+        dataGenerator.generateInserts("001", wideCount))) {
+      GenericRecord wide = new GenericData.Record(widerSchema);
+      for (Schema.Field f : narrowSchema.getFields()) {
+        wide.put(f.name(), narrow.get(f.name()));
+      }
+      wide.put("additive_field", "WRITTEN");
+      wideRecords.add(wide);
+    }
+    Path pathB = new Path(additiveRoot, "wider" + fileSuffix);
+    Helpers.saveAvroToDFS(wideRecords, pathB, widerSchema);
+
+    // Write the wider schema to DFS and point a fresh schema provider at it 
so the source's
+    // reader schema is wider than file A's writer schema.
+    Path widerSchemaFile = new Path(basePath + "/wider-source.avsc");
+    try (OutputStream out = fs.create(widerSchemaFile)) {
+      out.write(widerSchema.toString(true).getBytes(StandardCharsets.UTF_8));
+    }
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.streamer.source.dfs.root", additiveRoot);
+    props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", 
widerSchemaFile.toString());
+    FilebasedSchemaProvider widerProvider = new FilebasedSchemaProvider(props, 
jsc);
+    AvroDFSSource source = new AvroDFSSource(props, jsc, sparkSession, 
widerProvider);
+
+    SourceFormatAdapter adapter = new SourceFormatAdapter(source);
+    JavaRDD<GenericRecord> fetched =
+        adapter.fetchNewDataInAvroFormat(Option.empty(), 
Long.MAX_VALUE).getBatch().get();
+    List<GenericRecord> read = fetched.collect();
+
+    assertEquals(narrowCount + wideCount, read.size(),
+        "Both narrow and wider files should be read in the same batch");
+
+    long defaulted = read.stream()
+        .filter(r -> "DEFAULT".equals(String.valueOf(r.get("additive_field"))))
+        .count();
+    long preserved = read.stream()
+        .filter(r -> "WRITTEN".equals(String.valueOf(r.get("additive_field"))))
+        .count();
+    assertEquals(narrowCount, defaulted,
+        "Records from the narrow file should get the wider reader schema's 
default for additive_field");
+    assertEquals(wideCount, preserved,
+        "Records from the wider file should preserve the written value of 
additive_field");
+  }
+
+  /**
+   * Returns a copy of {@code base} with one extra optional string field 
appended, defaulting
+   * to {@code defaultValue}. The new field has a non-null default so Avro's 
schema-resolution
+   * can fill it in for records read with this schema but written under {@code 
base}.
+   */
+  private static Schema addStringFieldWithDefault(Schema base, String 
fieldName, String defaultValue) {
+    List<Schema.Field> fields = new ArrayList<>();
+    for (Schema.Field f : base.getFields()) {
+      fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), 
f.defaultVal()));
+    }
+    fields.add(new Schema.Field(fieldName, Schema.create(Schema.Type.STRING), 
null, defaultValue));
+    Schema wider = Schema.createRecord(base.getName(), base.getDoc(), 
base.getNamespace(), false);
+    wider.setFields(fields);
+    return wider;
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index d25f3082d082..e4d1e42cabef 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -28,18 +28,27 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness {
 
@@ -200,6 +209,59 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness
     Assertions.assertEquals(expectedSchema, result.get().schema(), "output 
dataset schema should match source schema");
   }
 
+  @Test
+  void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) {
+    String p1 = tempDir.resolve("part1").toString();
+    String p2 = tempDir.resolve("part2").toString();
+
+    StructType schema1 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("b", DataTypes.StringType, true)));
+    
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, 
"x")), schema1)
+        .write().parquet(p1);
+
+    StructType schema2 = DataTypes.createStructType(Arrays.asList(
+        DataTypes.createStructField("id", DataTypes.IntegerType, true),
+        DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+    
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1, 
99)), schema2)
+        .write().parquet(p2);
+
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new 
CloudObjectsSelectorCommon(new TypedProperties());
+    List<CloudObjectMetadata> input = Arrays.asList(
+        new CloudObjectMetadata(p1, 1L),
+        new CloudObjectMetadata(p2, 1L));
+    Option<Dataset<Row>> result = 
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet", 
Option.empty(), 1);
+    assertTrue(result.isPresent());
+    Dataset<Row> ds = result.get();
+    Assertions.assertEquals(2, ds.count());
+    Set<String> colNames = 
Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet());
+    assertTrue(colNames.contains("b"));
+    assertTrue(colNames.contains("c"));
+  }
+
+  /**
+   * Verifies that the format-gating predicate for the cloud-incremental 
mergeSchema option recognises
+   * Parquet and ORC and rejects everything else. End-to-end ORC ingestion is 
not exercised here because
+   * {@code hudi-utilities} pulls in {@code orc-core-nohive} while Spark 3.x's 
ORC writer expects the
+   * regular {@code orc-core}; that classpath conflict makes {@code 
sparkSession.write().orc(...)} fail
+   * with {@code NoSuchFieldError: type} in this module's tests. The 
end-to-end behaviour for ORC is
+   * covered by Parquet's tests via the shared helper, plus this predicate 
test for the format dispatch.
+   */
+  @Test
+  void isParquetOrOrcFileFormatRecognisesBothFormats() {
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("parquet"));
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("PARQUET"));
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("orc"));
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("ORC"));
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" parquet 
"));
+    assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" orc "));
+    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("json"));
+    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("csv"));
+    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("avro"));
+    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(""));
+    assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(null));
+  }
+
   @Test
   public void partitionKeyNotPresentInPath() {
     List<CloudObjectMetadata> input = Collections.singletonList(new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
 1));

Reply via email to