This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new a07e563fc [common] Fix thread-safety problem of PrimaryKeyLookuper 
and PrefixKeyLookuper (#1915)
a07e563fc is described below

commit a07e563fcf9e2cb77ad4e310fa52442784d331f4
Author: Yang Wang <[email protected]>
AuthorDate: Thu Nov 27 15:54:24 2025 +0800

    [common] Fix thread-safety problem of PrimaryKeyLookuper and 
PrefixKeyLookuper (#1915)
---
 .../fluss/client/lookup/AbstractLookupQuery.java   |  10 +
 .../apache/fluss/client/lookup/LookupClient.java   |   3 +-
 .../apache/fluss/client/lookup/LookupSender.java   | 113 ++++--
 .../org/apache/fluss/client/lookup/Lookuper.java   |   9 +-
 .../fluss/client/lookup/PrefixKeyLookuper.java     |   2 +
 .../fluss/client/lookup/PrimaryKeyLookuper.java    |   2 +
 .../fluss/client/lookup/LookupSenderTest.java      | 452 +++++++++++++++++++--
 .../client/metadata/TestingMetadataUpdater.java    |  60 ++-
 .../org/apache/fluss/config/ConfigOptions.java     |   8 +
 .../fluss/flink/catalog/FlinkTableFactory.java     |   8 +-
 .../fluss/flink/source/FlinkTableSource.java       |   7 -
 .../source/lookup/FlinkAsyncLookupFunction.java    |  82 ++--
 .../flink/source/lookup/FlinkLookupFunction.java   |  52 +--
 .../apache/fluss/flink/utils/PushdownUtils.java    |   2 -
 .../source/lookup/FlinkLookupFunctionTest.java     |   3 -
 website/docs/engine-flink/options.md               |  13 +-
 16 files changed, 650 insertions(+), 176 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
index 1737a7696..efaf0100a 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
@@ -28,10 +28,12 @@ public abstract class AbstractLookupQuery<T> {
 
     private final TableBucket tableBucket;
     private final byte[] key;
+    private int retries;
 
     public AbstractLookupQuery(TableBucket tableBucket, byte[] key) {
         this.tableBucket = tableBucket;
         this.key = key;
+        this.retries = 0;
     }
 
     public byte[] key() {
@@ -42,6 +44,14 @@ public abstract class AbstractLookupQuery<T> {
         return tableBucket;
     }
 
+    public int retries() {
+        return retries;
+    }
+
+    public void incrementRetries() {
+        retries++;
+    }
+
     public abstract LookupType lookupType();
 
     public abstract CompletableFuture<T> future();
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
index 3c201541a..a974ba972 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
@@ -67,7 +67,8 @@ public class LookupClient {
                 new LookupSender(
                         metadataUpdater,
                         lookupQueue,
-                        
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE));
+                        
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE),
+                        conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES));
         lookupSenderThreadPool.submit(lookupSender);
     }
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
index 0f249213a..df2db466d 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
@@ -24,6 +24,7 @@ import org.apache.fluss.exception.ApiException;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.InvalidMetadataException;
 import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.exception.RetriableException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePartition;
@@ -74,10 +75,17 @@ class LookupSender implements Runnable {
 
     private final Semaphore maxInFlightReuqestsSemaphore;
 
-    LookupSender(MetadataUpdater metadataUpdater, LookupQueue lookupQueue, int 
maxFlightRequests) {
+    private final int maxRetries;
+
+    LookupSender(
+            MetadataUpdater metadataUpdater,
+            LookupQueue lookupQueue,
+            int maxFlightRequests,
+            int maxRetries) {
         this.metadataUpdater = metadataUpdater;
         this.lookupQueue = lookupQueue;
         this.maxInFlightReuqestsSemaphore = new Semaphore(maxFlightRequests);
+        this.maxRetries = maxRetries;
         this.running = true;
     }
 
@@ -307,10 +315,8 @@ class LookupSender implements Runnable {
                             pbLookupRespForBucket.getBucketId());
             LookupBatch lookupBatch = lookupsByBucket.get(tableBucket);
             if (pbLookupRespForBucket.hasErrorCode()) {
-                // TODO for re-triable error, we should retry here instead of 
throwing exception.
                 ApiError error = 
ApiError.fromErrorMessage(pbLookupRespForBucket);
-                handleLookupExceptionForBucket(tableBucket, destination, 
error, "lookup");
-                lookupBatch.completeExceptionally(error.exception());
+                handleLookupError(tableBucket, destination, error, 
lookupBatch.lookups(), "lookup");
             } else {
                 List<byte[]> byteValues =
                         pbLookupRespForBucket.getValuesList().stream()
@@ -345,10 +351,13 @@ class LookupSender implements Runnable {
 
             PrefixLookupBatch prefixLookupBatch = 
prefixLookupsByBucket.get(tableBucket);
             if (pbRespForBucket.hasErrorCode()) {
-                // TODO for re-triable error, we should retry here instead of 
throwing exception.
                 ApiError error = ApiError.fromErrorMessage(pbRespForBucket);
-                handleLookupExceptionForBucket(tableBucket, destination, 
error, "prefixLookup");
-                prefixLookupBatch.completeExceptionally(error.exception());
+                handleLookupError(
+                        tableBucket,
+                        destination,
+                        error,
+                        prefixLookupBatch.lookups(),
+                        "prefix lookup");
             } else {
                 List<List<byte[]>> result = new 
ArrayList<>(pbRespForBucket.getValueListsCount());
                 for (int i = 0; i < pbRespForBucket.getValueListsCount(); i++) 
{
@@ -368,58 +377,106 @@ class LookupSender implements Runnable {
             Throwable t, int destination, Map<TableBucket, LookupBatch> 
lookupsByBucket) {
         ApiError error = ApiError.fromThrowable(t);
         for (LookupBatch lookupBatch : lookupsByBucket.values()) {
-            // TODO for re-triable error, we should retry here instead of 
throwing exception.
-            handleLookupExceptionForBucket(lookupBatch.tableBucket(), 
destination, error, "lookup");
-            lookupBatch.completeExceptionally(error.exception());
+            handleLookupError(
+                    lookupBatch.tableBucket(), destination, error, 
lookupBatch.lookups(), "lookup");
         }
     }
 
     private void handlePrefixLookupException(
             Throwable t, int destination, Map<TableBucket, PrefixLookupBatch> 
lookupsByBucket) {
         ApiError error = ApiError.fromThrowable(t);
-        // TODO If error, we need to retry send the request instead of throw 
exception.
         for (PrefixLookupBatch lookupBatch : lookupsByBucket.values()) {
-            handleLookupExceptionForBucket(
-                    lookupBatch.tableBucket(), destination, error, 
"prefixLookup");
-            lookupBatch.completeExceptionally(error.exception());
+            handleLookupError(
+                    lookupBatch.tableBucket(),
+                    destination,
+                    error,
+                    lookupBatch.lookups(),
+                    "prefix lookup");
         }
     }
 
-    void forceClose() {
-        forceClose = true;
-        initiateClose();
+    private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
+        lookup.incrementRetries();
+        lookupQueue.appendLookup(lookup);
     }
 
-    void initiateClose() {
-        // Ensure accumulator is closed first to guarantee that no more 
appends are accepted after
-        // breaking from the sender loop. Otherwise, we may miss some 
callbacks when shutting down.
-        lookupQueue.close();
-        running = false;
+    private boolean canRetry(AbstractLookupQuery<?> lookup, Exception 
exception) {
+        return lookup.retries() < maxRetries
+                && !lookup.future().isDone()
+                && exception instanceof RetriableException;
     }
 
-    private void handleLookupExceptionForBucket(
-            TableBucket tb, int destination, ApiError error, String 
lookupType) {
+    /**
+     * Handle lookup error with retry logic. For each lookup in the list, 
check if it can be
+     * retried. If yes, re-enqueue it; otherwise, complete it exceptionally.
+     *
+     * @param tableBucket the table bucket
+     * @param error the error from server response
+     * @param lookups the list of lookups to handle
+     * @param lookupType the type of lookup ("lookup" for regular lookup, 
"prefix lookup" for prefix lookup)
+     */
+    private void handleLookupError(
+            TableBucket tableBucket,
+            int destination,
+            ApiError error,
+            List<? extends AbstractLookupQuery<?>> lookups,
+            String lookupType) {
         ApiException exception = error.error().exception();
         LOG.error(
-                "Failed to {} from node {} for bucket {}", lookupType, 
destination, tb, exception);
+                "Failed to {} from node {} for bucket {}",
+                lookupType,
+                destination,
+                tableBucket,
+                exception);
         if (exception instanceof InvalidMetadataException) {
             LOG.warn(
                     "Invalid metadata error in {} request. Going to request 
metadata update.",
                     lookupType,
                     exception);
-            long tableId = tb.getTableId();
+            long tableId = tableBucket.getTableId();
             TableOrPartitions tableOrPartitions;
-            if (tb.getPartitionId() == null) {
+            if (tableBucket.getPartitionId() == null) {
                 tableOrPartitions = new 
TableOrPartitions(Collections.singleton(tableId), null);
             } else {
                 tableOrPartitions =
                         new TableOrPartitions(
                                 null,
                                 Collections.singleton(
-                                        new TablePartition(tableId, 
tb.getPartitionId())));
+                                        new TablePartition(tableId, 
tableBucket.getPartitionId())));
             }
             invalidTableOrPartitions(tableOrPartitions);
         }
+
+        for (AbstractLookupQuery<?> lookup : lookups) {
+            if (canRetry(lookup, error.exception())) {
+                LOG.warn(
+                        "Get error {} response on table bucket {}, retrying 
({} attempts left). Error: {}",
+                        lookupType,
+                        tableBucket,
+                        maxRetries - lookup.retries(),
+                        error.formatErrMsg());
+                reEnqueueLookup(lookup);
+            } else {
+                LOG.warn(
+                        "Get error {} response on table bucket {}, fail. 
Error: {}",
+                        lookupType,
+                        tableBucket,
+                        error.formatErrMsg());
+                lookup.future().completeExceptionally(error.exception());
+            }
+        }
+    }
+
+    void forceClose() {
+        forceClose = true;
+        initiateClose();
+    }
+
+    void initiateClose() {
+        // Ensure accumulator is closed first to guarantee that no more 
appends are accepted after
+        // breaking from the sender loop. Otherwise, we may miss some 
callbacks when shutting down.
+        lookupQueue.close();
+        running = false;
     }
 
     /** A helper class to hold table ids or table partitions. */
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
index ccb632318..37157d9b1 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
@@ -20,14 +20,21 @@ package org.apache.fluss.client.lookup;
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.row.InternalRow;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
- * The lookup-er is used to lookup row of a primary key table by primary key 
or prefix key.
+ * The lookuper is used to look up a row of a primary key table by primary 
key or prefix key. The
+ * lookuper can retry transient errors during lookup operations, as 
configured by
+ * configured by {@link 
org.apache.fluss.config.ConfigOptions#CLIENT_LOOKUP_MAX_RETRIES}.
+ *
+ * <p>Note: Lookuper instances are not thread-safe.
  *
  * @since 0.6
  */
 @PublicEvolving
+@NotThreadSafe
 public interface Lookuper {
 
     /**
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
index 258d9b1c2..61645374f 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
@@ -32,6 +32,7 @@ import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -46,6 +47,7 @@ import static 
org.apache.fluss.client.utils.ClientUtils.getPartitionId;
  * An implementation of {@link Lookuper} that lookups by prefix key. A prefix 
key is a prefix subset
  * of the primary key.
  */
+@NotThreadSafe
 class PrefixKeyLookuper implements Lookuper {
 
     private final TableInfo tableInfo;
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
index 2a6233e46..2945f1ab4 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
@@ -32,6 +32,7 @@ import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
 
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
@@ -40,6 +41,7 @@ import static 
org.apache.fluss.client.utils.ClientUtils.getPartitionId;
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 
 /** An implementation of {@link Lookuper} that lookups by primary key. */
+@NotThreadSafe
 class PrimaryKeyLookuper implements Lookuper {
 
     private final TableInfo tableInfo;
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
index 336f588ea..bba239d1e 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
@@ -21,20 +21,41 @@ import 
org.apache.fluss.client.metadata.TestingMetadataUpdater;
 import org.apache.fluss.cluster.BucketLocation;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.NotLeaderOrFollowerException;
+import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.exception.TimeoutException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.rpc.messages.LookupRequest;
+import org.apache.fluss.rpc.messages.LookupResponse;
+import org.apache.fluss.rpc.messages.PbLookupRespForBucket;
+import org.apache.fluss.rpc.messages.PbPrefixLookupRespForBucket;
+import org.apache.fluss.rpc.messages.PrefixLookupRequest;
+import org.apache.fluss.rpc.messages.PrefixLookupResponse;
+import org.apache.fluss.rpc.protocol.ApiError;
+import org.apache.fluss.server.tablet.TestTabletServerGateway;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
+import java.time.Duration;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -46,17 +67,51 @@ public class LookupSenderTest {
     private TestingMetadataUpdater metadataUpdater;
     private LookupSender lookupSender;
 
+    private static final int MAX_RETRIES = 3;
+    private static final int MAX_INFLIGHT_REQUESTS = 10;
+    private static final TableBucket TABLE_BUCKET = new 
TableBucket(DATA1_TABLE_ID_PK, 0);
+
+    private LookupQueue lookupQueue;
+    private Thread senderThread;
+    private ConfigurableTestTabletServerGateway gateway;
+
     @BeforeEach
-    public void setup() {
-        metadataUpdater = initializeMetadataUpdater();
+    void setup() {
+        // create a configurable gateway for testing
+        gateway = new ConfigurableTestTabletServerGateway();
+
+        // build metadata updater with custom gateway using builder pattern
+        Map<TablePath, TableInfo> tableInfos = new HashMap<>();
+        tableInfos.put(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK);
+        metadataUpdater =
+                TestingMetadataUpdater.builder(tableInfos)
+                        .withTabletServerGateway(1, gateway)
+                        .build();
+
         Configuration conf = new Configuration();
         conf.set(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE, 5);
         conf.set(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE, 10);
-        lookupSender = new LookupSender(metadataUpdater, new 
LookupQueue(conf), 5);
+        lookupQueue = new LookupQueue(conf);
+
+        lookupSender =
+                new LookupSender(metadataUpdater, lookupQueue, 
MAX_INFLIGHT_REQUESTS, MAX_RETRIES);
+
+        senderThread = new Thread(lookupSender);
+        senderThread.start();
+    }
+
+    @AfterEach
+    void teardown() throws InterruptedException {
+        if (lookupSender != null) {
+            lookupSender.forceClose();
+        }
+        if (senderThread != null) {
+            senderThread.join(5000);
+        }
     }
 
     @Test
-    void testSendLookupRequestWithNotLeaderOrFollowerException() {
+    void testSendLookupRequestWithNotLeaderOrFollowerException() throws 
Exception {
         assertThat(metadataUpdater.getBucketLocation(tb1))
                 .hasValue(
                         new BucketLocation(
@@ -65,26 +120,36 @@ public class LookupSenderTest {
                                 1,
                                 new int[] {1, 2, 3}));
 
-        // send LookupRequest to serverId 1, which will respond with 
NotLeaderOrFollowerException
-        // as responseLogicId=1 do.
-        metadataUpdater.setResponseLogicId(1, 1);
+        // Configure gateway to always return NotLeaderOrFollowerException for 
all attempts
+        // (including retries)
+        gateway.setLookupHandler(
+                request ->
+                        createFailedResponse(
+                                request,
+                                new NotLeaderOrFollowerException(
+                                        "mock not leader or follower 
exception.")));
+
+        // send LookupRequest through the queue so that retry mechanism can 
work
         LookupQuery lookupQuery = new LookupQuery(tb1, new byte[0]);
         CompletableFuture<byte[]> result = lookupQuery.future();
         assertThat(result).isNotDone();
-        lookupSender.sendLookups(1, LookupType.LOOKUP, 
Collections.singletonList(lookupQuery));
+        lookupQueue.appendLookup(lookupQuery);
+
+        // Wait for all retries to complete and verify it eventually fails
+        assertThatThrownBy(() -> result.get(5, TimeUnit.SECONDS))
+                .isInstanceOf(ExecutionException.class)
+                .hasMessageContaining("Leader not found after retry");
+
+        // Verify that retries happened (should be 1, because server meta 
invalidated)
+        assertThat(lookupQuery.retries()).isEqualTo(1);
 
-        assertThat(result.isCompletedExceptionally()).isTrue();
-        assertThatThrownBy(result::get)
-                .rootCause()
-                .isInstanceOf(NotLeaderOrFollowerException.class)
-                .hasMessage("mock not leader or follower exception.");
         // When NotLeaderOrFollowerException is received, the bucketLocation 
will be removed from
         // metadata updater to trigger get the latest bucketLocation in next 
lookup round.
         assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
     }
 
     @Test
-    void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() {
+    void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() throws 
Exception {
         assertThat(metadataUpdater.getBucketLocation(tb1))
                 .hasValue(
                         new BucketLocation(
@@ -93,27 +158,356 @@ public class LookupSenderTest {
                                 1,
                                 new int[] {1, 2, 3}));
 
-        // send PrefixLookupRequest to serverId 1, which will respond with
-        // NotLeaderOrFollowerException as responseLogicId=1 do.
-        metadataUpdater.setResponseLogicId(1, 1);
+        // Configure gateway to always return NotLeaderOrFollowerException for 
all attempts
+        // (including retries)
+        gateway.setPrefixLookupHandler(
+                request ->
+                        createFailedPrefixLookupResponse(
+                                request,
+                                new NotLeaderOrFollowerException(
+                                        "mock not leader or follower 
exception.")));
+
+        // send PrefixLookupRequest through the queue so that retry mechanism 
can work
         PrefixLookupQuery prefixLookupQuery = new PrefixLookupQuery(tb1, new 
byte[0]);
         CompletableFuture<List<byte[]>> future = prefixLookupQuery.future();
         assertThat(future).isNotDone();
-        lookupSender.sendLookups(
-                1, LookupType.PREFIX_LOOKUP, 
Collections.singletonList(prefixLookupQuery));
-
-        assertThat(future.isCompletedExceptionally()).isTrue();
-        assertThatThrownBy(future::get)
-                .rootCause()
-                .isInstanceOf(NotLeaderOrFollowerException.class)
-                .hasMessage("mock not leader or follower exception.");
+        lookupQueue.appendLookup(prefixLookupQuery);
+
+        // Wait for all retries to complete and verify it eventually fails
+        assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
+                .isInstanceOf(ExecutionException.class)
+                .hasMessageContaining("Leader not found after retry");
+
+        // Verify that retries happened (should be 1, because server meta 
invalidated)
+        assertThat(prefixLookupQuery.retries()).isEqualTo(1);
+
         // When NotLeaderOrFollowerException is received, the bucketLocation 
will be removed from
         // metadata updater to trigger get the latest bucketLocation in next 
lookup round.
         assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
     }
 
-    private TestingMetadataUpdater initializeMetadataUpdater() {
-        return new TestingMetadataUpdater(
-                Collections.singletonMap(DATA1_TABLE_PATH_PK, 
DATA1_TABLE_INFO_PK));
+    @Test
+    void testRetriableExceptionTriggersRetry() throws Exception {
+        // setup: fail twice with retriable exception, then succeed
+        AtomicInteger attemptCount = new AtomicInteger(0);
+        gateway.setLookupHandler(
+                request -> {
+                    int attempt = attemptCount.incrementAndGet();
+                    if (attempt <= 2) {
+                        // first two attempts fail with retriable exception
+                        return createFailedResponse(
+                                request, new TimeoutException("simulated 
timeout"));
+                    } else {
+                        // third attempt succeeds
+                        return createSuccessResponse(request, 
"value".getBytes());
+                    }
+                });
+
+        // execute: submit lookup
+        byte[] key = "key".getBytes();
+        LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+        lookupQueue.appendLookup(query);
+
+        // verify: eventually succeeds after retries
+        byte[] result = query.future().get(5, TimeUnit.SECONDS);
+        assertThat(result).isEqualTo("value".getBytes());
+        assertThat(attemptCount.get()).isEqualTo(3);
+        assertThat(query.retries()).isEqualTo(2); // retried 2 times
+    }
+
+    @Test
+    void testNonRetriableExceptionDoesNotRetry() throws Exception {
+        // setup: fail with non-retriable exception
+        gateway.setLookupHandler(
+                request ->
+                        createFailedResponse(
+                                request, new TableNotExistException("table not 
found")));
+
+        // execute: submit lookup
+        byte[] key = "key".getBytes();
+        LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+        lookupQueue.appendLookup(query);
+
+        // verify: fails immediately without retry
+        assertThatThrownBy(() -> query.future().get(5, TimeUnit.SECONDS))
+                .isInstanceOf(ExecutionException.class)
+                .hasRootCauseInstanceOf(TableNotExistException.class);
+        assertThat(query.retries()).isEqualTo(0); // no retries
+    }
+
+    @Test
+    void testMaxRetriesEnforced() throws Exception {
+        // setup: always fail with retriable exception
+        AtomicInteger attemptCount = new AtomicInteger(0);
+        gateway.setLookupHandler(
+                request -> {
+                    attemptCount.incrementAndGet();
+                    return createFailedResponse(request, new 
TimeoutException("timeout"));
+                });
+
+        // execute: submit lookup
+        byte[] key = "key".getBytes();
+        LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+        lookupQueue.appendLookup(query);
+
+        // verify: eventually fails after max retries
+        assertThatThrownBy(() -> query.future().get(5, TimeUnit.SECONDS))
+                .isInstanceOf(ExecutionException.class)
+                .hasRootCauseInstanceOf(TimeoutException.class);
+
+        // should attempt: 1 initial + MAX_RETRIES retries
+        assertThat(attemptCount.get()).isEqualTo(1 + MAX_RETRIES);
+        assertThat(query.retries()).isEqualTo(MAX_RETRIES);
+    }
+
+    @Test
+    void testRetryStopsIfFutureCompleted() throws Exception {
+        // setup: always fail with retriable exception
+        AtomicInteger attemptCount = new AtomicInteger(0);
+        gateway.setLookupHandler(
+                request -> {
+                    int attempt = attemptCount.incrementAndGet();
+                    if (attempt == 1) {
+                        // first attempt fails
+                        return createFailedResponse(request, new 
TimeoutException("timeout"));
+                    } else {
+                        try {
+                            // Avoid attempting again too quickly
+                            Thread.sleep(100);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                        // subsequent attempts should not happen if we 
complete the future
+                        throw new AssertionError(
+                                "Should not retry after future is completed 
externally");
+                    }
+                });
+
+        // execute: submit lookup
+        byte[] key = "key".getBytes();
+        LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+        lookupQueue.appendLookup(query);
+
+        // complete the future externally before retry happens
+        waitUntil(() -> attemptCount.get() >= 1, Duration.ofSeconds(5), "first 
attempt to be made");
+        query.future().complete("external".getBytes());
+
+        // verify: completed externally
+        byte[] result = query.future().get(1, TimeUnit.SECONDS);
+        assertThat(result).isEqualTo("external".getBytes());
+        // retries is less than 3, because we stop the query so it won't send 
again.
+        assertThat(query.retries()).isGreaterThanOrEqualTo(0).isLessThan(3);
+        assertThat(attemptCount.get()).isGreaterThanOrEqualTo(1).isLessThan(4);
+    }
+
+    @Test
+    void testDifferentExceptionTypesHandledCorrectly() throws Exception {
+        // test multiple exception types
+        testException(new TimeoutException("timeout"), true, 3); // retriable, 
should retry
+        testException(new InvalidTableException("invalid"), false, 0); // 
non-retriable, no retry
+        testException(new TableNotExistException("not exist"), false, 0); // 
non-retriable, no retry
+    }
+
+    @Test
+    void testPrefixLookupRetry() throws Exception {
+        // setup: fail twice with retriable exception, then succeed
+        AtomicInteger attemptCount = new AtomicInteger(0);
+        gateway.setPrefixLookupHandler(
+                request -> {
+                    int attempt = attemptCount.incrementAndGet();
+                    if (attempt <= 2) {
+                        // first two attempts fail
+                        return createFailedPrefixLookupResponse(
+                                request, new TimeoutException("timeout"));
+                    } else {
+                        // third attempt succeeds
+                        return createSuccessPrefixLookupResponse(request);
+                    }
+                });
+
+        // execute: submit prefix lookup
+        byte[] prefixKey = "prefix".getBytes();
+        PrefixLookupQuery query = new PrefixLookupQuery(TABLE_BUCKET, 
prefixKey);
+        lookupQueue.appendLookup(query);
+
+        // verify: eventually succeeds after retries
+        query.future().get(5, TimeUnit.SECONDS);
+        assertThat(attemptCount.get()).isEqualTo(3);
+        assertThat(query.retries()).isEqualTo(2);
+    }
+
+    @Test
+    void testMultipleConcurrentLookupsWithRetries() throws Exception {
+        // setup: first attempt fails, second succeeds
+        AtomicInteger attemptCount = new AtomicInteger(0);
+        gateway.setLookupHandler(
+                request -> {
+                    int attempt = attemptCount.incrementAndGet();
+                    if (attempt % 2 == 1) {
+                        // odd attempts fail
+                        return createFailedResponse(request, new 
TimeoutException("timeout"));
+                    } else {
+                        // even attempts succeed
+                        return createSuccessResponse(request, ("value" + 
attempt).getBytes());
+                    }
+                });
+
+        // execute: submit multiple lookups
+        LookupQuery query1 = new LookupQuery(TABLE_BUCKET, "key1".getBytes());
+        LookupQuery query2 = new LookupQuery(TABLE_BUCKET, "key2".getBytes());
+        LookupQuery query3 = new LookupQuery(TABLE_BUCKET, "key3".getBytes());
+
+        lookupQueue.appendLookup(query1);
+        lookupQueue.appendLookup(query2);
+        lookupQueue.appendLookup(query3);
+
+        // verify: all succeed after retries
+        assertThat(query1.future().get(5, TimeUnit.SECONDS)).isNotNull();
+        assertThat(query2.future().get(5, TimeUnit.SECONDS)).isNotNull();
+        assertThat(query3.future().get(5, TimeUnit.SECONDS)).isNotNull();
+        // Note: lookups are batched together, so attemptCount reflects batch 
attempts, not
+        // individual lookups
+        assertThat(attemptCount.get())
+                .isGreaterThanOrEqualTo(2); // at least 1 failure + 1 success 
for the batch
+    }
+
+    // Helper methods
+
+    private void testException(Exception exception, boolean shouldRetry, int 
expectedRetries)
+            throws Exception {
+        // reset gateway
+        AtomicInteger attemptCount = new AtomicInteger(0);
+        gateway.setLookupHandler(
+                request -> {
+                    attemptCount.incrementAndGet();
+                    return createFailedResponse(request, exception);
+                });
+
+        // execute
+        byte[] key = ("key-" + 
exception.getClass().getSimpleName()).getBytes();
+        LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+        lookupQueue.appendLookup(query);
+
+        // verify
+        assertThatThrownBy(() -> query.future().get(5, TimeUnit.SECONDS))
+                .isInstanceOf(ExecutionException.class);
+
+        if (shouldRetry) {
+            assertThat(attemptCount.get()).isEqualTo(1 + MAX_RETRIES);
+            assertThat(query.retries()).isEqualTo(expectedRetries);
+        } else {
+            assertThat(attemptCount.get()).isEqualTo(1); // only initial 
attempt
+            assertThat(query.retries()).isEqualTo(expectedRetries);
+        }
+
+        // wait a bit to ensure no more attempts
+        Thread.sleep(200);
+    }
+
+    private CompletableFuture<LookupResponse> createSuccessResponse(
+            LookupRequest request, byte[] value) {
+        LookupResponse response = new LookupResponse();
+        PbLookupRespForBucket bucketResp = response.addBucketsResp();
+        bucketResp.setBucketId(TABLE_BUCKET.getBucket());
+        if (TABLE_BUCKET.getPartitionId() != null) {
+            bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId());
+        }
+        // Add value for each key in the request
+        int keyCount = request.getBucketsReqAt(0).getKeysCount();
+        for (int i = 0; i < keyCount; i++) {
+            bucketResp.addValue().setValues(value);
+        }
+        return CompletableFuture.completedFuture(response);
+    }
+
+    private CompletableFuture<LookupResponse> createFailedResponse(
+            LookupRequest request, Exception exception) {
+        LookupResponse response = new LookupResponse();
+        PbLookupRespForBucket bucketResp = response.addBucketsResp();
+        bucketResp.setBucketId(TABLE_BUCKET.getBucket());
+        if (TABLE_BUCKET.getPartitionId() != null) {
+            bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId());
+        }
+        ApiError error = ApiError.fromThrowable(exception);
+        bucketResp.setErrorCode(error.error().code());
+        bucketResp.setErrorMessage(error.formatErrMsg());
+        return CompletableFuture.completedFuture(response);
+    }
+
+    private CompletableFuture<PrefixLookupResponse> 
createSuccessPrefixLookupResponse(
+            PrefixLookupRequest request) {
+        PrefixLookupResponse response = new PrefixLookupResponse();
+        // Create response for each prefix key in request
+        PbPrefixLookupRespForBucket bucketResp = response.addBucketsResp();
+        bucketResp.setBucketId(TABLE_BUCKET.getBucket());
+        if (TABLE_BUCKET.getPartitionId() != null) {
+            bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId());
+        }
+        // Add empty value list for each prefix key
+        int keyCount = request.getBucketsReqAt(0).getKeysCount();
+        for (int i = 0; i < keyCount; i++) {
+            bucketResp.addValueList(); // empty list is valid for prefix lookup
+        }
+        return CompletableFuture.completedFuture(response);
+    }
+
+    private CompletableFuture<PrefixLookupResponse> 
createFailedPrefixLookupResponse(
+            PrefixLookupRequest request, Exception exception) {
+        PrefixLookupResponse response = new PrefixLookupResponse();
+        PbPrefixLookupRespForBucket bucketResp = response.addBucketsResp();
+        bucketResp.setBucketId(TABLE_BUCKET.getBucket());
+        if (TABLE_BUCKET.getPartitionId() != null) {
+            bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId());
+        }
+        ApiError error = ApiError.fromThrowable(exception);
+        bucketResp.setErrorCode(error.error().code());
+        bucketResp.setErrorMessage(error.formatErrMsg());
+        return CompletableFuture.completedFuture(response);
+    }
+
+    /**
+     * A configurable {@link TabletServerGateway} for testing that allows 
setting custom handlers
+     * for lookup operations.
+     */
+    private static class ConfigurableTestTabletServerGateway extends 
TestTabletServerGateway {
+
+        private java.util.function.Function<LookupRequest, 
CompletableFuture<LookupResponse>>
+                lookupHandler;
+        private java.util.function.Function<
+                        PrefixLookupRequest, 
CompletableFuture<PrefixLookupResponse>>
+                prefixLookupHandler;
+
+        public ConfigurableTestTabletServerGateway() {
+            super(false);
+        }
+
+        public void setLookupHandler(
+                java.util.function.Function<LookupRequest, 
CompletableFuture<LookupResponse>>
+                        handler) {
+            this.lookupHandler = handler;
+        }
+
+        public void setPrefixLookupHandler(
+                java.util.function.Function<
+                                PrefixLookupRequest, 
CompletableFuture<PrefixLookupResponse>>
+                        handler) {
+            this.prefixLookupHandler = handler;
+        }
+
+        @Override
+        public CompletableFuture<LookupResponse> lookup(LookupRequest request) 
{
+            if (lookupHandler != null) {
+                return lookupHandler.apply(request);
+            }
+            return CompletableFuture.completedFuture(new LookupResponse());
+        }
+
+        @Override
+        public CompletableFuture<PrefixLookupResponse> 
prefixLookup(PrefixLookupRequest request) {
+            if (prefixLookupHandler != null) {
+                return prefixLookupHandler.apply(request);
+            }
+            return CompletableFuture.completedFuture(new 
PrefixLookupResponse());
+        }
     }
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index 38994cbca..6ca025f33 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -55,22 +55,72 @@ public class TestingMetadataUpdater extends MetadataUpdater 
{
     private final Map<Integer, TestTabletServerGateway> tabletServerGatewayMap;
 
     public TestingMetadataUpdater(Map<TablePath, TableInfo> tableInfos) {
-        this(COORDINATOR, Arrays.asList(NODE1, NODE2, NODE3), tableInfos);
+        this(COORDINATOR, Arrays.asList(NODE1, NODE2, NODE3), tableInfos, 
null);
     }
 
     private TestingMetadataUpdater(
             ServerNode coordinatorServer,
             List<ServerNode> tabletServers,
-            Map<TablePath, TableInfo> tableInfos) {
+            Map<TablePath, TableInfo> tableInfos,
+            Map<Integer, TestTabletServerGateway> customGateways) {
         super(
                 RpcClient.create(
                         new Configuration(), 
TestingClientMetricGroup.newInstance(), false),
                 Cluster.empty());
         initializeCluster(coordinatorServer, tabletServers, tableInfos);
         coordinatorGateway = new TestCoordinatorGateway();
-        tabletServerGatewayMap = new HashMap<>();
-        for (ServerNode tabletServer : tabletServers) {
-            tabletServerGatewayMap.put(tabletServer.id(), new 
TestTabletServerGateway(false));
+        if (customGateways != null) {
+            tabletServerGatewayMap = customGateways;
+        } else {
+            tabletServerGatewayMap = new HashMap<>();
+            for (ServerNode tabletServer : tabletServers) {
+                tabletServerGatewayMap.put(tabletServer.id(), new 
TestTabletServerGateway(false));
+            }
+        }
+    }
+
+    /**
+     * Create a builder for constructing TestingMetadataUpdater with custom 
gateways.
+     *
+     * @param tableInfos the table information map
+     * @return a builder instance
+     */
+    public static Builder builder(Map<TablePath, TableInfo> tableInfos) {
+        return new Builder(tableInfos);
+    }
+
+    /** Builder for TestingMetadataUpdater to support custom gateway 
configuration. */
+    public static class Builder {
+        private final Map<TablePath, TableInfo> tableInfos;
+        private final Map<Integer, TestTabletServerGateway> customGateways = 
new HashMap<>();
+
+        private Builder(Map<TablePath, TableInfo> tableInfos) {
+            this.tableInfos = tableInfos;
+        }
+
+        /**
+         * Set a custom gateway for a specific tablet server node.
+         *
+         * @param serverId the server id (1, 2, or 3 for default nodes)
+         * @param gateway the custom gateway
+         * @return this builder
+         */
+        public Builder withTabletServerGateway(int serverId, 
TestTabletServerGateway gateway) {
+            customGateways.put(serverId, gateway);
+            return this;
+        }
+
+        /**
+         * Build the TestingMetadataUpdater instance.
+         *
+         * @return the configured TestingMetadataUpdater
+         */
+        public TestingMetadataUpdater build() {
+            return new TestingMetadataUpdater(
+                    COORDINATOR,
+                    Arrays.asList(NODE1, NODE2, NODE3),
+                    tableInfos,
+                    customGateways.isEmpty() ? null : customGateways);
         }
     }
 
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java 
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 6b045bb38..875e875d5 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1112,6 +1112,14 @@ public class ConfigOptions {
                             "The maximum time to wait for the lookup batch to 
full, if this timeout is reached, "
                                     + "the lookup batch will be closed to 
send.");
 
+    public static final ConfigOption<Integer> CLIENT_LOOKUP_MAX_RETRIES =
+            key("client.lookup.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "Setting a value greater than zero will cause the 
client to resend any lookup request "
+                                    + "that fails with a potentially transient 
error.");
+
     public static final ConfigOption<Integer> 
CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM =
             key("client.scanner.remote-log.prefetch-num")
                     .intType()
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 92579e3ac..d3ddf29ac 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -148,7 +148,6 @@ public class FlinkTableFactory implements 
DynamicTableSourceFactory, DynamicTabl
                 partitionKeyIndexes,
                 isStreamingMode,
                 startupOptions,
-                tableOptions.get(LookupOptions.MAX_RETRIES),
                 tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC),
                 cache,
                 partitionDiscoveryIntervalMs,
@@ -244,6 +243,13 @@ public class FlinkTableFactory implements 
DynamicTableSourceFactory, DynamicTabl
                     }
                 });
 
+        // map flink lookup.max-retries to client.lookup.max-retries
+        if (tableOptions.containsKey(LookupOptions.MAX_RETRIES.key())) {
+            flussConfig.setString(
+                    ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES.key(),
+                    tableOptions.get(LookupOptions.MAX_RETRIES.key()));
+        }
+
         // pass flink io tmp dir to fluss client.
         flussConfig.setString(
                 ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR,
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index e7c7357e3..ef4b63812 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -127,7 +127,6 @@ public class FlinkTableSource
     private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
 
     // options for lookup source
-    private final int lookupMaxRetryTimes;
     private final boolean lookupAsync;
     @Nullable private final LookupCache cache;
 
@@ -166,7 +165,6 @@ public class FlinkTableSource
             int[] partitionKeyIndexes,
             boolean streaming,
             FlinkConnectorOptionsUtils.StartupOptions startupOptions,
-            int lookupMaxRetryTimes,
             boolean lookupAsync,
             @Nullable LookupCache cache,
             long scanPartitionDiscoveryIntervalMs,
@@ -183,7 +181,6 @@ public class FlinkTableSource
         this.streaming = streaming;
         this.startupOptions = checkNotNull(startupOptions, "startupOptions 
must not be null");
 
-        this.lookupMaxRetryTimes = lookupMaxRetryTimes;
         this.lookupAsync = lookupAsync;
         this.cache = cache;
 
@@ -253,7 +250,6 @@ public class FlinkTableSource
                                 flussConfig,
                                 tableOutputType,
                                 primaryKeyIndexes,
-                                lookupMaxRetryTimes,
                                 projectedFields);
             } else if (limit > 0) {
                 results =
@@ -412,7 +408,6 @@ public class FlinkTableSource
                             flussConfig,
                             tablePath,
                             tableOutputType,
-                            lookupMaxRetryTimes,
                             lookupNormalizer,
                             projectedFields);
             if (cache != null) {
@@ -426,7 +421,6 @@ public class FlinkTableSource
                             flussConfig,
                             tablePath,
                             tableOutputType,
-                            lookupMaxRetryTimes,
                             lookupNormalizer,
                             projectedFields);
             if (cache != null) {
@@ -449,7 +443,6 @@ public class FlinkTableSource
                         partitionKeyIndexes,
                         streaming,
                         startupOptions,
-                        lookupMaxRetryTimes,
                         lookupAsync,
                         cache,
                         scanPartitionDiscoveryIntervalMs,
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
index 716a74199..e1906712a 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
@@ -33,6 +33,7 @@ import 
org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.utils.ExceptionUtils;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.AsyncLookupFunction;
@@ -58,7 +59,6 @@ public class FlinkAsyncLookupFunction extends 
AsyncLookupFunction {
 
     private final Configuration flussConfig;
     private final TablePath tablePath;
-    private final int maxRetryTimes;
     private final RowType flinkRowType;
     private final LookupNormalizer lookupNormalizer;
     @Nullable private final int[] projection;
@@ -73,12 +73,10 @@ public class FlinkAsyncLookupFunction extends 
AsyncLookupFunction {
             Configuration flussConfig,
             TablePath tablePath,
             RowType flinkRowType,
-            int maxRetryTimes,
             LookupNormalizer lookupNormalizer,
             @Nullable int[] projection) {
         this.flussConfig = flussConfig;
         this.tablePath = tablePath;
-        this.maxRetryTimes = maxRetryTimes;
         this.flinkRowType = flinkRowType;
         this.lookupNormalizer = lookupNormalizer;
         this.projection = projection;
@@ -121,69 +119,35 @@ public class FlinkAsyncLookupFunction extends 
AsyncLookupFunction {
         RowData normalizedKeyRow = lookupNormalizer.normalizeLookupKey(keyRow);
         RemainingFilter remainingFilter = 
lookupNormalizer.createRemainingFilter(keyRow);
         InternalRow flussKeyRow = lookupRow.replace(normalizedKeyRow);
-        CompletableFuture<Collection<RowData>> future = new 
CompletableFuture<>();
-        // fetch result
-        fetchResult(future, 0, flussKeyRow, remainingFilter);
-        return future;
-    }
 
-    /**
-     * Execute async fetch result .
-     *
-     * @param resultFuture The result or exception is returned.
-     * @param currentRetry Current number of retries.
-     * @param keyRow the key row to get.
-     * @param remainingFilter the nullable remaining filter to filter the 
result.
-     */
-    private void fetchResult(
-            CompletableFuture<Collection<RowData>> resultFuture,
-            int currentRetry,
-            InternalRow keyRow,
-            @Nullable RemainingFilter remainingFilter) {
-        lookuper.lookup(keyRow)
+        // the retry mechanism is now handled by the underlying LookupClient 
layer,
+        // we can't call lookuper.lookup() in whenComplete callback as 
lookuper is not thread-safe.
+        CompletableFuture<Collection<RowData>> future = new 
CompletableFuture<>();
+        lookuper.lookup(flussKeyRow)
                 .whenComplete(
                         (result, throwable) -> {
                             if (throwable != null) {
-                                handleLookupFailed(
-                                        resultFuture,
-                                        throwable,
-                                        currentRetry,
-                                        keyRow,
-                                        remainingFilter);
+                                if (ExceptionUtils.findThrowable(
+                                                throwable, 
TableNotExistException.class)
+                                        .isPresent()) {
+                                    LOG.error("Table '{}' not found ", 
tablePath, throwable);
+                                    future.completeExceptionally(
+                                            new RuntimeException(
+                                                    "Fluss table '" + 
tablePath + "' not found.",
+                                                    throwable));
+                                } else {
+                                    LOG.error("Fluss asyncLookup error", 
throwable);
+                                    future.completeExceptionally(
+                                            new RuntimeException(
+                                                    "Execution of Fluss 
asyncLookup failed: "
+                                                            + 
throwable.getMessage(),
+                                                    throwable));
+                                }
                             } else {
-                                handleLookupSuccess(
-                                        resultFuture, result.getRowList(), 
remainingFilter);
+                                handleLookupSuccess(future, 
result.getRowList(), remainingFilter);
                             }
                         });
-    }
-
-    private void handleLookupFailed(
-            CompletableFuture<Collection<RowData>> resultFuture,
-            Throwable throwable,
-            int currentRetry,
-            InternalRow keyRow,
-            @Nullable RemainingFilter remainingFilter) {
-        if (throwable instanceof TableNotExistException) {
-            LOG.error("Table '{}' not found ", tablePath, throwable);
-            resultFuture.completeExceptionally(
-                    new RuntimeException("Fluss table '" + tablePath + "' not 
found.", throwable));
-        } else {
-            LOG.error("Fluss asyncLookup error, retry times = {}", 
currentRetry, throwable);
-            if (currentRetry >= maxRetryTimes) {
-                String exceptionMsg =
-                        String.format(
-                                "Execution of Fluss asyncLookup failed: %s, 
retry times = %d.",
-                                throwable.getMessage(), currentRetry);
-                resultFuture.completeExceptionally(new 
RuntimeException(exceptionMsg, throwable));
-            } else {
-                try {
-                    Thread.sleep(1000L * currentRetry);
-                } catch (InterruptedException e1) {
-                    resultFuture.completeExceptionally(e1);
-                }
-                fetchResult(resultFuture, currentRetry + 1, keyRow, 
remainingFilter);
-            }
-        }
+        return future;
     }
 
     private void handleLookupSuccess(
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
index 480247f0f..79418f9b2 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
@@ -54,7 +54,6 @@ public class FlinkLookupFunction extends LookupFunction {
 
     private final Configuration flussConfig;
     private final TablePath tablePath;
-    private final int maxRetryTimes;
     private final RowType flinkRowType;
     private final LookupNormalizer lookupNormalizer;
     @Nullable private final int[] projection;
@@ -70,12 +69,10 @@ public class FlinkLookupFunction extends LookupFunction {
             Configuration flussConfig,
             TablePath tablePath,
             RowType flinkRowType,
-            int maxRetryTimes,
             LookupNormalizer lookupNormalizer,
             @Nullable int[] projection) {
         this.flussConfig = flussConfig;
         this.tablePath = tablePath;
-        this.maxRetryTimes = maxRetryTimes;
         this.flinkRowType = flinkRowType;
         this.lookupNormalizer = lookupNormalizer;
         this.projection = projection;
@@ -124,41 +121,28 @@ public class FlinkLookupFunction extends LookupFunction {
                 lookupNormalizer.createRemainingFilter(keyRow);
         // wrap flink row as fluss row to lookup, the flink row has already 
been in expected order.
         InternalRow flussKeyRow = lookupRow.replace(normalizedKeyRow);
-        for (int retry = 0; retry <= maxRetryTimes; retry++) {
-            try {
-                List<InternalRow> lookupRows = 
lookuper.lookup(flussKeyRow).get().getRowList();
-                if (lookupRows.isEmpty()) {
-                    return Collections.emptyList();
-                }
-                List<RowData> projectedRows = new ArrayList<>();
-                for (InternalRow row : lookupRows) {
-                    if (row != null) {
-                        RowData flinkRow =
-                                
flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row));
-                        if (remainingFilter == null || 
remainingFilter.isMatch(flinkRow)) {
-                            projectedRows.add(flinkRow);
-                        }
-                    }
-                }
-                return projectedRows;
-            } catch (Exception e) {
-                LOG.error(String.format("Fluss lookup error, retry times = 
%d", retry), e);
-                if (retry >= maxRetryTimes) {
-                    String exceptionMsg =
-                            String.format(
-                                    "Execution of Fluss lookup failed, retry 
times = %d.", retry);
-                    throw new RuntimeException(exceptionMsg, e);
-                }
 
-                try {
-                    Thread.sleep(1000L * retry);
-                } catch (InterruptedException interruptedException) {
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException(interruptedException);
+        // the retry mechanism will be handled by the underlying LookupClient 
layer
+        try {
+            List<InternalRow> lookupRows = 
lookuper.lookup(flussKeyRow).get().getRowList();
+            if (lookupRows.isEmpty()) {
+                return Collections.emptyList();
+            }
+            List<RowData> projectedRows = new ArrayList<>();
+            for (InternalRow row : lookupRows) {
+                if (row != null) {
+                    RowData flinkRow =
+                            
flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row));
+                    if (remainingFilter == null || 
remainingFilter.isMatch(flinkRow)) {
+                        projectedRows.add(flinkRow);
+                    }
                 }
             }
+            return projectedRows;
+        } catch (Exception e) {
+            LOG.error("Fluss lookup error", e);
+            throw new RuntimeException("Execution of Fluss lookup failed: " + 
e.getMessage(), e);
         }
-        return Collections.emptyList();
     }
 
     private InternalRow maybeProject(InternalRow row) {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
index e32e0752c..1d1f39853 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
@@ -262,7 +262,6 @@ public class PushdownUtils {
             Configuration flussConfig,
             RowType sourceOutputType,
             int[] primaryKeyIndexes,
-            int lookupMaxRetryTimes,
             @Nullable int[] projectedFields) {
         LookupNormalizer lookupNormalizer =
                 createPrimaryKeyLookupNormalizer(primaryKeyIndexes, 
sourceOutputType);
@@ -271,7 +270,6 @@ public class PushdownUtils {
                         flussConfig,
                         tablePath,
                         sourceOutputType,
-                        lookupMaxRetryTimes,
                         lookupNormalizer,
                         projectedFields);
         try {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
index a68350a0c..297c6a16d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
@@ -23,7 +23,6 @@ import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.flink.utils.FlinkTestBase;
 import org.apache.fluss.metadata.TablePath;
 
-import org.apache.flink.table.connector.source.lookup.LookupOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.AsyncLookupFunction;
 import org.apache.flink.table.types.logical.RowType;
@@ -57,7 +56,6 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                         clientConf,
                         tablePath,
                         flinkRowType,
-                        LookupOptions.MAX_RETRIES.defaultValue(),
                         createPrimaryKeyLookupNormalizer(new int[] {0}, 
flinkRowType),
                         null);
 
@@ -95,7 +93,6 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                         clientConf,
                         tablePath,
                         flinkRowType,
-                        LookupOptions.MAX_RETRIES.defaultValue(),
                         createPrimaryKeyLookupNormalizer(new int[] {0}, 
flinkRowType),
                         null);
         asyncLookupFunction.open(null);
diff --git a/website/docs/engine-flink/options.md 
b/website/docs/engine-flink/options.md
index caddc0f58..e5ecc0ff5 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -109,16 +109,17 @@ See more details about [ALTER TABLE ... 
SET](engine-flink/ddl.md#set-properties)
 | Option                                   | Type       | Default | Description                                                                                                                         |
 |------------------------------------------|------------|---------|-------------------------------------------------------------------------------------------------------------------------------------|
 | lookup.async                             | Boolean    | true    | Whether to use asynchronous lookup. Asynchronous lookup has better throughput performance than synchronous lookup.                  |
-| lookup.cache                             | Enum       | NONE    | The caching strategy for this lookup table, including NONE, PARTIAL.                                                                |
-| lookup.max-retries                       | Integer    | 3       | The maximum allowed retries if a lookup operation fails.                                                                            |
-| lookup.partial-cache.expire-after-access | Duration   | (None)  | Duration to expire an entry in the cache after accessing.                                                                           |
-| lookup.partial-cache.expire-after-write  | Duration   | (None)  | Duration to expire an entry in the cache after writing.                                                                             |
-| lookup.partial-cache.cache-missing-key   | Boolean    | true    | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table.                               |
-| lookup.partial-cache.max-rows            | Long       | (None)  | The maximum number of rows to store in the cache.                                                                                   |
+| lookup.cache                             | Enum       | NONE    | The caching strategy for this lookup table, including NONE, PARTIAL.                                                                |
+| lookup.max-retries                       | Integer    | 3       | The maximum allowed retries if a lookup operation fails. Setting this value will override option 'client.lookup.max-retries'.       |
+| lookup.partial-cache.expire-after-access | Duration   | (None)  | Duration to expire an entry in the cache after accessing.                                                                           |
+| lookup.partial-cache.expire-after-write  | Duration   | (None)  | Duration to expire an entry in the cache after writing.                                                                             |
+| lookup.partial-cache.cache-missing-key   | Boolean    | true    | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table.                               |
+| lookup.partial-cache.max-rows            | Long       | (None)  | The maximum number of rows to store in the cache.                                                                                   |
 | client.lookup.queue-size                 | Integer    | 25600   | The maximum number of pending lookup operations.                                                                                    |
 | client.lookup.max-batch-size             | Integer    | 128     | The maximum batch size of merging lookup operations to one lookup request.                                                          |
 | client.lookup.max-inflight-requests      | Integer    | 128     | The maximum number of unacknowledged lookup requests for lookup operations.                                                         |
 | client.lookup.batch-timeout              | Duration   | 100ms   | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send.         |
+| client.lookup.max-retries                | Integer    | 3       | Setting a value greater than zero will cause the client to resend any lookup request that fails with a potentially transient error. |
 
 
 ## Write Options

Reply via email to