This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit a533dc57427bbefa9ea02ff2f0a2ad3635dd9f73
Author: WenjunMin <[email protected]>
AuthorDate: Tue Jan 7 14:51:03 2025 +0800

    [flink] Fix the refresh executor not work after reopen (#4851)
---
 .../paimon/flink/lookup/FullCacheLookupTable.java  | 26 +++++++++-------
 .../flink/lookup/NoPrimaryKeyLookupTable.java      |  2 +-
 .../paimon/flink/lookup/PrimaryKeyLookupTable.java |  2 +-
 .../flink/lookup/SecondaryIndexLookupTable.java    |  2 +-
 .../paimon/flink/lookup/LookupTableTest.java       | 35 ++++++++++++++++++++++
 5 files changed, 54 insertions(+), 13 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index de69c67a4c..4154b6742c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -79,7 +80,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
     protected final int appendUdsFieldNumber;
 
     protected RocksDBStateFactory stateFactory;
-    @Nullable private final ExecutorService refreshExecutor;
+    @Nullable private ExecutorService refreshExecutor;
     private final AtomicReference<Exception> cachedException;
     private final int maxPendingSnapshotCount;
     private final FileStoreTable table;
@@ -127,14 +128,6 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
         Options options = Options.fromMap(context.table.options());
         this.projectedType = projectedType;
         this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC);
-        this.refreshExecutor =
-                this.refreshAsync
-                        ? Executors.newSingleThreadExecutor(
-                                new ExecutorThreadFactory(
-                                        String.format(
-                                                "%s-lookup-refresh",
-                                                
Thread.currentThread().getName())))
-                        : null;
         this.cachedException = new AtomicReference<>();
         this.maxPendingSnapshotCount = 
options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT);
     }
@@ -149,12 +142,20 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
         this.cacheRowFilter = filter;
     }
 
-    protected void openStateFactory() throws Exception {
+    protected void init() throws Exception {
         this.stateFactory =
                 new RocksDBStateFactory(
                         context.tempPath.toString(),
                         context.table.coreOptions().toConfiguration(),
                         null);
+        this.refreshExecutor =
+                this.refreshAsync
+                        ? Executors.newSingleThreadExecutor(
+                                new ExecutorThreadFactory(
+                                        String.format(
+                                                "%s-lookup-refresh",
+                                                
Thread.currentThread().getName())))
+                        : null;
     }
 
     protected void bootstrap() throws Exception {
@@ -322,6 +323,11 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
         }
     }
 
+    @VisibleForTesting
+    public Future<?> getRefreshFuture() {
+        return refreshFuture;
+    }
+
     /** Bulk loader for the table. */
     public interface TableBulkLoader {
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index 84587083bf..63af4f3506 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -52,7 +52,7 @@ public class NoPrimaryKeyLookupTable extends 
FullCacheLookupTable {
 
     @Override
     public void open() throws Exception {
-        openStateFactory();
+        init();
         this.state =
                 stateFactory.listState(
                         "join-key-index",
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index c06120d61d..2a3099e9a6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -69,7 +69,7 @@ public class PrimaryKeyLookupTable extends 
FullCacheLookupTable {
 
     @Override
     public void open() throws Exception {
-        openStateFactory();
+        init();
         createTableState();
         bootstrap();
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index 5ebace6cd5..11c9cba24b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -46,7 +46,7 @@ public class SecondaryIndexLookupTable extends 
PrimaryKeyLookupTable {
 
     @Override
     public void open() throws Exception {
-        openStateFactory();
+        init();
         createTableState();
         this.indexState =
                 stateFactory.setState(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 46c61a15bd..88b9471330 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -600,6 +600,41 @@ public class LookupTableTest extends TableTestBase {
         assertThat(res).isEmpty();
     }
 
+    @Test
+    public void testRefreshExecutorRebuildAfterReopen() throws Exception {
+        Options options = new Options();
+        options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, true);
+        FileStoreTable storeTable = createTable(singletonList("f0"), options);
+        writeWithBucketAssigner(
+                storeTable, row -> 0, GenericRow.of(1, 11, 111), 
GenericRow.of(2, 22, 222));
+
+        FullCacheLookupTable.Context context =
+                new FullCacheLookupTable.Context(
+                        storeTable,
+                        new int[] {0, 1, 2},
+                        null,
+                        null,
+                        tempDir.toFile(),
+                        singletonList("f0"),
+                        null);
+        table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
+        assertThat(table).isInstanceOf(PrimaryKeyLookupTable.class);
+        table.open();
+        // reopen
+        table.close();
+        table.open();
+        List<InternalRow> res = table.get(GenericRow.of(1));
+        assertThat(res).hasSize(1);
+        assertRow(res.get(0), 1, 11, 111);
+        writeWithBucketAssigner(storeTable, row -> 0, GenericRow.of(1, 22, 
222));
+        table.refresh();
+        assertThat(table.getRefreshFuture()).isNotNull();
+        table.getRefreshFuture().get();
+        res = table.get(GenericRow.of(1));
+        assertThat(res).hasSize(1);
+        assertRow(res.get(0), 1, 22, 222);
+    }
+
     @Test
     public void testNoPkTableWithCacheRowFilter() throws Exception {
         FileStoreTable storeTable = createTable(emptyList(), new Options());

Reply via email to