This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 558281ed430 [MINOR] Disable reader for test with enum (#10061)
558281ed430 is described below

commit 558281ed4303756ad7a00331e1568dbb107f8571
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Fri Nov 10 19:42:45 2023 -0500

    [MINOR] Disable reader for test with enum (#10061)
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <ethan.guoyi...@gmail.com>
---
 .../hudi/utilities/sources/HoodieIncrSource.java     | 20 ++++++++++++++++++++
 .../hudi/utilities/sources/TestHoodieIncrSource.java |  3 +++
 2 files changed, 23 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index fa316cf806f..aafd4c9e3b5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -19,9 +19,11 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -38,6 +40,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 
 import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
 import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
@@ -58,6 +63,10 @@ import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHoll
 public class HoodieIncrSource extends RowSource {
 
   private static final Logger LOG = LoggerFactory.getLogger(HoodieIncrSource.class);
+  public static final Set<String> HOODIE_INCR_SOURCE_READ_OPT_KEYS =
+      CollectionUtils.createImmutableSet(
+          "hoodie.datasource.read.use.new.parquet.file.format",
+          HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key());
   private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
 
   public static class Config {
@@ -128,10 +137,19 @@ public class HoodieIncrSource extends RowSource {
         HoodieIncrSourceConfig.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE.defaultValue();
   }
 
+  private final Map<String, String> readOpts = new HashMap<>();
+
   public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                           SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
 
+    for (Object key : props.keySet()) {
+      String keyString = key.toString();
+      if (HOODIE_INCR_SOURCE_READ_OPT_KEYS.contains(keyString)) {
+        readOpts.put(keyString, props.getString(key.toString()));
+      }
+    }
+
     this.snapshotLoadQuerySplitter = Option.ofNullable(props.getString(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null))
         .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
             new Class<?>[] {TypedProperties.class}, props));
@@ -181,6 +199,7 @@ public class HoodieIncrSource extends RowSource {
     // Do Incr pull. Set end instant if available
     if (queryInfo.isIncremental()) {
       source = sparkSession.read().format("org.apache.hudi")
+          .options(readOpts)
           .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
           .option(BEGIN_INSTANTTIME().key(), queryInfo.getStartInstant())
           .option(END_INSTANTTIME().key(), queryInfo.getEndInstant())
@@ -192,6 +211,7 @@ public class HoodieIncrSource extends RowSource {
     } else {
       // if checkpoint is missing from source table, and if strategy is set to READ_UPTO_LATEST_COMMIT, we have to issue snapshot query
       Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
+          .options(readOpts)
           .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
           .load(srcPath);
       if (snapshotLoadQuerySplitter.isPresent()) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index d35041592aa..1b534c22c7e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -332,6 +333,8 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
     Properties properties = new Properties();
     properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", 
basePath());
     
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
 missingCheckpointStrategy.name());
+    // TODO: [HUDI-7081] get rid of this
+    properties.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
     snapshotCheckPointImplClassOpt.map(className ->
         properties.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, className));
     TypedProperties typedProperties = new TypedProperties(properties);
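
For context, the patch boils down to an allow-list pass-through: reader-related keys are copied from the incoming TypedProperties into a map that is later chained into both query paths via sparkSession.read().format("org.apache.hudi").options(readOpts), so a caller (here, the test) can turn the new file group reader off for both incremental and snapshot reads. Below is a minimal, standalone sketch of that pattern using only the JDK; the class and method names are illustrative, and the literal "hoodie.file.group.reader.enabled" string is an assumption standing in for HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key().

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class ReadOptsPassThroughSketch {

  // Mirrors HOODIE_INCR_SOURCE_READ_OPT_KEYS from the patch; the second key
  // assumes the value of HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key().
  private static final Set<String> READ_OPT_KEYS = Set.of(
      "hoodie.datasource.read.use.new.parquet.file.format",
      "hoodie.file.group.reader.enabled");

  // Copy only the allow-listed keys; everything else in the properties bag
  // (checkpoint strategy, source path, etc.) is deliberately not forwarded.
  static Map<String, String> filterReadOpts(Properties props) {
    Map<String, String> readOpts = new HashMap<>();
    for (String key : props.stringPropertyNames()) {
      if (READ_OPT_KEYS.contains(key)) {
        readOpts.put(key, props.getProperty(key));
      }
    }
    return readOpts;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.file.group.reader.enabled", "false");
    props.setProperty("hoodie.deltastreamer.source.hoodieincr.path", "/tmp/src");
    // Prints {hoodie.file.group.reader.enabled=false}: only the reader toggle
    // is forwarded, which is how the test disables the file group reader.
    System.out.println(filterReadOpts(props));
  }
}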
