This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b8dc3a582208cdcd0bc761f1fc45008f5b08929c
Author: lokesh-lingarajan-0310 <84048984+lokesh-lingarajan-0...@users.noreply.github.com>
AuthorDate: Tue Aug 15 09:47:56 2023 -0700

    Handling empty commits after s3 applyFilter api (#9433)
    
    Handle an empty commit by returning the current batch's end instant as the checkpoint,
    covering the case where a customer's S3 filters match only specific objects and an
    entire batch is filtered down to nothing.
    
    Co-authored-by: Lokesh Lingarajan <lokeshlingarajan@Lokeshs-MacBook-Pro.local>
---
 .../sources/GcsEventsHoodieIncrSource.java         |  94 +++++++++----------
 .../sources/S3EventsHoodieIncrSource.java          |  14 ++-
 .../sources/helpers/IncrSourceHelper.java          |  13 ++-
 .../sources/TestS3EventsHoodieIncrSource.java      | 104 ++++++++++++++++++++-
 .../sources/helpers/TestIncrSourceHelper.java      |  47 +++++-----
 5 files changed, 183 insertions(+), 89 deletions(-)
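The substance of the change: IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit now returns Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>>, and the S3/GCS incremental sources check that Option after applyFilter has run, so a batch whose objects are all filtered out still returns the query's end instant as the next checkpoint instead of stalling. The sketch below illustrates only that control flow; it is a simplified stand-in that uses plain Java Optional and List in place of Hudi's Option, Dataset<Row>, and checkpoint types, and its class and method names are invented for illustration, not the actual Hudi API.

import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Simplified stand-ins for the Hudi/Spark types; all names here are illustrative only.
public class EmptyBatchSketch {

  // Mirrors the idea behind filterAndGenerateCheckpointBasedOnSourceLimit: an empty
  // filtered batch is reported as Optional.empty() rather than returned as an empty dataset.
  static Optional<List<String>> filterBySourceLimit(List<String> filteredRows) {
    if (filteredRows.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(filteredRows);
  }

  // Mirrors the caller pattern in the incremental sources: when the batch is absent,
  // return the query's end instant as the next checkpoint and emit no data.
  static String fetchNextBatch(List<String> filteredRows, String endInstant) {
    Optional<List<String>> batch = filterBySourceLimit(filteredRows);
    if (!batch.isPresent()) {
      System.out.println("Empty source, returning endpoint: " + endInstant);
      return endInstant;
    }
    // ... otherwise process batch.get() and derive the checkpoint from the last row ...
    return endInstant;
  }

  public static void main(String[] args) {
    // Every object matched the ignore prefix, so the filtered batch is empty.
    System.out.println(fetchNextBatch(Collections.emptyList(), "commit-2"));
  }
}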

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 5fe5e9bb9ed..6eb9a7fdbf7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -24,14 +24,14 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
-import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
-import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -64,44 +64,44 @@ import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMiss
  * You should set spark.driver.extraClassPath in spark-defaults.conf to
  * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode):
  * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore).
-
- absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
- absolute_path_to/31.1-jre/guava-31.1-jre.jar:
- absolute_path_to/mysql-connector-java-8.0.30.jar
-
- This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end.
- $ bin/spark-submit \
- --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
- --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
- --driver-memory 4g \
- --executor-memory 4g \
- --class org.apache.hudi.utilities.streamer.HoodieStreamer \
- absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
- --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
- --op INSERT \
- --hoodie-conf hoodie.streamer.source.hoodieincr.file.format="parquet" \
- --hoodie-conf hoodie.streamer.source.cloud.data.select.file.extension="jsonl" \
- --hoodie-conf hoodie.streamer.source.cloud.data.datafile.format="json" \
- --hoodie-conf hoodie.streamer.source.cloud.data.select.relpath.prefix="country" \
- --hoodie-conf hoodie.streamer.source.cloud.data.ignore.relpath.prefix="blah" \
- --hoodie-conf hoodie.streamer.source.cloud.data.ignore.relpath.substring="blah" \
- --hoodie-conf hoodie.datasource.write.recordkey.field=id \
- --hoodie-conf hoodie.datasource.write.partitionpath.field= \
- --filter-dupes \
- --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
- --hoodie-conf hoodie.combine.before.insert=true \
- --source-ordering-field id \
- --table-type COPY_ON_WRITE \
- --target-base-path file:\/\/\/absolute_path_to/data-gcs \
- --target-table gcs_data \
- --continuous \
- --source-limit 100 \
- --min-sync-interval-seconds 60 \
- --hoodie-conf hoodie.streamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \
- --hoodie-conf hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
- --enable-hive-sync \
- --hoodie-conf hoodie.datasource.hive_sync.database=default \
- --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data
+ * <p>
+ * absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ * absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ * absolute_path_to/mysql-connector-java-8.0.30.jar
+ * <p>
+ * This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end.
+ * $ bin/spark-submit \
+ * --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+ * --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+ * --driver-memory 4g \
+ * --executor-memory 4g \
+ * --class org.apache.hudi.utilities.streamer.HoodieStreamer \
+ * absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+ * --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+ * --op INSERT \
+ * --hoodie-conf hoodie.streamer.source.hoodieincr.file.format="parquet" \
+ * --hoodie-conf hoodie.streamer.source.cloud.data.select.file.extension="jsonl" \
+ * --hoodie-conf hoodie.streamer.source.cloud.data.datafile.format="json" \
+ * --hoodie-conf hoodie.streamer.source.cloud.data.select.relpath.prefix="country" \
+ * --hoodie-conf hoodie.streamer.source.cloud.data.ignore.relpath.prefix="blah" \
+ * --hoodie-conf hoodie.streamer.source.cloud.data.ignore.relpath.substring="blah" \
+ * --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+ * --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+ * --filter-dupes \
+ * --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+ * --hoodie-conf hoodie.combine.before.insert=true \
+ * --source-ordering-field id \
+ * --table-type COPY_ON_WRITE \
+ * --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+ * --target-table gcs_data \
+ * --continuous \
+ * --source-limit 100 \
+ * --min-sync-interval-seconds 60 \
+ * --hoodie-conf hoodie.streamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \
+ * --hoodie-conf hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
+ * --enable-hive-sync \
+ * --hoodie-conf hoodie.datasource.hive_sync.database=default \
+ * --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data
  */
 public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
 
@@ -169,19 +169,17 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
     }
 
     Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo);
-    if (cloudObjectMetadataDF.isEmpty()) {
-      LOG.info("Source of file names is empty. Returning empty result and endInstant: "
-          + queryInfo.getEndInstant());
-      return Pair.of(Option.empty(), queryInfo.getEndInstant());
-    }
-
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
-    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> checkPointAndDataset =
+    Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
             cloudObjectMetadataDF, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
+    if (!checkPointAndDataset.getRight().isPresent()) {
+      LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant());
+      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+    }
     LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
 
-    Pair<Option<Dataset<Row>>, String> extractedCheckPointAndDataset = extractData(queryInfo, checkPointAndDataset.getRight());
+    Pair<Option<Dataset<Row>>, String> extractedCheckPointAndDataset = extractData(queryInfo, checkPointAndDataset.getRight().get());
     return Pair.of(extractedCheckPointAndDataset.getLeft(), checkPointAndDataset.getLeft().toString());
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 44efdc3ec15..927a8fc3ebb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -157,18 +157,16 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     }
 
     Dataset<Row> source = queryRunner.run(queryInfo);
-    if (source.isEmpty()) {
-      LOG.info("Source of file names is empty. Returning empty result and endInstant: "
-          + queryInfo.getEndInstant());
-      return Pair.of(Option.empty(), queryInfo.getEndInstant());
-    }
-
     Dataset<Row> filteredSourceData = applyFilter(source, fileFormat);
 
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
-    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> checkPointAndDataset =
+    Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
             filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
+    if (!checkPointAndDataset.getRight().isPresent()) {
+      LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant());
+      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+    }
     LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
 
     String s3FS = getStringWithAltKeys(props, S3_FS_PREFIX, true).toLowerCase();
@@ -176,7 +174,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
 
     // Create S3 paths
     SerializableConfiguration serializableHadoopConf = new SerializableConfiguration(sparkContext.hadoopConfiguration());
-    List<CloudObjectMetadata> cloudObjectMetadata = checkPointAndDataset.getRight()
+    List<CloudObjectMetadata> cloudObjectMetadata = checkPointAndDataset.getRight().get()
         .select(S3_BUCKET_NAME, S3_OBJECT_KEY, S3_OBJECT_SIZE)
         .distinct()
         .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, serializableHadoopConf, checkIfFileExists), Encoders.kryo(CloudObjectMetadata.class))
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 19383933bd9..ceec1851ee9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -170,12 +170,11 @@ public class IncrSourceHelper {
    * @param queryInfo   Query Info
    * @return end instants along with filtered rows.
    */
-  public static Pair<CloudObjectIncrCheckpoint, Dataset<Row>> filterAndGenerateCheckpointBasedOnSourceLimit(Dataset<Row> sourceData,
-                                                                                                            long sourceLimit, QueryInfo queryInfo,
-                                                                                                            CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint) {
+  public static Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> filterAndGenerateCheckpointBasedOnSourceLimit(Dataset<Row> sourceData,
+                                                                                                                    long sourceLimit, QueryInfo queryInfo,
+                                                                                                                    CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint) {
     if (sourceData.isEmpty()) {
-      LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant());
-      return Pair.of(cloudObjectIncrCheckpoint, sourceData);
+      return Pair.of(cloudObjectIncrCheckpoint, Option.empty());
     }
     // Let's persist the dataset to avoid triggering the dag repeatedly
     sourceData.persist(StorageLevel.MEMORY_AND_DISK());
@@ -195,7 +194,7 @@ public class IncrSourceHelper {
       if (orderedDf.isEmpty()) {
         LOG.info("Empty ordered source, returning endpoint:" + queryInfo.getEndInstant());
         sourceData.unpersist();
-        return Pair.of(new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastCheckpointKey.get()), orderedDf);
+        return Pair.of(new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastCheckpointKey.get()), Option.empty());
       }
     }
 
@@ -219,7 +218,7 @@ public class IncrSourceHelper {
     }
     LOG.info("Processed batch size: " + row.get(row.fieldIndex(CUMULATIVE_COLUMN_NAME)) + " bytes");
     sourceData.unpersist();
-    return Pair.of(new CloudObjectIncrCheckpoint(row.getString(0), row.getString(1)), collectedRows);
+    return Pair.of(new CloudObjectIncrCheckpoint(row.getString(0), row.getString(1)), Option.of(collectedRows));
   }
 
   /**
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 8bd345626e7..9ff90678e5f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -302,10 +302,101 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 1000L, "2#path/to/file5.json");
   }
 
-  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
-                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
-    TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+  @Test
+  public void testEmptyDataAfterFilter() throws IOException {
+    String commitTimeForWrites = "2";
+    String commitTimeForReads = "1";
+
+    Pair<String, List<HoodieRecord>> inserts = writeS3MetadataRecords(commitTimeForReads);
+    inserts = writeS3MetadataRecords(commitTimeForWrites);
+
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+    // Add file paths and sizes to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip3.json", 200L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip5.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip4.json", 150L, "2"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 1000L, "2", typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 1000L, "2", typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip4.json"), 1000L, "2", typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip5.json"), 1000L, "2", typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2"), 1000L, "2", typedProperties);
+  }
+
+  @Test
+  public void testFilterAnEntireCommit() throws IOException {
+    String commitTimeForWrites1 = "2";
+    String commitTimeForReads = "1";
+
+    Pair<String, List<HoodieRecord>> inserts = writeS3MetadataRecords(commitTimeForReads);
+    inserts = writeS3MetadataRecords(commitTimeForWrites1);
+
 
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+    // Add file paths and sizes to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 200L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip3.json", 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip4.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip5.json", 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 150L, "2"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+        .thenReturn(Option.empty());
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 50L, "2#path/to/file4.json", typedProperties);
+  }
+
+  @Test
+  public void testFilterAnEntireMiddleCommit() throws IOException {
+    String commitTimeForWrites1 = "2";
+    String commitTimeForWrites2 = "3";
+    String commitTimeForReads = "1";
+
+    Pair<String, List<HoodieRecord>> inserts = writeS3MetadataRecords(commitTimeForReads);
+    inserts = writeS3MetadataRecords(commitTimeForWrites1);
+    inserts = writeS3MetadataRecords(commitTimeForWrites2);
+
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+    // Add file paths and sizes to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 150L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "3"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 150L, "3"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+        .thenReturn(Option.empty());
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);
+  }
+
+  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
+                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
+                             TypedProperties typedProperties) {
     S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(),
         spark(), mockSchemaProvider, mockQueryRunner, mockCloudDataFetcher);
 
@@ -317,4 +408,11 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Assertions.assertNotNull(nextCheckPoint);
     Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
   }
+
+  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
+                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
+    TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+
+    readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties);
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
index 3c0b5ee23c8..78020697c2e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -94,10 +95,10 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
         QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
         "commit2", "_hoodie_commit_time",
         "s3.object.key", "s3.object.size");
-    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+    Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
         emptyDataset, 50L, queryInfo, new CloudObjectIncrCheckpoint(null, null));
     assertEquals(INIT_INSTANT_TS, result.getKey().toString());
-    assertEquals(emptyDataset, result.getRight());
+    assertTrue(!result.getRight().isPresent());
   }
 
   @Test
@@ -115,11 +116,11 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
         QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
         "commit2", "_hoodie_commit_time",
         "s3.object.key", "s3.object.size");
-    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+    Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
         inputDs, 50L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null));
-    Row row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+    Row row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
     assertEquals("commit1#path/to/file1.json", result.getKey().toString());
-    List<Row> rows = result.getRight().collectAsList();
+    List<Row> rows = result.getRight().get().collectAsList();
     assertEquals(1, rows.size());
     assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100]]", rows.toString());
     assertEquals(100L, row.get(0));
@@ -142,20 +143,20 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
         QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
         "commit2", "_hoodie_commit_time",
         "s3.object.key", "s3.object.size");
-    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+    Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
         inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null));
-    Row row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+    Row row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
     assertEquals("commit1#path/to/file2.json", result.getKey().toString());
-    List<Row> rows = result.getRight().collectAsList();
+    List<Row> rows = result.getRight().get().collectAsList();
     assertEquals(2, rows.size());
     assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100], [commit1,[[bucket-1],[path/to/file2.json,150]],250]]", rows.toString());
     assertEquals(250L, row.get(0));
 
     result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
         inputDs, 550L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null));
-    row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+    row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
     assertEquals("commit2#path/to/file4.json", result.getKey().toString());
-    rows = result.getRight().collectAsList();
+    rows = result.getRight().get().collectAsList();
     assertEquals(4, rows.size());
     assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100], [commit1,[[bucket-1],[path/to/file2.json,150]],250],"
             + " [commit1,[[bucket-1],[path/to/file3.json,200]],450], [commit2,[[bucket-1],[path/to/file4.json,50]],500]]",
@@ -181,11 +182,11 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
         QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
         "commit2", "_hoodie_commit_time",
         "s3.object.key", "s3.object.size");
-    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+    Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
         inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null));
-    Row row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+    Row row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
     assertEquals("commit3#path/to/file8.json", result.getKey().toString());
-    List<Row> rows = result.getRight().collectAsList();
+    List<Row> rows = result.getRight().get().collectAsList();
     assertEquals(8, rows.size());
     assertEquals(1050L, row.get(0));
   }
@@ -206,19 +207,19 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
         QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit3", "commit3",
         "commit4", "_hoodie_commit_time",
         "s3.object.key", "s3.object.size");
-    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
-        inputDs, 50L, queryInfo, new CloudObjectIncrCheckpoint("commit3","path/to/file8.json"));
-    Row row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+    Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+        inputDs, 50L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
+    Row row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
     assertEquals("commit4#path/to/file0.json", result.getKey().toString());
-    List<Row> rows = result.getRight().collectAsList();
+    List<Row> rows = result.getRight().get().collectAsList();
     assertEquals(1, rows.size());
     assertEquals(100L, row.get(0));
 
     result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
-        inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit3","path/to/file8.json"));
-    row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+        inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
+    row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
     assertEquals("commit4#path/to/file2.json", result.getKey().toString());
-    rows = result.getRight().collectAsList();
+    rows = result.getRight().get().collectAsList();
     assertEquals(3, rows.size());
     assertEquals(200L, row.get(0));
   }
@@ -241,9 +242,9 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
         QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
         "commit3", "_hoodie_commit_time",
         "s3.object.key", "s3.object.size");
-    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
-        inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit3","path/to/file8.json"));
+    Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+        inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
     assertEquals("commit3#path/to/file8.json", result.getKey().toString());
-    assertTrue(result.getRight().isEmpty());
+    assertTrue(!result.getRight().isPresent());
   }
 }
\ No newline at end of file
