This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b8dc3a582208cdcd0bc761f1fc45008f5b08929c
Author: lokesh-lingarajan-0310 <84048984+lokesh-lingarajan-0...@users.noreply.github.com>
AuthorDate: Tue Aug 15 09:47:56 2023 -0700

    Handling empty commits after s3 applyFilter api (#9433)

    Handle empty commits by returning the current batch's endpoint, covering the scenario where a
    customer configures filters that match only specific objects in S3 among other objects.

    Co-authored-by: Lokesh Lingarajan <lokeshlingarajan@Lokeshs-MacBook-Pro.local>
---
 .../sources/GcsEventsHoodieIncrSource.java    |  94 +++++++++----------
 .../sources/S3EventsHoodieIncrSource.java     |  14 ++-
 .../sources/helpers/IncrSourceHelper.java     |  13 ++-
 .../sources/TestS3EventsHoodieIncrSource.java | 104 ++++++++++++++++++++-
 .../sources/helpers/TestIncrSourceHelper.java |  47 +++++-----
 5 files changed, 183 insertions(+), 89 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 5fe5e9bb9ed..6eb9a7fdbf7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -24,14 +24,14 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
-import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
-import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;

 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -64,44 +64,44 @@ import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMiss
 * You should set spark.driver.extraClassPath in spark-defaults.conf to
 * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode):
 * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore).
-
- absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
- absolute_path_to/31.1-jre/guava-31.1-jre.jar:
- absolute_path_to/mysql-connector-java-8.0.30.jar
-
- This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end.
- $ bin/spark-submit \
- --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
- --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
- --driver-memory 4g \
- --executor-memory 4g \
- --class org.apache.hudi.utilities.streamer.HoodieStreamer \
- absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
- --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
- --op INSERT \
- --hoodie-conf hoodie.streamer.source.hoodieincr.file.format="parquet" \
- --hoodie-conf hoodie.streamer.source.cloud.data.select.file.extension="jsonl" \
- --hoodie-conf hoodie.streamer.source.cloud.data.datafile.format="json" \
- --hoodie-conf hoodie.streamer.source.cloud.data.select.relpath.prefix="country" \
- --hoodie-conf hoodie.streamer.source.cloud.data.ignore.relpath.prefix="blah" \
- --hoodie-conf hoodie.streamer.source.cloud.data.ignore.relpath.substring="blah" \
- --hoodie-conf hoodie.datasource.write.recordkey.field=id \
- --hoodie-conf hoodie.datasource.write.partitionpath.field= \
- --filter-dupes \
- --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
- --hoodie-conf hoodie.combine.before.insert=true \
- --source-ordering-field id \
- --table-type COPY_ON_WRITE \
- --target-base-path file:\/\/\/absolute_path_to/data-gcs \
- --target-table gcs_data \
- --continuous \
- --source-limit 100 \
- --min-sync-interval-seconds 60 \
- --hoodie-conf hoodie.streamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \
- --hoodie-conf hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
- --enable-hive-sync \
- --hoodie-conf hoodie.datasource.hive_sync.database=default \
- --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data
+ * <p>
+ * absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ * absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ * absolute_path_to/mysql-connector-java-8.0.30.jar
+ * <p>
+ * This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end.
+ * $ bin/spark-submit \
+ * --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+ * --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+ * --driver-memory 4g \
+ * --executor-memory 4g \
+ * --class org.apache.hudi.utilities.streamer.HoodieStreamer \
+ * absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+ * --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+ * --op INSERT \
+ * --hoodie-conf hoodie.streamer.source.hoodieincr.file.format="parquet" \
+ * --hoodie-conf hoodie.streamer.source.cloud.data.select.file.extension="jsonl" \
+ * --hoodie-conf hoodie.streamer.source.cloud.data.datafile.format="json" \
+ * --hoodie-conf hoodie.streamer.source.cloud.data.select.relpath.prefix="country" \
+ * --hoodie-conf hoodie.streamer.source.cloud.data.ignore.relpath.prefix="blah" \
+ * --hoodie-conf hoodie.streamer.source.cloud.data.ignore.relpath.substring="blah" \
+ * --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+ * --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+ * --filter-dupes \
+ * --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+ * --hoodie-conf hoodie.combine.before.insert=true \
+ * --source-ordering-field id \
+ * --table-type COPY_ON_WRITE \
+ * --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+ * --target-table gcs_data \
+ * --continuous \
+ * --source-limit 100 \
+ * --min-sync-interval-seconds 60 \
+ * --hoodie-conf hoodie.streamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \
+ * --hoodie-conf hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \
+ * --enable-hive-sync \
+ * --hoodie-conf hoodie.datasource.hive_sync.database=default \
+ * --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data
 */
 public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
@@ -169,19 +169,17 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
 }
 Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo);
- if (cloudObjectMetadataDF.isEmpty()) {
- LOG.info("Source of file names is empty. Returning empty result and endInstant: "
- + queryInfo.getEndInstant());
- return Pair.of(Option.empty(), queryInfo.getEndInstant());
- }
-
 LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
- Pair<CloudObjectIncrCheckpoint, Dataset<Row>> checkPointAndDataset =
+ Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
 IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
 cloudObjectMetadataDF, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
+ if (!checkPointAndDataset.getRight().isPresent()) {
+ LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant());
+ return Pair.of(Option.empty(), queryInfo.getEndInstant());
+ }
 LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
- Pair<Option<Dataset<Row>>, String> extractedCheckPointAndDataset = extractData(queryInfo, checkPointAndDataset.getRight());
+ Pair<Option<Dataset<Row>>, String> extractedCheckPointAndDataset = extractData(queryInfo, checkPointAndDataset.getRight().get());
 return Pair.of(extractedCheckPointAndDataset.getLeft(), checkPointAndDataset.getLeft().toString());
 }

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 44efdc3ec15..927a8fc3ebb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -157,18 +157,16 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
 }
 Dataset<Row> source = queryRunner.run(queryInfo);
- if (source.isEmpty()) {
- LOG.info("Source of file names is empty. Returning empty result and endInstant: "
- + queryInfo.getEndInstant());
- return Pair.of(Option.empty(), queryInfo.getEndInstant());
- }
-
 Dataset<Row> filteredSourceData = applyFilter(source, fileFormat);
 LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
- Pair<CloudObjectIncrCheckpoint, Dataset<Row>> checkPointAndDataset =
+ Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
 IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
 filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
+ if (!checkPointAndDataset.getRight().isPresent()) {
+ LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant());
+ return Pair.of(Option.empty(), queryInfo.getEndInstant());
+ }
 LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
 String s3FS = getStringWithAltKeys(props, S3_FS_PREFIX, true).toLowerCase();
@@ -176,7 +174,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
 // Create S3 paths
 SerializableConfiguration serializableHadoopConf = new SerializableConfiguration(sparkContext.hadoopConfiguration());
- List<CloudObjectMetadata> cloudObjectMetadata = checkPointAndDataset.getRight()
+ List<CloudObjectMetadata> cloudObjectMetadata = checkPointAndDataset.getRight().get()
 .select(S3_BUCKET_NAME, S3_OBJECT_KEY, S3_OBJECT_SIZE)
 .distinct()
 .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, serializableHadoopConf, checkIfFileExists), Encoders.kryo(CloudObjectMetadata.class))

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 19383933bd9..ceec1851ee9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -170,12 +170,11 @@ public class IncrSourceHelper {
 * @param queryInfo Query Info
 * @return end instants along with filtered rows.
 */
- public static Pair<CloudObjectIncrCheckpoint, Dataset<Row>> filterAndGenerateCheckpointBasedOnSourceLimit(Dataset<Row> sourceData,
- long sourceLimit, QueryInfo queryInfo,
- CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint) {
+ public static Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> filterAndGenerateCheckpointBasedOnSourceLimit(Dataset<Row> sourceData,
+ long sourceLimit, QueryInfo queryInfo,
+ CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint) {
 if (sourceData.isEmpty()) {
- LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant());
- return Pair.of(cloudObjectIncrCheckpoint, sourceData);
+ return Pair.of(cloudObjectIncrCheckpoint, Option.empty());
 }
 // Let's persist the dataset to avoid triggering the dag repeatedly
 sourceData.persist(StorageLevel.MEMORY_AND_DISK());
@@ -195,7 +194,7 @@ public class IncrSourceHelper {
 if (orderedDf.isEmpty()) {
 LOG.info("Empty ordered source, returning endpoint:" + queryInfo.getEndInstant());
 sourceData.unpersist();
- return Pair.of(new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastCheckpointKey.get()), orderedDf);
+ return Pair.of(new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastCheckpointKey.get()), Option.empty());
 }
 }
@@ -219,7 +218,7 @@ public class IncrSourceHelper {
 }
 LOG.info("Processed batch size: " + row.get(row.fieldIndex(CUMULATIVE_COLUMN_NAME)) + " bytes");
 sourceData.unpersist();
- return Pair.of(new CloudObjectIncrCheckpoint(row.getString(0), row.getString(1)), collectedRows);
+ return Pair.of(new CloudObjectIncrCheckpoint(row.getString(0), row.getString(1)), Option.of(collectedRows));
 }

 /**

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 8bd345626e7..9ff90678e5f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -302,10 +302,101 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 1000L, "2#path/to/file5.json");
 }
- private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
- Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
- TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+ @Test
+ public void testEmptyDataAfterFilter() throws IOException {
+ String commitTimeForWrites = "2";
+ String commitTimeForReads = "1";
+
+ Pair<String, List<HoodieRecord>> inserts = writeS3MetadataRecords(commitTimeForReads);
+ inserts = writeS3MetadataRecords(commitTimeForWrites);
+
+
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+ // Add file paths and sizes to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 100L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip3.json", 200L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 150L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip5.json", 50L, "2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip4.json", 150L, "2"));
+
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+ TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+ typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 1000L, "2", typedProperties);
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 1000L, "2", typedProperties);
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip4.json"), 1000L, "2", typedProperties);
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip5.json"), 1000L, "2", typedProperties);
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2"), 1000L, "2", typedProperties);
+ }
+
+ @Test
+ public void testFilterAnEntireCommit() throws IOException {
+ String commitTimeForWrites1 = "2";
+ String commitTimeForReads = "1";
+
+ Pair<String, List<HoodieRecord>> inserts = writeS3MetadataRecords(commitTimeForReads);
+ inserts = writeS3MetadataRecords(commitTimeForWrites1);
+
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+ // Add file paths and sizes to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 100L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 200L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip3.json", 150L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip4.json", 50L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip5.json", 150L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 150L, "2"));
+
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+ when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(Option.empty());
+ TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+ typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 50L, "2#path/to/file4.json", typedProperties);
+ }
+
+ @Test
+ public void testFilterAnEntireMiddleCommit() throws IOException {
+ String commitTimeForWrites1 = "2";
+ String commitTimeForWrites2 = "3";
+ String commitTimeForReads = "1";
+
+ Pair<String, List<HoodieRecord>> inserts = writeS3MetadataRecords(commitTimeForReads);
+ inserts = writeS3MetadataRecords(commitTimeForWrites1);
+ inserts = writeS3MetadataRecords(commitTimeForWrites2);
+
+
+ List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+ // Add file paths and sizes to the list
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 150L, "2"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "3"));
+ filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 150L, "3"));
+
+ Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+ when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+ when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(Option.empty());
+ TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+ typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);
+ }
+
+ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
+ Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
+ TypedProperties typedProperties) {
 S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(),
 spark(), mockSchemaProvider, mockQueryRunner, mockCloudDataFetcher);
@@ -317,4 +408,11 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 Assertions.assertNotNull(nextCheckPoint);
 Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
 }
+
+ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
+ Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
+ TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+
+ readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties);
+ }
 }

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
index 3c0b5ee23c8..78020697c2e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
@@ -18,6 +18,7 @@ package org.apache.hudi.utilities.sources.helpers;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -94,10 +95,10 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
 QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit2", "_hoodie_commit_time", "s3.object.key", "s3.object.size");
- Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
 emptyDataset, 50L, queryInfo, new CloudObjectIncrCheckpoint(null, null));
 assertEquals(INIT_INSTANT_TS, result.getKey().toString());
- assertEquals(emptyDataset, result.getRight());
+ assertTrue(!result.getRight().isPresent());
 }

 @Test
@@ -115,11 +116,11 @@
 QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit2", "_hoodie_commit_time", "s3.object.key", "s3.object.size");
- Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
 inputDs, 50L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null));
- Row row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+ Row row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
 assertEquals("commit1#path/to/file1.json", result.getKey().toString());
- List<Row> rows = result.getRight().collectAsList();
+ List<Row> rows = result.getRight().get().collectAsList();
 assertEquals(1, rows.size());
 assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100]]", rows.toString());
 assertEquals(100L, row.get(0));
@@ -142,20 +143,20 @@
 QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit2", "_hoodie_commit_time", "s3.object.key", "s3.object.size");
- Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
 inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null));
- Row row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+ Row row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
 assertEquals("commit1#path/to/file2.json", result.getKey().toString());
- List<Row> rows = result.getRight().collectAsList();
+ List<Row> rows = result.getRight().get().collectAsList();
 assertEquals(2, rows.size());
 assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100], [commit1,[[bucket-1],[path/to/file2.json,150]],250]]", rows.toString());
 assertEquals(250L, row.get(0));

 result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
 inputDs, 550L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null));
- row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+ row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
 assertEquals("commit2#path/to/file4.json", result.getKey().toString());
- rows = result.getRight().collectAsList();
+ rows = result.getRight().get().collectAsList();
 assertEquals(4, rows.size());
 assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100], [commit1,[[bucket-1],[path/to/file2.json,150]],250]," +
 " [commit1,[[bucket-1],[path/to/file3.json,200]],450], [commit2,[[bucket-1],[path/to/file4.json,50]],500]]",
@@ -181,11 +182,11 @@
 QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit2", "_hoodie_commit_time", "s3.object.key", "s3.object.size");
- Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
 inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null));
- Row row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+ Row row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
 assertEquals("commit3#path/to/file8.json", result.getKey().toString());
- List<Row> rows = result.getRight().collectAsList();
+ List<Row> rows = result.getRight().get().collectAsList();
 assertEquals(8, rows.size());
 assertEquals(1050L, row.get(0));
 }
@@ -206,19 +207,19 @@
 QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit3", "commit3", "commit4", "_hoodie_commit_time", "s3.object.key", "s3.object.size");
- Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
- inputDs, 50L, queryInfo, new CloudObjectIncrCheckpoint("commit3","path/to/file8.json"));
- Row row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+ Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ inputDs, 50L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
+ Row row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
 assertEquals("commit4#path/to/file0.json", result.getKey().toString());
- List<Row> rows = result.getRight().collectAsList();
+ List<Row> rows = result.getRight().get().collectAsList();
 assertEquals(1, rows.size());
 assertEquals(100L, row.get(0));

 result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
- inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit3","path/to/file8.json"));
- row = result.getRight().select("cumulativeSize").collectAsList().get((int) result.getRight().count() - 1);
+ inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
+ row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1);
 assertEquals("commit4#path/to/file2.json", result.getKey().toString());
- rows = result.getRight().collectAsList();
+ rows = result.getRight().get().collectAsList();
 assertEquals(3, rows.size());
 assertEquals(200L, row.get(0));
 }
@@ -241,9 +242,9 @@
 QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit3", "_hoodie_commit_time", "s3.object.key", "s3.object.size");
- Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
- inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit3","path/to/file8.json"));
+ Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+ inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
 assertEquals("commit3#path/to/file8.json", result.getKey().toString());
- assertTrue(result.getRight().isEmpty());
+ assertTrue(!result.getRight().isPresent());
 }
 }
\ No newline at end of file
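
For readers following the diff above, the checkpointing contract it introduces can be summarized with a small, self-contained sketch. The sketch is illustrative only: it uses java.util.Optional, a plain String checkpoint, and a List of file metadata as stand-ins for Hudi's Option, CloudObjectIncrCheckpoint, and Spark's Dataset<Row>, and the class and member names in it (EmptyBatchCheckpointSketch, FileMeta, filterAndCheckpoint, nextCheckpoint) are not part of the Hudi API. It models the two behaviors the commit adds: the helper reports an all-filtered batch as an absent payload rather than an empty dataset, and the caller responds by advancing the checkpoint to the current batch's end instant.

import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class EmptyBatchCheckpointSketch {

  // Stand-in for one row of cloud object metadata (bucket omitted for brevity).
  record FileMeta(String commitTime, String key, long size) {}

  // Simplified counterpart of filterAndGenerateCheckpointBasedOnSourceLimit: apply the
  // ignore-prefix filter and report an empty batch as Optional.empty() plus the batch's
  // end instant, instead of handing back an empty dataset.
  static Map.Entry<String, Optional<List<FileMeta>>> filterAndCheckpoint(
      List<FileMeta> batch, String ignorePrefix, String endInstant) {
    List<FileMeta> kept = batch.stream()
        .filter(f -> !f.key().startsWith(ignorePrefix))
        .collect(Collectors.toList());
    if (kept.isEmpty()) {
      return new SimpleEntry<>(endInstant, Optional.empty());
    }
    return new SimpleEntry<>(kept.get(kept.size() - 1).commitTime(), Optional.of(kept));
  }

  // Caller-side pattern mirroring S3EventsHoodieIncrSource / GcsEventsHoodieIncrSource:
  // when the payload is absent, return the current batch's end instant as the next
  // checkpoint so ingestion advances past commits whose objects were all filtered out.
  static String nextCheckpoint(List<FileMeta> batch, String ignorePrefix, String endInstant) {
    Map.Entry<String, Optional<List<FileMeta>>> result =
        filterAndCheckpoint(batch, ignorePrefix, endInstant);
    if (!result.getValue().isPresent()) {
      System.out.println("Empty source, returning endpoint:" + endInstant);
      return endInstant;
    }
    System.out.println("Processing " + result.getValue().get().size() + " files");
    return result.getKey();
  }

  public static void main(String[] args) {
    List<FileMeta> batch = Arrays.asList(
        new FileMeta("1", "path/to/skip1.json", 100L),
        new FileMeta("2", "path/to/skip2.json", 150L));
    // Every object matches the ignore prefix, yet the checkpoint still advances to "2".
    System.out.println(nextCheckpoint(batch, "path/to/skip", "2"));
  }
}

Running the sketch with a batch whose objects all match the ignore prefix prints "Empty source, returning endpoint:2" followed by "2", which mirrors the checkpoints asserted for the fully filtered batches in testEmptyDataAfterFilter above.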