This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 863516920 [client] Lookup sender need retry to send when leader not
found in metadata cache (#2052)
863516920 is described below
commit 8635169202821e5cb999040f37178596d73943e5
Author: yunhong <[email protected]>
AuthorDate: Mon Dec 1 16:09:29 2025 +0800
[client] Lookup sender need retry to send when leader not found in metadata
cache (#2052)
---
.../apache/fluss/client/lookup/LookupSender.java | 62 +++++++++++++++-------
.../fluss/client/lookup/LookupSenderTest.java | 16 +++---
2 files changed, 51 insertions(+), 27 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
index df2db466d..a098b8532 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
@@ -126,13 +126,22 @@ class LookupSender implements Runnable {
sendLookups(lookups);
}
- private void sendLookups(List<AbstractLookupQuery<?>> lookups) {
+ private void sendLookups(List<AbstractLookupQuery<?>> lookups) throws
Exception {
if (lookups.isEmpty()) {
return;
}
// group by <leader, lookup type> to lookup batches
Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>>
lookupBatches =
groupByLeaderAndType(lookups);
+
+ // If there are no lookup batches, sleep briefly to avoid a busy loop. This happens when
+ // no leader can be found for any of the lookup requests in the queue.
+ if (lookupBatches.isEmpty() && !lookupQueue.hasUnDrained()) {
+ // TODO: consider a wait/notify mechanism instead of an active sleep, and a dynamic
+ // sleep time based on how long the requests have been waiting.
+ Thread.sleep(100);
+ }
+
// now, send the batches
lookupBatches.forEach(
(destAndType, batch) -> sendLookups(destAndType.f0,
destAndType.f1, batch));
@@ -148,11 +157,10 @@ class LookupSender implements Runnable {
// lookup the leader node
TableBucket tb = lookup.tableBucket();
try {
- // TODO this can be a re-triable operation. We should retry
here instead of
- // throwing exception.
leader = metadataUpdater.leaderFor(tb);
} catch (Exception e) {
- lookup.future().completeExceptionally(e);
+ // if leader is not found, re-enqueue the lookup to send again.
+ reEnqueueLookup(lookup);
continue;
}
lookupBatchesByLeader
@@ -165,24 +173,16 @@ class LookupSender implements Runnable {
@VisibleForTesting
void sendLookups(
int destination, LookupType lookupType,
List<AbstractLookupQuery<?>> lookupBatches) {
- TabletServerGateway gateway =
metadataUpdater.newTabletServerClientForNode(destination);
- if (gateway == null) {
- // TODO handle this exception, like retry.
- throw new LeaderNotAvailableException(
- "Server " + destination + " is not found in metadata
cache.");
- }
-
if (lookupType == LookupType.LOOKUP) {
- sendLookupRequest(destination, gateway, lookupBatches);
+ sendLookupRequest(destination, lookupBatches);
} else if (lookupType == LookupType.PREFIX_LOOKUP) {
- sendPrefixLookupRequest(destination, gateway, lookupBatches);
+ sendPrefixLookupRequest(destination, lookupBatches);
} else {
throw new IllegalArgumentException("Unsupported lookup type: " +
lookupType);
}
}
- private void sendLookupRequest(
- int destination, TabletServerGateway gateway,
List<AbstractLookupQuery<?>> lookups) {
+ private void sendLookupRequest(int destination,
List<AbstractLookupQuery<?>> lookups) {
// table id -> (bucket -> lookups)
Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new
HashMap<>();
for (AbstractLookupQuery<?> abstractLookupQuery : lookups) {
@@ -195,6 +195,19 @@ class LookupSender implements Runnable {
.addLookup(lookup);
}
+ TabletServerGateway gateway =
metadataUpdater.newTabletServerClientForNode(destination);
+ if (gateway == null) {
+ lookupByTableId.forEach(
+ (tableId, lookupsByBucket) ->
+ handleLookupRequestException(
+ new LeaderNotAvailableException(
+ "Server "
+ + destination
+ + " is not found in
metadata cache."),
+ destination,
+ lookupsByBucket));
+ }
+
lookupByTableId.forEach(
(tableId, lookupsByBucket) ->
sendLookupRequestAndHandleResponse(
@@ -206,9 +219,7 @@ class LookupSender implements Runnable {
}
private void sendPrefixLookupRequest(
- int destination,
- TabletServerGateway gateway,
- List<AbstractLookupQuery<?>> prefixLookups) {
+ int destination, List<AbstractLookupQuery<?>> prefixLookups) {
// table id -> (bucket -> lookups)
Map<Long, Map<TableBucket, PrefixLookupBatch>> lookupByTableId = new
HashMap<>();
for (AbstractLookupQuery<?> abstractLookupQuery : prefixLookups) {
@@ -221,6 +232,19 @@ class LookupSender implements Runnable {
.addLookup(prefixLookup);
}
+ TabletServerGateway gateway =
metadataUpdater.newTabletServerClientForNode(destination);
+ if (gateway == null) {
+ lookupByTableId.forEach(
+ (tableId, lookupsByBucket) ->
+ handlePrefixLookupException(
+ new LeaderNotAvailableException(
+ "Server "
+ + destination
+ + " is not found in
metadata cache."),
+ destination,
+ lookupsByBucket));
+ }
+
lookupByTableId.forEach(
(tableId, prefixLookupBatch) ->
sendPrefixLookupRequestAndHandleResponse(
@@ -396,7 +420,6 @@ class LookupSender implements Runnable {
}
private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
- lookup.incrementRetries();
lookupQueue.appendLookup(lookup);
}
@@ -455,6 +478,7 @@ class LookupSender implements Runnable {
tableBucket,
maxRetries - lookup.retries(),
error.formatErrMsg());
+ lookup.incrementRetries();
reEnqueueLookup(lookup);
} else {
LOG.warn(
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
index bba239d1e..61dc5cbc8 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
@@ -135,10 +135,10 @@ public class LookupSenderTest {
assertThat(result).isNotDone();
lookupQueue.appendLookup(lookupQuery);
- // Wait for all retries to complete and verify it eventually fails
- assertThatThrownBy(() -> result.get(5, TimeUnit.SECONDS))
- .isInstanceOf(ExecutionException.class)
- .hasMessageContaining("Leader not found after retry");
+ // The lookup is not completed within the wait in this case, so getting the result is
+ // expected to time out.
+ assertThatThrownBy(() -> result.get(2, TimeUnit.SECONDS))
+ .isInstanceOf(java.util.concurrent.TimeoutException.class);
// Verify that retries happened (should be 1, because server meta
invalidated)
assertThat(lookupQuery.retries()).isEqualTo(1);
@@ -173,10 +173,10 @@ public class LookupSenderTest {
assertThat(future).isNotDone();
lookupQueue.appendLookup(prefixLookupQuery);
- // Wait for all retries to complete and verify it eventually fails
- assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
- .isInstanceOf(ExecutionException.class)
- .hasMessageContaining("Leader not found after retry");
+ // The prefix lookup is not completed within the wait in this case, so getting the
+ // result is expected to time out.
+ assertThatThrownBy(() -> future.get(2, TimeUnit.SECONDS))
+ .isInstanceOf(java.util.concurrent.TimeoutException.class);
// Verify that retries happened (should be 1, because server meta
invalidated)
assertThat(prefixLookupQuery.retries()).isEqualTo(1);