This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 863516920 [client] Lookup sender need retry to send when leader not found in metadata cache (#2052)
863516920 is described below

commit 8635169202821e5cb999040f37178596d73943e5
Author: yunhong <[email protected]>
AuthorDate: Mon Dec 1 16:09:29 2025 +0800

    [client] Lookup sender need retry to send when leader not found in metadata cache (#2052)
---
 .../apache/fluss/client/lookup/LookupSender.java   | 62 +++++++++++++++-------
 .../fluss/client/lookup/LookupSenderTest.java      | 16 +++---
 2 files changed, 51 insertions(+), 27 deletions(-)

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
index df2db466d..a098b8532 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
@@ -126,13 +126,22 @@ class LookupSender implements Runnable {
         sendLookups(lookups);
     }
 
-    private void sendLookups(List<AbstractLookupQuery<?>> lookups) {
+    private void sendLookups(List<AbstractLookupQuery<?>> lookups) throws 
Exception {
         if (lookups.isEmpty()) {
             return;
         }
         // group by <leader, lookup type> to lookup batches
         Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> 
lookupBatches =
                 groupByLeaderAndType(lookups);
+
+        // if no lookup batches, sleep a bit to avoid busy loop. This case 
will happen when there is
+        // no leader for all the lookup request in queue.
+        if (lookupBatches.isEmpty() && !lookupQueue.hasUnDrained()) {
+            // TODO: may use wait/notify mechanism to avoid active sleep, and 
use a dynamic sleep
+            // time based on the request waited time.
+            Thread.sleep(100);
+        }
+
         // now, send the batches
         lookupBatches.forEach(
                 (destAndType, batch) -> sendLookups(destAndType.f0, 
destAndType.f1, batch));
@@ -148,11 +157,10 @@ class LookupSender implements Runnable {
             // lookup the leader node
             TableBucket tb = lookup.tableBucket();
             try {
-                // TODO this can be a re-triable operation. We should retry 
here instead of
-                // throwing exception.
                 leader = metadataUpdater.leaderFor(tb);
             } catch (Exception e) {
-                lookup.future().completeExceptionally(e);
+                // if leader is not found, re-enqueue the lookup to send again.
+                reEnqueueLookup(lookup);
                 continue;
             }
             lookupBatchesByLeader
@@ -165,24 +173,16 @@ class LookupSender implements Runnable {
     @VisibleForTesting
     void sendLookups(
             int destination, LookupType lookupType, 
List<AbstractLookupQuery<?>> lookupBatches) {
-        TabletServerGateway gateway = 
metadataUpdater.newTabletServerClientForNode(destination);
-        if (gateway == null) {
-            // TODO handle this exception, like retry.
-            throw new LeaderNotAvailableException(
-                    "Server " + destination + " is not found in metadata 
cache.");
-        }
-
         if (lookupType == LookupType.LOOKUP) {
-            sendLookupRequest(destination, gateway, lookupBatches);
+            sendLookupRequest(destination, lookupBatches);
         } else if (lookupType == LookupType.PREFIX_LOOKUP) {
-            sendPrefixLookupRequest(destination, gateway, lookupBatches);
+            sendPrefixLookupRequest(destination, lookupBatches);
         } else {
             throw new IllegalArgumentException("Unsupported lookup type: " + 
lookupType);
         }
     }
 
-    private void sendLookupRequest(
-            int destination, TabletServerGateway gateway, 
List<AbstractLookupQuery<?>> lookups) {
+    private void sendLookupRequest(int destination, 
List<AbstractLookupQuery<?>> lookups) {
         // table id -> (bucket -> lookups)
         Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new 
HashMap<>();
         for (AbstractLookupQuery<?> abstractLookupQuery : lookups) {
@@ -195,6 +195,19 @@ class LookupSender implements Runnable {
                     .addLookup(lookup);
         }
 
+        TabletServerGateway gateway = 
metadataUpdater.newTabletServerClientForNode(destination);
+        if (gateway == null) {
+            lookupByTableId.forEach(
+                    (tableId, lookupsByBucket) ->
+                            handleLookupRequestException(
+                                    new LeaderNotAvailableException(
+                                            "Server "
+                                                    + destination
+                                                    + " is not found in 
metadata cache."),
+                                    destination,
+                                    lookupsByBucket));
+        }
+
         lookupByTableId.forEach(
                 (tableId, lookupsByBucket) ->
                         sendLookupRequestAndHandleResponse(
@@ -206,9 +219,7 @@ class LookupSender implements Runnable {
     }
 
     private void sendPrefixLookupRequest(
-            int destination,
-            TabletServerGateway gateway,
-            List<AbstractLookupQuery<?>> prefixLookups) {
+            int destination, List<AbstractLookupQuery<?>> prefixLookups) {
         // table id -> (bucket -> lookups)
         Map<Long, Map<TableBucket, PrefixLookupBatch>> lookupByTableId = new 
HashMap<>();
         for (AbstractLookupQuery<?> abstractLookupQuery : prefixLookups) {
@@ -221,6 +232,19 @@ class LookupSender implements Runnable {
                     .addLookup(prefixLookup);
         }
 
+        TabletServerGateway gateway = 
metadataUpdater.newTabletServerClientForNode(destination);
+        if (gateway == null) {
+            lookupByTableId.forEach(
+                    (tableId, lookupsByBucket) ->
+                            handlePrefixLookupException(
+                                    new LeaderNotAvailableException(
+                                            "Server "
+                                                    + destination
+                                                    + " is not found in 
metadata cache."),
+                                    destination,
+                                    lookupsByBucket));
+        }
+
         lookupByTableId.forEach(
                 (tableId, prefixLookupBatch) ->
                         sendPrefixLookupRequestAndHandleResponse(
@@ -396,7 +420,6 @@ class LookupSender implements Runnable {
     }
 
     private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
-        lookup.incrementRetries();
         lookupQueue.appendLookup(lookup);
     }
 
@@ -455,6 +478,7 @@ class LookupSender implements Runnable {
                         tableBucket,
                         maxRetries - lookup.retries(),
                         error.formatErrMsg());
+                lookup.incrementRetries();
                 reEnqueueLookup(lookup);
             } else {
                 LOG.warn(
diff --git 
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
index bba239d1e..61dc5cbc8 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
@@ -135,10 +135,10 @@ public class LookupSenderTest {
         assertThat(result).isNotDone();
         lookupQueue.appendLookup(lookupQuery);
 
-        // Wait for all retries to complete and verify it eventually fails
-        assertThatThrownBy(() -> result.get(5, TimeUnit.SECONDS))
-                .isInstanceOf(ExecutionException.class)
-                .hasMessageContaining("Leader not found after retry");
+        // Wait for all retries to complete and verify it eventually fails. 
This case will be failed
+        // after timeout.
+        assertThatThrownBy(() -> result.get(2, TimeUnit.SECONDS))
+                .isInstanceOf(java.util.concurrent.TimeoutException.class);
 
         // Verify that retries happened (should be 1, because server meta 
invalidated)
         assertThat(lookupQuery.retries()).isEqualTo(1);
@@ -173,10 +173,10 @@ public class LookupSenderTest {
         assertThat(future).isNotDone();
         lookupQueue.appendLookup(prefixLookupQuery);
 
-        // Wait for all retries to complete and verify it eventually fails
-        assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
-                .isInstanceOf(ExecutionException.class)
-                .hasMessageContaining("Leader not found after retry");
+        // Wait for all retries to complete and verify it eventually fails. 
This case will be failed
+        // after timeout.
+        assertThatThrownBy(() -> future.get(2, TimeUnit.SECONDS))
+                .isInstanceOf(java.util.concurrent.TimeoutException.class);
 
         // Verify that retries happened (should be 1, because server meta 
invalidated)
         assertThat(prefixLookupQuery.retries()).isEqualTo(1);

Reply via email to