This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 558281ed430 [MINOR] Disable reader for test with enum (#10061)
558281ed430 is described below

commit 558281ed4303756ad7a00331e1568dbb107f8571
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Fri Nov 10 19:42:45 2023 -0500

    [MINOR] Disable reader for test with enum (#10061)

    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <ethan.guoyi...@gmail.com>
---
 .../hudi/utilities/sources/HoodieIncrSource.java     | 20 ++++++++++++++++++++
 .../hudi/utilities/sources/TestHoodieIncrSource.java |  3 +++
 2 files changed, 23 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index fa316cf806f..aafd4c9e3b5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -19,9 +19,11 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -38,6 +40,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 
 import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
 import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
@@ -58,6 +63,10 @@ import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHoll
 public class HoodieIncrSource extends RowSource {
 
   private static final Logger LOG = LoggerFactory.getLogger(HoodieIncrSource.class);
+  public static final Set<String> HOODIE_INCR_SOURCE_READ_OPT_KEYS =
+      CollectionUtils.createImmutableSet(
+          "hoodie.datasource.read.use.new.parquet.file.format",
+          HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key());
   private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
 
   public static class Config {
@@ -128,10 +137,19 @@ public class HoodieIncrSource extends RowSource {
         HoodieIncrSourceConfig.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE.defaultValue();
   }
 
+  private final Map<String, String> readOpts = new HashMap<>();
+
   public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                           SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
+    for (Object key : props.keySet()) {
+      String keyString = key.toString();
+      if (HOODIE_INCR_SOURCE_READ_OPT_KEYS.contains(keyString)) {
+        readOpts.put(keyString, props.getString(key.toString()));
+      }
+    }
+
     this.snapshotLoadQuerySplitter =
         Option.ofNullable(props.getString(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null))
             .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
                 new Class<?>[] {TypedProperties.class}, props));
@@ -181,6 +199,7 @@ public class HoodieIncrSource extends RowSource {
     // Do Incr pull. Set end instant if available
     if (queryInfo.isIncremental()) {
       source = sparkSession.read().format("org.apache.hudi")
+          .options(readOpts)
           .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
           .option(BEGIN_INSTANTTIME().key(), queryInfo.getStartInstant())
           .option(END_INSTANTTIME().key(), queryInfo.getEndInstant())
@@ -192,6 +211,7 @@ public class HoodieIncrSource extends RowSource {
     } else {
       // if checkpoint is missing from source table, and if strategy is set to READ_UPTO_LATEST_COMMIT, we have to issue snapshot query
       Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
+          .options(readOpts)
           .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
           .load(srcPath);
       if (snapshotLoadQuerySplitter.isPresent()) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index d35041592aa..1b534c22c7e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -332,6 +333,8 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
     Properties properties = new Properties();
     properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath());
     properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
+    // TODO: [HUDI-7081] get rid of this
+    properties.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
     snapshotCheckPointImplClassOpt.map(className -> properties.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, className));
     TypedProperties typedProperties = new TypedProperties(properties);
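For context, a minimal sketch of how a caller could exercise the new option
forwarding; the class name and property values below are illustrative
assumptions, not part of the commit:

    import org.apache.hudi.common.config.HoodieReaderConfig;
    import org.apache.hudi.common.config.TypedProperties;

    // Sketch only: any property whose key appears in
    // HOODIE_INCR_SOURCE_READ_OPT_KEYS is copied into readOpts by the new
    // constructor loop and then applied via .options(readOpts) on both the
    // incremental and snapshot reads in fetchNextBatch.
    public class ReadOptsSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // "false" mirrors the test change above; a job would set whatever it needs.
        props.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
        props.setProperty("hoodie.datasource.read.use.new.parquet.file.format", "false");
        // These props would then be handed to the HoodieIncrSource constructor
        // along with the JavaSparkContext, SparkSession, and SchemaProvider.
      }
    }

This is the same mechanism the test change relies on: setting
HoodieReaderConfig.FILE_GROUP_READER_ENABLED to "false" keeps the file group
reader disabled for the enum test until [HUDI-7081] removes the workaround.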