This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ae9866a16c75 fix: Enable schema merging for incremental and dfs
sources (#18385)
ae9866a16c75 is described below
commit ae9866a16c759f3426d6f505d86a5e0148e27cdb
Author: Lin Liu <[email protected]>
AuthorDate: Fri May 15 15:08:24 2026 -0700
fix: Enable schema merging for incremental and dfs sources (#18385)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/utilities/config/CloudSourceConfig.java | 13 +++
...FSSourceConfig.java => ORCDFSSourceConfig.java} | 24 ++---
.../utilities/config/ParquetDFSSourceConfig.java | 15 ++-
.../hudi/utilities/sources/ORCDFSSource.java | 6 +-
.../hudi/utilities/sources/ParquetDFSSource.java | 4 +-
.../helpers/CloudObjectsSelectorCommon.java | 31 ++++++-
.../hudi/utilities/sources/TestAvroDFSSource.java | 103 +++++++++++++++++++++
.../helpers/TestCloudObjectsSelectorCommon.java | 62 +++++++++++++
8 files changed, 239 insertions(+), 19 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index ec13e502fe31..6ffd5ad5c7ae 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -181,4 +181,17 @@ public class CloudSourceConfig extends HoodieConfig {
.markAdvanced()
.sinceVersion("1.0.0")
.withDocumentation("Boolean value to allow coalesce alias columns with
actual columns while reading from source");
+
+ public static final ConfigProperty<Boolean> CLOUD_INCREMENTAL_MERGE_SCHEMA =
ConfigProperty
+ .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.merge.schema.enable")
+ .defaultValue(true)
+ .markAdvanced()
+ .sinceVersion("1.2.0")
+ .withDocumentation("For Parquet and ORC data files in S3/GCS incremental
ingestion, merge schemas across all "
+ + "files in each read (Spark mergeSchema). Default true so
mixed-schema batches during initial "
+ + "ingest/bootstrap produce a valid unified schema. Set false to
restore prior behavior. "
+ + SPARK_DATASOURCE_OPTIONS.key() + " is applied after this flag and
can override mergeSchema. "
+ + "Note: the per-read mergeSchema option is honored by Spark's
native Parquet reader and by Spark's "
+ + "native ORC reader (Spark 3.0+, default ORC impl since Spark 2.4).
On older runtimes the option is "
+ + "silently ignored.");
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java
similarity index 62%
copy from
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
copy to
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java
index a8906c9f70b0..b181d0257331 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ORCDFSSourceConfig.java
@@ -26,24 +26,26 @@ import org.apache.hudi.common.config.HoodieConfig;
import javax.annotation.concurrent.Immutable;
-import static
org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
/**
- * Parquet DFS Source Configs
+ * ORC DFS Source Configs
*/
@Immutable
-@ConfigClassProperty(name = "Parquet DFS Source Configs",
+@ConfigClassProperty(name = "ORC DFS Source Configs",
groupName = ConfigGroups.Names.HUDI_STREAMER,
subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE,
- description = "Configurations controlling the behavior of Parquet DFS
source in Hudi Streamer.")
-public class ParquetDFSSourceConfig extends HoodieConfig {
+ description = "Configurations controlling the behavior of ORC DFS
source in Hudi Streamer.")
+public class ORCDFSSourceConfig extends HoodieConfig {
- public static final ConfigProperty<Boolean> PARQUET_DFS_MERGE_SCHEMA =
ConfigProperty
- .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable")
- .defaultValue(false)
- .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX +
"source.parquet.dfs.merge_schema.enable")
+ public static final ConfigProperty<Boolean> ORC_DFS_MERGE_SCHEMA =
ConfigProperty
+ .key(STREAMER_CONFIG_PREFIX + "source.orc.dfs.merge.schema.enable")
+ .defaultValue(true)
.markAdvanced()
- .sinceVersion("0.15.0")
- .withDocumentation("Merge schema across parquet files within a single
write");
+ .sinceVersion("1.2.0")
+ .withDocumentation("Whether to merge schema across ORC files within a
single read. "
+ + "Defaults to true: heterogeneous-schema source files (e.g. during
bootstrap or "
+ + "evolving producers) get a unioned schema instead of silently
dropping columns "
+ + "that exist only in some files. Requires spark.sql.orc.impl=native
(default since "
+ + "Spark 2.4); the option is silently ignored under
spark.sql.orc.impl=hive.");
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
index a8906c9f70b0..b35842d66282 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
@@ -40,10 +40,17 @@ import static
org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
public class ParquetDFSSourceConfig extends HoodieConfig {
public static final ConfigProperty<Boolean> PARQUET_DFS_MERGE_SCHEMA =
ConfigProperty
- .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable")
- .defaultValue(false)
- .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX +
"source.parquet.dfs.merge_schema.enable")
+ .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge.schema.enable")
+ .defaultValue(true)
+ .withAlternatives(
+ // Back-compat aliases for the previous underscore-style keys (since
0.15.0).
+ STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable",
+ DELTA_STREAMER_CONFIG_PREFIX +
"source.parquet.dfs.merge_schema.enable")
.markAdvanced()
.sinceVersion("0.15.0")
- .withDocumentation("Merge schema across parquet files within a single
write");
+ .withDocumentation("Whether to merge schema across parquet files within
a single read. "
+ + "Defaults to true: heterogeneous-schema source files (e.g. during
bootstrap or "
+ + "evolving producers) get a unioned schema instead of silently
dropping columns "
+ + "that exist only in some files. Set to false to restore the
previous reader "
+ + "behavior (single file's schema wins).");
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
index 46357ee5a35c..ace9a0590bcb 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ORCDFSSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.config.ORCDFSSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
@@ -30,6 +31,8 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+
/**
* DFS Source that reads ORC data.
*/
@@ -53,6 +56,7 @@ public class ORCDFSSource extends RowSource {
}
private Dataset<Row> fromFiles(String pathStr) {
- return sparkSession.read().orc(pathStr.split(","));
+ boolean mergeSchemaEnabled = getBooleanWithAltKeys(this.props,
ORCDFSSourceConfig.ORC_DFS_MERGE_SCHEMA);
+ return sparkSession.read().option("mergeSchema",
mergeSchemaEnabled).orc(pathStr.split(","));
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
index 10ec52f6ef78..3ad5270ab259 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -56,7 +56,7 @@ public class ParquetDFSSource extends RowSource {
}
private Dataset<Row> fromFiles(String pathStr) {
- boolean mergeSchemaOption = getBooleanWithAltKeys(this.props,
ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA);
- return sparkSession.read().option("mergeSchema",
mergeSchemaOption).parquet(pathStr.split(","));
+ boolean mergeSchemaEnabled = getBooleanWithAltKeys(this.props,
ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA);
+ return sparkSession.read().option("mergeSchema",
mergeSchemaEnabled).parquet(pathStr.split(","));
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 5cb52dcbaeaa..a136bb5290bf 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -71,6 +71,7 @@ import static
org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static
org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_INCREMENTAL_MERGE_SCHEMA;
import static
org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
import static
org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX;
import static
org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR;
@@ -281,7 +282,7 @@ public class CloudObjectsSelectorCommon {
if (isNullOrEmpty(cloudObjectMetadata)) {
return Option.empty();
}
- DataFrameReader reader = spark.read().format(fileFormat);
+ DataFrameReader reader =
applyMergeSchemaOption(spark.read().format(fileFormat), fileFormat);
String datasourceOpts = getStringWithAltKeys(properties,
CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
StructType rowSchema = null;
@@ -546,6 +547,34 @@ public class CloudObjectsSelectorCommon {
return Option.empty();
}
+ /**
+ * Enables Spark {@code mergeSchema} for cloud object batches of Parquet or
ORC files when configured, so
+ * heterogeneous files in one sync round share a merged struct type. Applied
before user
+ * {@link CloudSourceConfig#SPARK_DATASOURCE_OPTIONS} so explicit reader
options can override.
+ *
+ * <p>Spark's native Parquet reader honors {@code mergeSchema} on all
supported versions. Spark's native ORC
+ * reader honors it on Spark 3.0+ (the native ORC impl is the default since
Spark 2.4); on older runtimes the
+ * option is silently ignored, which is harmless.
+ */
+ private DataFrameReader applyMergeSchemaOption(DataFrameReader reader,
String fileFormat) {
+ if (!isParquetOrOrcFileFormat(fileFormat)) {
+ return reader;
+ }
+ if (!getBooleanWithAltKeys(properties, CLOUD_INCREMENTAL_MERGE_SCHEMA)) {
+ return reader;
+ }
+ return reader.option("mergeSchema", "true");
+ }
+
+ // Package-private for unit testing — see TestCloudObjectsSelectorCommon.
+ static boolean isParquetOrOrcFileFormat(String fileFormat) {
+ if (fileFormat == null) {
+ return false;
+ }
+ String f = fileFormat.trim();
+ return "parquet".equalsIgnoreCase(f) || "orc".equalsIgnoreCase(f);
+ }
+
public enum Type {
S3,
GCS
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
index 89d522675ba1..aa15ca18ec6d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -20,14 +20,28 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
/**
* Basic tests for {@link TestAvroDFSSource}.
*/
@@ -54,4 +68,93 @@ public class TestAvroDFSSource extends
AbstractDFSSourceTestBase {
protected void writeNewDataToFile(List<HoodieRecord> records, Path path)
throws IOException {
Helpers.saveAvroToDFS(Helpers.toGenericRecords(records), path);
}
+
+ /**
+ * Regression test: when a single batch contains files with additive schema
evolution
+ * (one file has the base schema, another has the same fields plus an extra
field with a
+ * default), reading via {@link AvroDFSSource} configured with the wider
reader schema must
+ * (a) not fail, (b) return all records from both files, and (c) materialize
the wider field
+ * as the default for records from the narrow file and as the written value
for records from
+ * the wider file. Locks in Avro reader/writer schema-resolution behavior.
+ */
+ @Test
+ public void testAdditiveSchemaEvolutionAcrossFiles() throws Exception {
+ // Use a unique subdirectory because basePath is static and shared with
+ // the parent testReadingFromSource, which writes 10000+ records into
dfsRoot
+ // and would otherwise pollute this test's read.
+ String additiveRoot = basePath + "/avroFilesAdditive";
+ fs.mkdirs(new Path(additiveRoot));
+
+ Schema narrowSchema = HoodieTestDataGenerator.AVRO_SCHEMA;
+ Schema widerSchema = addStringFieldWithDefault(narrowSchema,
"additive_field", "DEFAULT");
+
+ // File A: narrow writer schema, no additive_field.
+ int narrowCount = 30;
+ List<GenericRecord> narrowRecords = Helpers.toGenericRecords(
+ dataGenerator.generateInserts("000", narrowCount));
+ Path pathA = new Path(additiveRoot, "narrow" + fileSuffix);
+ Helpers.saveAvroToDFS(narrowRecords, pathA, narrowSchema);
+
+ // File B: wider writer schema, additive_field set to a known value.
+ int wideCount = 20;
+ List<GenericRecord> wideRecords = new ArrayList<>();
+ for (GenericRecord narrow : Helpers.toGenericRecords(
+ dataGenerator.generateInserts("001", wideCount))) {
+ GenericRecord wide = new GenericData.Record(widerSchema);
+ for (Schema.Field f : narrowSchema.getFields()) {
+ wide.put(f.name(), narrow.get(f.name()));
+ }
+ wide.put("additive_field", "WRITTEN");
+ wideRecords.add(wide);
+ }
+ Path pathB = new Path(additiveRoot, "wider" + fileSuffix);
+ Helpers.saveAvroToDFS(wideRecords, pathB, widerSchema);
+
+ // Write the wider schema to DFS and point a fresh schema provider at it
so the source's
+ // reader schema is wider than file A's writer schema.
+ Path widerSchemaFile = new Path(basePath + "/wider-source.avsc");
+ try (OutputStream out = fs.create(widerSchemaFile)) {
+ out.write(widerSchema.toString(true).getBytes(StandardCharsets.UTF_8));
+ }
+ TypedProperties props = new TypedProperties();
+ props.setProperty("hoodie.streamer.source.dfs.root", additiveRoot);
+ props.setProperty("hoodie.streamer.schemaprovider.source.schema.file",
widerSchemaFile.toString());
+ FilebasedSchemaProvider widerProvider = new FilebasedSchemaProvider(props,
jsc);
+ AvroDFSSource source = new AvroDFSSource(props, jsc, sparkSession,
widerProvider);
+
+ SourceFormatAdapter adapter = new SourceFormatAdapter(source);
+ JavaRDD<GenericRecord> fetched =
+ adapter.fetchNewDataInAvroFormat(Option.empty(),
Long.MAX_VALUE).getBatch().get();
+ List<GenericRecord> read = fetched.collect();
+
+ assertEquals(narrowCount + wideCount, read.size(),
+ "Both narrow and wider files should be read in the same batch");
+
+ long defaulted = read.stream()
+ .filter(r -> "DEFAULT".equals(String.valueOf(r.get("additive_field"))))
+ .count();
+ long preserved = read.stream()
+ .filter(r -> "WRITTEN".equals(String.valueOf(r.get("additive_field"))))
+ .count();
+ assertEquals(narrowCount, defaulted,
+ "Records from the narrow file should get the wider reader schema's
default for additive_field");
+ assertEquals(wideCount, preserved,
+ "Records from the wider file should preserve the written value of
additive_field");
+ }
+
+ /**
+ * Returns a copy of {@code base} with one extra optional string field
appended, defaulting
+ * to {@code defaultValue}. The new field has a non-null default so Avro's
schema-resolution
+ * can fill it in for records read with this schema but written under {@code
base}.
+ */
+ private static Schema addStringFieldWithDefault(Schema base, String
fieldName, String defaultValue) {
+ List<Schema.Field> fields = new ArrayList<>();
+ for (Schema.Field f : base.getFields()) {
+ fields.add(new Schema.Field(f.name(), f.schema(), f.doc(),
f.defaultVal()));
+ }
+ fields.add(new Schema.Field(fieldName, Schema.create(Schema.Type.STRING),
null, defaultValue));
+ Schema wider = Schema.createRecord(base.getName(), base.getDoc(),
base.getNamespace(), false);
+ wider.setFields(fields);
+ return wider;
+ }
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index d25f3082d082..e4d1e42cabef 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -28,18 +28,27 @@ import
org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.io.FileInputStream;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestCloudObjectsSelectorCommon extends
HoodieSparkClientTestHarness {
@@ -200,6 +209,59 @@ public class TestCloudObjectsSelectorCommon extends
HoodieSparkClientTestHarness
Assertions.assertEquals(expectedSchema, result.get().schema(), "output
dataset schema should match source schema");
}
+ @Test
+ void parquetMixedSchemasMergedByDefault(@TempDir Path tempDir) {
+ String p1 = tempDir.resolve("part1").toString();
+ String p2 = tempDir.resolve("part2").toString();
+
+ StructType schema1 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("b", DataTypes.StringType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1,
"x")), schema1)
+ .write().parquet(p1);
+
+ StructType schema2 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("c", DataTypes.IntegerType, true)));
+
sparkSession.createDataFrame(Collections.singletonList(RowFactory.create(1,
99)), schema2)
+ .write().parquet(p2);
+
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(new TypedProperties());
+ List<CloudObjectMetadata> input = Arrays.asList(
+ new CloudObjectMetadata(p1, 1L),
+ new CloudObjectMetadata(p2, 1L));
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "parquet",
Option.empty(), 1);
+ assertTrue(result.isPresent());
+ Dataset<Row> ds = result.get();
+ Assertions.assertEquals(2, ds.count());
+ Set<String> colNames =
Arrays.stream(ds.schema().fields()).map(StructField::name).collect(Collectors.toSet());
+ assertTrue(colNames.contains("b"));
+ assertTrue(colNames.contains("c"));
+ }
+
+ /**
+ * Verifies that the format-gating predicate for the cloud-incremental
mergeSchema option recognises
+ * Parquet and ORC and rejects everything else. End-to-end ORC ingestion is
not exercised here because
+ * {@code hudi-utilities} pulls in {@code orc-core-nohive} while Spark 3.x's
ORC writer expects the
+ * regular {@code orc-core}; that classpath conflict makes {@code
sparkSession.write().orc(...)} fail
+ * with {@code NoSuchFieldError: type} in this module's tests. The
end-to-end behaviour for ORC is
+ * covered by Parquet's tests via the shared helper, plus this predicate
test for the format dispatch.
+ */
+ @Test
+ void isParquetOrOrcFileFormatRecognisesBothFormats() {
+ assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("parquet"));
+ assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("PARQUET"));
+ assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("orc"));
+ assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("ORC"));
+ assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" parquet
"));
+ assertTrue(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(" orc "));
+ assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("json"));
+ assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("csv"));
+ assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat("avro"));
+ assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(""));
+ assertFalse(CloudObjectsSelectorCommon.isParquetOrOrcFileFormat(null));
+ }
+
@Test
public void partitionKeyNotPresentInPath() {
List<CloudObjectMetadata> input = Collections.singletonList(new
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
1));