This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f4816ce82109 fix(flink): avoid repeated timeline reload for unchanged lookup table commits (#18930)
f4816ce82109 is described below

commit f4816ce821094822684e80a76b7ddf7e35d052f9
Author: fhan <[email protected]>
AuthorDate: Tue Jun 9 08:16:18 2026 +0800

    fix(flink): avoid repeated timeline reload for unchanged lookup table commits (#18930)
    
    Co-authored-by: fhan <[email protected]>
---
 .../hudi/table/lookup/HoodieLookupFunction.java    |  13 +-
 .../table/lookup/TestHoodieLookupFunction.java     | 161 +++++++++++++++++++++
 2 files changed, 171 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
index 443cba0bcf50..876822246397 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
@@ -135,13 +135,16 @@ public class HoodieLookupFunction extends LookupFunction implements Serializable
     }
 
     HoodieActiveTimeline latestCommit = metaClient.reloadActiveTimeline();
-    Option<HoodieInstant> latestCommitInstant = 
latestCommit.getCommitsTimeline().lastInstant();
-    if (latestCommit.empty()) {
+    Option<HoodieInstant> latestCommitInstant =
+        
latestCommit.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    if (!latestCommitInstant.isPresent()) {
+      scheduleNextLoad();
       log.info("No commit instant found currently.");
       return;
     }
     // Determine whether to reload data by comparing instant
     if (latestCommitInstant.get().equals(currentCommit)) {
+      scheduleNextLoad();
       log.info("Ignore loading data because the commit instant " + 
currentCommit + " has not changed.");
       return;
     }
@@ -162,7 +165,7 @@ public class HoodieLookupFunction extends LookupFunction implements Serializable
         }
         partitionReader.close();
         currentCommit = latestCommitInstant.get();
-        nextLoadTime = System.currentTimeMillis() + reloadInterval.toMillis();
+        scheduleNextLoad();
         log.info("Loaded {} row(s) into lookup join cache", count);
         return;
       } catch (Exception e) {
@@ -185,6 +188,10 @@ public class HoodieLookupFunction extends LookupFunction implements Serializable
     }
   }
 
+  private void scheduleNextLoad() {
+    nextLoadTime = System.currentTimeMillis() + reloadInterval.toMillis();
+  }
+
   private RowData extractLookupKey(RowData row) {
     GenericRowData key = new GenericRowData(lookupFieldGetters.length);
     for (int i = 0; i < lookupFieldGetters.length; i++) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/lookup/TestHoodieLookupFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/lookup/TestHoodieLookupFunction.java
new file mode 100644
index 000000000000..8d336ede565e
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/lookup/TestHoodieLookupFunction.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieLookupFunction}.
+ */
+class TestHoodieLookupFunction {
+
+  @TempDir
+  File tempFile;
+
+  @Test
+  void testNextLoadTimeAdvancesWhenNoCompletedCommit() throws Exception {
+    Configuration conf = getConf();
+    StreamerUtil.initTableIfNotExists(conf);
+
+    CountingLookupTableReader reader = new 
CountingLookupTableReader(Collections.emptyList(), conf);
+    HoodieLookupFunction function = newLookupFunction(reader, conf);
+    function.open(null);
+
+    long beforeLoad = System.currentTimeMillis();
+    try {
+      function.lookup(lookupKey());
+
+      assertEquals(0, reader.openCount, "The reader should not open when no 
completed commit exists");
+      assertTrue(getNextLoadTime(function) >= beforeLoad + 
Duration.ofDays(1).toMillis(),
+          "The next lookup reload check should be delayed by the configured 
TTL");
+    } finally {
+      function.close();
+    }
+  }
+
+  @Test
+  void testLookupCacheDoesNotReloadWhenCompletedCommitHasNotChanged() throws 
Exception {
+    Configuration conf = getConf();
+    TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);
+
+    CountingLookupTableReader reader = new 
CountingLookupTableReader(TestData.DATA_SET_SINGLE_INSERT, conf);
+    HoodieLookupFunction function = newLookupFunction(reader, conf);
+    function.open(null);
+
+    try {
+      Collection<RowData> matchedRows = function.lookup(lookupKey());
+      assertNotNull(matchedRows, "The first lookup should find the inserted 
row");
+      assertEquals(1, matchedRows.size(), "The first lookup should load the 
table into cache");
+      assertEquals(1, reader.openCount, "The first lookup should open the 
reader once");
+
+      // Force the next lookup through the reload branch so the 
unchanged-commit guard is exercised.
+      setNextLoadTime(function, 0L);
+      function.lookup(lookupKey());
+
+      assertEquals(1, reader.openCount, "The same completed commit should not 
reload table data");
+      assertTrue(getNextLoadTime(function) > System.currentTimeMillis(),
+          "The next lookup reload check should be delayed after detecting an 
unchanged commit");
+    } finally {
+      function.close();
+    }
+  }
+
+  private HoodieLookupFunction newLookupFunction(CountingLookupTableReader 
reader, Configuration conf) {
+    return new HoodieLookupFunction(
+        reader,
+        TestConfigurations.ROW_TYPE,
+        new int[] {0},
+        Duration.ofDays(1),
+        conf);
+  }
+
+  private Configuration getConf() {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofDays(1));
+    return conf;
+  }
+
+  private static RowData lookupKey() {
+    return GenericRowData.of(StringData.fromString("id1"));
+  }
+
+  private static long getNextLoadTime(HoodieLookupFunction function) throws 
Exception {
+    Field field = HoodieLookupFunction.class.getDeclaredField("nextLoadTime");
+    field.setAccessible(true);
+    return field.getLong(function);
+  }
+
+  private static void setNextLoadTime(HoodieLookupFunction function, long 
nextLoadTime) throws Exception {
+    Field field = HoodieLookupFunction.class.getDeclaredField("nextLoadTime");
+    field.setAccessible(true);
+    field.setLong(function, nextLoadTime);
+  }
+
+  private static class CountingLookupTableReader extends 
HoodieLookupTableReader {
+    private final List<RowData> rows;
+    private int openCount;
+    private int nextIndex;
+
+    private CountingLookupTableReader(List<RowData> rows, Configuration conf) {
+      super(() -> null, conf);
+      this.rows = rows;
+    }
+
+    @Override
+    public void open() {
+      openCount++;
+      nextIndex = 0;
+    }
+
+    @Override
+    public RowData read(RowData reuse) {
+      if (nextIndex >= rows.size()) {
+        return null;
+      }
+      return rows.get(nextIndex++);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // no-op
+    }
+  }
+}

Reply via email to