This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit bf6a7586ecb5f8e19d4ee3876d8c7fcbb7b73001
Author: voonhous <voonho...@gmail.com>
AuthorDate: Tue Nov 29 11:53:31 2022 +0800

    [HUDI-5007] Prevent Hudi from reading the entire timeline when performing 
a LATEST streaming read (#6920)
---
 .../apache/hudi/configuration/OptionsResolver.java |  7 ++
 .../apache/hudi/source/IncrementalInputSplits.java | 13 +++-
 .../hudi/source/TestIncrementalInputSplits.java    | 88 ++++++++++++++++++++++
 3 files changed, 107 insertions(+), 1 deletion(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index d176ecb85dc..0dd31ee7538 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -195,6 +195,13 @@ public class OptionsResolver {
         && 
!conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST);
   }
 
+  /**
+   * Returns true if neither a start commit nor an end commit is explicitly specified.
+   */
+  public static boolean hasNoSpecificReadCommits(Configuration conf) {
+    return !conf.contains(FlinkOptions.READ_START_COMMIT) && 
!conf.contains(FlinkOptions.READ_END_COMMIT);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 3455417b6db..7e0eda46cec 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
@@ -447,7 +448,8 @@ public class IncrementalInputSplits implements Serializable 
{
    * @param issuedInstant  The last issued instant that has already been 
delivered to downstream
    * @return the filtered hoodie instants
    */
-  private List<HoodieInstant> filterInstantsWithRange(
+  @VisibleForTesting
+  public List<HoodieInstant> filterInstantsWithRange(
       HoodieTimeline commitTimeline,
       final String issuedInstant) {
     HoodieTimeline completedTimeline = 
commitTimeline.filterCompletedInstants();
@@ -461,6 +463,15 @@ public class IncrementalInputSplits implements 
Serializable {
 
     Stream<HoodieInstant> instantStream = 
completedTimeline.getInstantsAsStream();
 
+    if (OptionsResolver.hasNoSpecificReadCommits(this.conf)) {
+      // by default read from the latest commit
+      List<HoodieInstant> instants = 
completedTimeline.getInstants().collect(Collectors.toList());
+      if (instants.size() > 1) {
+        return Collections.singletonList(instants.get(instants.size() - 1));
+      }
+      return instants;
+    }
+
     if (OptionsResolver.isSpecificStartCommit(this.conf)) {
       final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
       instantStream = instantStream
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
new file mode 100644
index 00000000000..b42fd2c04a3
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+
+/**
+ * Test cases for {@link IncrementalInputSplits}.
+ */
+public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  private void init() throws IOException {
+    initPath();
+    initMetaClient();
+  }
+
+  @Test
+  void testFilterInstantsWithRange() {
+    HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    IncrementalInputSplits iis = IncrementalInputSplits.builder()
+        .conf(conf)
+        .path(new Path(basePath))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .build();
+
+    HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "1");
+    HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "2");
+    HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "3");
+    timeline.createNewInstant(commit1);
+    timeline.createNewInstant(commit2);
+    timeline.createNewInstant(commit3);
+    timeline = timeline.reload();
+
+    // previous read iteration read till instant time "1", next read iteration 
should return ["2", "3"]
+    List<HoodieInstant> instantRange2 = iis.filterInstantsWithRange(timeline, 
"1");
+    assertEquals(2, instantRange2.size());
+    assertIterableEquals(Arrays.asList(commit2, commit3), instantRange2);
+
+    // simulate first iteration cycle with read from LATEST commit
+    List<HoodieInstant> instantRange1 = iis.filterInstantsWithRange(timeline, 
null);
+    assertEquals(1, instantRange1.size());
+    assertIterableEquals(Collections.singletonList(commit3), instantRange1);
+
+    // specifying a start and end commit
+    conf.set(FlinkOptions.READ_START_COMMIT, "1");
+    conf.set(FlinkOptions.READ_END_COMMIT, "3");
+    List<HoodieInstant> instantRange3 = iis.filterInstantsWithRange(timeline, 
null);
+    assertEquals(3, instantRange3.size());
+    assertIterableEquals(Arrays.asList(commit1, commit2, commit3), 
instantRange3);
+  }
+
+}

Reply via email to