This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1af2dd58e8 [flink] Optimize lookup table refresh with full load for snapshot backlog. (#6966)
1af2dd58e8 is described below

commit 1af2dd58e8afcd91e815589e0c690fd295fa068a
Author: zhoulii <[email protected]>
AuthorDate: Mon Jan 12 19:39:59 2026 +0800

    [flink] Optimize lookup table refresh with full load for snapshot backlog. (#6966)
---
 .../generated/flink_connector_configuration.html   |  6 ++
 .../apache/paimon/flink/FlinkConnectorOptions.java |  9 +++
 .../flink/lookup/FileStoreLookupFunction.java      | 44 ++++++++++++-
 .../paimon/flink/lookup/FullCacheLookupTable.java  |  5 ++
 .../apache/paimon/flink/lookup/LookupTable.java    |  2 +
 .../flink/lookup/PrimaryKeyPartialLookupTable.java | 14 ++++
 .../flink/lookup/FileStoreLookupFunctionTest.java  | 76 +++++++++++++++++++---
 7 files changed, 145 insertions(+), 11 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 0549fde466..9f4058ba01 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -92,6 +92,12 @@ under the License.
             <td>Integer</td>
             <td>If the pending snapshot count exceeds the threshold, lookup 
operator will refresh the table in sync.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.refresh.full-load-threshold</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>If the pending snapshot count reaches or exceeds this threshold, lookup table will discard incremental updates and refresh the entire table from the latest snapshot. This can improve performance when there are many snapshots pending. Set to a reasonable value (e.g., 10) to enable this optimization. By default this option is not set and the optimization is disabled.</td>
+        </tr>
         <tr>
             <td><h5>lookup.refresh.time-periods-blacklist</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 700ace3b48..b5d40bf3c8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -292,6 +292,15 @@ public class FlinkConnectorOptions {
                                     + "cache refreshing is forbidden. 
Blacklist format is start1->end1,start2->end2,... , "
                                     + "and the time format is yyyy-MM-dd 
HH:mm. Only used when lookup table is FULL cache mode.");
 
+    public static final ConfigOption<Integer> LOOKUP_REFRESH_FULL_LOAD_THRESHOLD =
+            ConfigOptions.key("lookup.refresh.full-load-threshold")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "If the pending snapshot count reaches or exceeds this threshold, lookup table will discard incremental updates "
+                                    + "and refresh the entire table from the latest snapshot. This can improve performance when there are many snapshots pending. "
+                                    + "Set to a reasonable value (e.g., 10) to enable this optimization. By default this option is not set and the optimization is disabled.");
+
     public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
             ConfigOptions.key("sink.savepoint.auto-tag")
                     .booleanType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 421867c499..5b71664ca6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -68,6 +68,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
 import static 
org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
 import static 
org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS;
@@ -98,6 +99,8 @@ public class FileStoreLookupFunction implements Serializable, 
Closeable {
     private transient Duration refreshInterval;
     // timestamp when refreshing lookup table
     private transient long nextRefreshTime;
+    // Pending-snapshot count at which the lookup table is reloaded instead of refreshed
+    // incrementally; null when lookup.refresh.full-load-threshold is not set (feature disabled).
+    private transient Integer refreshFullThreshold;
 
     protected FunctionContext functionContext;
 
@@ -178,6 +181,7 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
         this.refreshInterval =
                 options.getOptional(LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL)
                         .orElse(options.get(CONTINUOUS_DISCOVERY_INTERVAL));
+        this.refreshFullThreshold = 
options.get(LOOKUP_REFRESH_FULL_LOAD_THRESHOLD);
 
         List<String> fieldNames = table.rowType().getFieldNames();
         int[] projection = 
projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
@@ -335,6 +339,7 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
                         partitionLoader.partitions(), 
partitionLoader.createSpecificPartFilter());
                 lookupTable.close();
                 lookupTable.open();
+                nextRefreshTime = System.currentTimeMillis() + 
refreshInterval.toMillis();
                 // no need to refresh the lookup table because it is reopened
                 return;
             }
@@ -342,11 +347,48 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
 
         // 3. refresh lookup table
         if (shouldRefreshLookupTable()) {
-            lookupTable.refresh();
+            // Check if we should do full load (close and reopen table) 
instead of incremental
+            // refresh
+            boolean doFullLoad = shouldDoFullLoad();
+
+            if (doFullLoad) {
+                LOG.info(
+                        "Doing full load for table {} instead of incremental 
refresh",
+                        table.name());
+                lookupTable.close();
+                lookupTable.open();
+            } else {
+                lookupTable.refresh();
+            }
+
             nextRefreshTime = System.currentTimeMillis() + 
refreshInterval.toMillis();
         }
     }
 
+    /**
+     * Returns whether pending snapshots (latest id - next id + 1) reach the full-load threshold, so
+     * the table should be reloaded instead of refreshed; false if unset or either id is null.
+     */
+    @VisibleForTesting
+    public boolean shouldDoFullLoad() {
+        if (refreshFullThreshold == null) {
+            return false;
+        }
+
+        Long latestSnapshotId = ((FileStoreTable) table).snapshotManager().latestSnapshotId();
+        Long nextSnapshotId = lookupTable.nextSnapshotId();
+        if (latestSnapshotId == null || nextSnapshotId == null) {
+            return false;
+        }
+
+        LOG.debug(
+                "Check if should do full load, latestSnapshotId: {}, nextSnapshotId: {}, refreshFullThreshold: {}",
+                latestSnapshotId,
+                nextSnapshotId,
+                refreshFullThreshold);
+        return latestSnapshotId - nextSnapshotId + 1 >= refreshFullThreshold;
+    }
+
     private boolean shouldRefreshLookupTable() {
         if (nextRefreshTime > System.currentTimeMillis()) {
             return false;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index cda824cc94..81af38ea8f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -151,6 +151,11 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
         this.cacheRowFilter = filter;
     }
 
+    @Override
+    public Long nextSnapshotId() {
+        return reader.nextSnapshotId();
+    }
+
     protected void init() throws Exception {
         this.stateFactory = createStateFactory();
         this.refreshExecutor =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
index f8af411941..3ca792d39d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
@@ -41,4 +41,6 @@ public interface LookupTable extends Closeable {
     void refresh() throws Exception;
 
     void specifyCacheRowFilter(Filter<InternalRow> filter);
+
+    /**
+     * Returns the next snapshot id this table is positioned at; the lookup function compares it
+     * with the latest snapshot id to count pending snapshots. May return {@code null} (unknown).
+     */
+    Long nextSnapshotId();
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index ded0f29da7..03016bbccf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -197,6 +197,11 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
         this.cacheRowFilter = filter;
     }
 
+    @Override
+    public Long nextSnapshotId() {
+        // close() tolerates a null queryExecutor, so report "unknown" (null) rather than throw NPE.
+        return queryExecutor == null ? null : queryExecutor.nextSnapshotId();
+    }
+
     @Override
     public void close() throws IOException {
         if (queryExecutor != null) {
@@ -243,6 +248,10 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
         InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) 
throws IOException;
 
         void refresh();
+
+        // Fallback for executors that do not override this. Long.MAX_VALUE makes the pending count
+        // computed in FileStoreLookupFunction (latest - next + 1) negative, so such executors never
+        // trigger a full load.
+        default Long nextSnapshotId() {
+            return Long.MAX_VALUE;
+        }
     }
 
     static class LocalQueryExecutor implements QueryExecutor {
@@ -334,6 +343,11 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
             numBuckets.put(partition, totalBuckets);
         }
 
+        @Override
+        public Long nextSnapshotId() {
+            return scan.checkpoint();
+        }
+
         @Override
         public void close() throws IOException {
             tableQuery.close();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index 4f7ff334f6..7142e6e573 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -43,6 +43,7 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.TraceableFileIO;
 
+import org.apache.flink.table.data.RowData;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -91,16 +92,19 @@ public class FileStoreLookupFunctionTest {
     }
 
+    /** Shorthand: partitioned table, joinEqualPk/dynamicPartition off, no full-load threshold. */
     private void createLookupFunction(boolean refreshAsync) throws Exception {
-        createLookupFunction(true, false, false, refreshAsync);
+        createLookupFunction(true, false, false, refreshAsync, null);
     }
 
+    /**
+     * Creates the test table via createFileStoreTable with the given flags and opens a lookup
+     * function on it under tempDir. A null fullLoadThreshold leaves
+     * lookup.refresh.full-load-threshold unset.
+     */
     private void createLookupFunction(
             boolean isPartition,
             boolean joinEqualPk,
             boolean dynamicPartition,
-            boolean refreshAsync)
+            boolean refreshAsync,
+            Integer fullLoadThreshold)
             throws Exception {
-        table = createFileStoreTable(isPartition, dynamicPartition, 
refreshAsync);
+        table =
+                createFileStoreTable(
+                        isPartition, dynamicPartition, refreshAsync, 
fullLoadThreshold);
         lookupFunction = createLookupFunction(table, joinEqualPk);
         lookupFunction.open(tempDir.toString());
     }
@@ -116,7 +120,11 @@ public class FileStoreLookupFunctionTest {
     }
 
     private FileStoreTable createFileStoreTable(
-            boolean isPartition, boolean dynamicPartition, boolean 
refreshAsync) throws Exception {
+            boolean isPartition,
+            boolean dynamicPartition,
+            boolean refreshAsync,
+            Integer fullLoadThreshold)
+            throws Exception {
         SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
         Options conf = new Options();
         conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
@@ -128,6 +136,10 @@ public class FileStoreLookupFunctionTest {
             conf.set(FlinkConnectorOptions.SCAN_PARTITIONS, "max_pt()");
         }
 
+        if (fullLoadThreshold != null) {
+            conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD, 
fullLoadThreshold);
+        }
+
         RowType rowType =
                 RowType.of(
                         new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()},
@@ -153,7 +165,7 @@ public class FileStoreLookupFunctionTest {
 
     @Test
     public void testCompatibilityForOldVersion() throws Exception {
-        createLookupFunction(false, true, false, false);
+        createLookupFunction(false, true, false, false, null);
         commit(writeCommit(1));
         PrimaryKeyPartialLookupTable lookupTable =
                 (PrimaryKeyPartialLookupTable) lookupFunction.lookupTable();
@@ -174,7 +186,7 @@ public class FileStoreLookupFunctionTest {
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     public void testDefaultLocalPartial(boolean refreshAsync) throws Exception 
{
-        createLookupFunction(false, true, false, refreshAsync);
+        createLookupFunction(false, true, false, refreshAsync, null);
         
assertThat(lookupFunction.lookupTable()).isInstanceOf(PrimaryKeyPartialLookupTable.class);
         QueryExecutor queryExecutor =
                 ((PrimaryKeyPartialLookupTable) 
lookupFunction.lookupTable()).queryExecutor();
@@ -184,7 +196,7 @@ public class FileStoreLookupFunctionTest {
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     public void testDefaultRemotePartial(boolean refreshAsync) throws 
Exception {
-        createLookupFunction(false, true, false, refreshAsync);
+        createLookupFunction(false, true, false, refreshAsync, null);
         ServiceManager serviceManager = new ServiceManager(fileIO, tablePath);
         serviceManager.resetService(
                 PRIMARY_KEY_LOOKUP, new InetSocketAddress[] {new 
InetSocketAddress(1)});
@@ -232,7 +244,7 @@ public class FileStoreLookupFunctionTest {
 
     @Test
     public void testLookupDynamicPartition() throws Exception {
-        createLookupFunction(true, false, true, false);
+        createLookupFunction(true, false, true, false, null);
         commit(writeCommit(1));
         lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
         assertThat(
@@ -252,7 +264,7 @@ public class FileStoreLookupFunctionTest {
 
     @Test
     public void testParseWrongTimePeriodsBlacklist() throws Exception {
-        FileStoreTable table = createFileStoreTable(false, false, false);
+        FileStoreTable table = createFileStoreTable(false, false, false, null);
 
         FileStoreTable table1 =
                 table.copy(
@@ -299,7 +311,7 @@ public class FileStoreLookupFunctionTest {
         String right = end.atZone(ZoneId.systemDefault()).format(formatter);
 
         FileStoreTable table =
-                createFileStoreTable(false, false, false)
+                createFileStoreTable(false, false, false, null)
                         .copy(
                                 Collections.singletonMap(
                                         
LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
@@ -312,6 +324,50 @@ public class FileStoreLookupFunctionTest {
         
assertThat(lookupFunction.nextBlacklistCheckTime()).isEqualTo(end.toEpochMilli()
 + 1);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testLookupTableWithFullLoad(boolean joinEqualPk) throws Exception {
+        // Full load is triggered once at least 3 snapshots are pending.
+        createLookupFunction(false, joinEqualPk, false, false, 3);
+
+        if (joinEqualPk) {
+            assertThat(lookupFunction.lookupTable())
+                    .isInstanceOf(PrimaryKeyPartialLookupTable.class);
+        } else {
+            assertThat(lookupFunction.lookupTable()).isInstanceOf(FullCacheLookupTable.class);
+        }
+
+        // try-with-resources so the writer is released even if an assertion below fails.
+        try (StreamTableWrite writer = table.newStreamWriteBuilder().newWrite()) {
+            writer.write(GenericRow.of(1, 1, 1L));
+            commit(writer.prepareCommit(true, 1));
+            assertSingleLookupResult(GenericRow.of(1, 1, 1L));
+
+            // Commit 4 more snapshots (2..5) so the pending snapshot count reaches the threshold.
+            for (int i = 2; i < 6; i++) {
+                writer.write(GenericRow.of(i, i, (long) i));
+                commit(writer.prepareCommit(true, i));
+            }
+        }
+
+        // Let the refresh interval elapse; otherwise the time check skips the refresh.
+        Thread.sleep(2000);
+
+        assertThat(lookupFunction.shouldDoFullLoad()).isTrue();
+        lookupFunction.tryRefresh();
+        assertSingleLookupResult(GenericRow.of(5, 5, 5L));
+    }
+
+    /** Looks up {@code key} and asserts a single row whose first two fields match the key. */
+    private void assertSingleLookupResult(GenericRow key) throws Exception {
+        List<RowData> result = new ArrayList<>(lookupFunction.lookup(new FlinkRowData(key)));
+        assertThat(result).hasSize(1);
+        RowData row = result.get(0);
+        assertThat(row.getInt(0)).isEqualTo(key.getInt(0));
+        assertThat(row.getInt(1)).isEqualTo(key.getInt(1));
+    }
+
     private void commit(List<CommitMessage> messages) throws Exception {
         TableCommitImpl commit = table.newCommit(commitUser);
         commit.commit(messages);

Reply via email to