This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new a07e563fc [common] Fix thread-safety problem of PrimaryKeyLookuper
and PrefixKeyLookuper (#1915)
a07e563fc is described below
commit a07e563fcf9e2cb77ad4e310fa52442784d331f4
Author: Yang Wang <[email protected]>
AuthorDate: Thu Nov 27 15:54:24 2025 +0800
[common] Fix thread-safety problem of PrimaryKeyLookuper and
PrefixKeyLookuper (#1915)
---
.../fluss/client/lookup/AbstractLookupQuery.java | 10 +
.../apache/fluss/client/lookup/LookupClient.java | 3 +-
.../apache/fluss/client/lookup/LookupSender.java | 113 ++++--
.../org/apache/fluss/client/lookup/Lookuper.java | 9 +-
.../fluss/client/lookup/PrefixKeyLookuper.java | 2 +
.../fluss/client/lookup/PrimaryKeyLookuper.java | 2 +
.../fluss/client/lookup/LookupSenderTest.java | 452 +++++++++++++++++++--
.../client/metadata/TestingMetadataUpdater.java | 60 ++-
.../org/apache/fluss/config/ConfigOptions.java | 8 +
.../fluss/flink/catalog/FlinkTableFactory.java | 8 +-
.../fluss/flink/source/FlinkTableSource.java | 7 -
.../source/lookup/FlinkAsyncLookupFunction.java | 82 ++--
.../flink/source/lookup/FlinkLookupFunction.java | 52 +--
.../apache/fluss/flink/utils/PushdownUtils.java | 2 -
.../source/lookup/FlinkLookupFunctionTest.java | 3 -
website/docs/engine-flink/options.md | 13 +-
16 files changed, 650 insertions(+), 176 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
index 1737a7696..efaf0100a 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
@@ -28,10 +28,12 @@ public abstract class AbstractLookupQuery<T> {
private final TableBucket tableBucket;
private final byte[] key;
+ private int retries;
public AbstractLookupQuery(TableBucket tableBucket, byte[] key) {
this.tableBucket = tableBucket;
this.key = key;
+ this.retries = 0;
}
public byte[] key() {
@@ -42,6 +44,14 @@ public abstract class AbstractLookupQuery<T> {
return tableBucket;
}
+ public int retries() {
+ return retries;
+ }
+
+ public void incrementRetries() {
+ retries++;
+ }
+
public abstract LookupType lookupType();
public abstract CompletableFuture<T> future();
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
index 3c201541a..a974ba972 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
@@ -67,7 +67,8 @@ public class LookupClient {
new LookupSender(
metadataUpdater,
lookupQueue,
-
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE));
+
conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_INFLIGHT_SIZE),
+ conf.getInt(ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES));
lookupSenderThreadPool.submit(lookupSender);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
index 0f249213a..df2db466d 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
@@ -24,6 +24,7 @@ import org.apache.fluss.exception.ApiException;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.InvalidMetadataException;
import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.exception.RetriableException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePartition;
@@ -74,10 +75,17 @@ class LookupSender implements Runnable {
private final Semaphore maxInFlightReuqestsSemaphore;
- LookupSender(MetadataUpdater metadataUpdater, LookupQueue lookupQueue, int
maxFlightRequests) {
+ private final int maxRetries;
+
+ LookupSender(
+ MetadataUpdater metadataUpdater,
+ LookupQueue lookupQueue,
+ int maxFlightRequests,
+ int maxRetries) {
this.metadataUpdater = metadataUpdater;
this.lookupQueue = lookupQueue;
this.maxInFlightReuqestsSemaphore = new Semaphore(maxFlightRequests);
+ this.maxRetries = maxRetries;
this.running = true;
}
@@ -307,10 +315,8 @@ class LookupSender implements Runnable {
pbLookupRespForBucket.getBucketId());
LookupBatch lookupBatch = lookupsByBucket.get(tableBucket);
if (pbLookupRespForBucket.hasErrorCode()) {
- // TODO for re-triable error, we should retry here instead of
throwing exception.
ApiError error =
ApiError.fromErrorMessage(pbLookupRespForBucket);
- handleLookupExceptionForBucket(tableBucket, destination,
error, "lookup");
- lookupBatch.completeExceptionally(error.exception());
+ handleLookupError(tableBucket, destination, error,
lookupBatch.lookups(), "lookup");
} else {
List<byte[]> byteValues =
pbLookupRespForBucket.getValuesList().stream()
@@ -345,10 +351,13 @@ class LookupSender implements Runnable {
PrefixLookupBatch prefixLookupBatch =
prefixLookupsByBucket.get(tableBucket);
if (pbRespForBucket.hasErrorCode()) {
- // TODO for re-triable error, we should retry here instead of
throwing exception.
ApiError error = ApiError.fromErrorMessage(pbRespForBucket);
- handleLookupExceptionForBucket(tableBucket, destination,
error, "prefixLookup");
- prefixLookupBatch.completeExceptionally(error.exception());
+ handleLookupError(
+ tableBucket,
+ destination,
+ error,
+ prefixLookupBatch.lookups(),
+ "prefix lookup");
} else {
List<List<byte[]>> result = new
ArrayList<>(pbRespForBucket.getValueListsCount());
for (int i = 0; i < pbRespForBucket.getValueListsCount(); i++)
{
@@ -368,58 +377,106 @@ class LookupSender implements Runnable {
Throwable t, int destination, Map<TableBucket, LookupBatch>
lookupsByBucket) {
ApiError error = ApiError.fromThrowable(t);
for (LookupBatch lookupBatch : lookupsByBucket.values()) {
- // TODO for re-triable error, we should retry here instead of
throwing exception.
- handleLookupExceptionForBucket(lookupBatch.tableBucket(),
destination, error, "lookup");
- lookupBatch.completeExceptionally(error.exception());
+ handleLookupError(
+ lookupBatch.tableBucket(), destination, error,
lookupBatch.lookups(), "lookup");
}
}
private void handlePrefixLookupException(
Throwable t, int destination, Map<TableBucket, PrefixLookupBatch>
lookupsByBucket) {
ApiError error = ApiError.fromThrowable(t);
- // TODO If error, we need to retry send the request instead of throw
exception.
for (PrefixLookupBatch lookupBatch : lookupsByBucket.values()) {
- handleLookupExceptionForBucket(
- lookupBatch.tableBucket(), destination, error,
"prefixLookup");
- lookupBatch.completeExceptionally(error.exception());
+ handleLookupError(
+ lookupBatch.tableBucket(),
+ destination,
+ error,
+ lookupBatch.lookups(),
+ "prefix lookup");
}
}
- void forceClose() {
- forceClose = true;
- initiateClose();
+ private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
+ lookup.incrementRetries();
+ lookupQueue.appendLookup(lookup);
}
- void initiateClose() {
- // Ensure accumulator is closed first to guarantee that no more
appends are accepted after
- // breaking from the sender loop. Otherwise, we may miss some
callbacks when shutting down.
- lookupQueue.close();
- running = false;
+ private boolean canRetry(AbstractLookupQuery<?> lookup, Exception
exception) {
+ return lookup.retries() < maxRetries
+ && !lookup.future().isDone()
+ && exception instanceof RetriableException;
}
- private void handleLookupExceptionForBucket(
- TableBucket tb, int destination, ApiError error, String
lookupType) {
+ /**
+ * Handle lookup error with retry logic. For each lookup in the list,
check if it can be
+ * retried. If yes, re-enqueue it; otherwise, complete it exceptionally.
+ *
+ * @param tableBucket the table bucket
+ * @param error the error from server response
+ * @param lookups the list of lookups to handle
+     * @param lookupType the type of lookup ("lookup" for regular lookup,
"prefix lookup" for prefix lookup)
+ */
+ private void handleLookupError(
+ TableBucket tableBucket,
+ int destination,
+ ApiError error,
+ List<? extends AbstractLookupQuery<?>> lookups,
+ String lookupType) {
ApiException exception = error.error().exception();
LOG.error(
- "Failed to {} from node {} for bucket {}", lookupType,
destination, tb, exception);
+ "Failed to {} from node {} for bucket {}",
+ lookupType,
+ destination,
+ tableBucket,
+ exception);
if (exception instanceof InvalidMetadataException) {
LOG.warn(
"Invalid metadata error in {} request. Going to request
metadata update.",
lookupType,
exception);
- long tableId = tb.getTableId();
+ long tableId = tableBucket.getTableId();
TableOrPartitions tableOrPartitions;
- if (tb.getPartitionId() == null) {
+ if (tableBucket.getPartitionId() == null) {
tableOrPartitions = new
TableOrPartitions(Collections.singleton(tableId), null);
} else {
tableOrPartitions =
new TableOrPartitions(
null,
Collections.singleton(
- new TablePartition(tableId,
tb.getPartitionId())));
+ new TablePartition(tableId,
tableBucket.getPartitionId())));
}
invalidTableOrPartitions(tableOrPartitions);
}
+
+ for (AbstractLookupQuery<?> lookup : lookups) {
+ if (canRetry(lookup, error.exception())) {
+ LOG.warn(
+ "Get error {} response on table bucket {}, retrying
({} attempts left). Error: {}",
+ lookupType,
+ tableBucket,
+ maxRetries - lookup.retries(),
+ error.formatErrMsg());
+ reEnqueueLookup(lookup);
+ } else {
+ LOG.warn(
+ "Get error {} response on table bucket {}, fail.
Error: {}",
+ lookupType,
+ tableBucket,
+ error.formatErrMsg());
+ lookup.future().completeExceptionally(error.exception());
+ }
+ }
+ }
+
+ void forceClose() {
+ forceClose = true;
+ initiateClose();
+ }
+
+ void initiateClose() {
+ // Ensure accumulator is closed first to guarantee that no more
appends are accepted after
+ // breaking from the sender loop. Otherwise, we may miss some
callbacks when shutting down.
+ lookupQueue.close();
+ running = false;
}
/** A helper class to hold table ids or table partitions. */
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
index ccb632318..37157d9b1 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java
@@ -20,14 +20,21 @@ package org.apache.fluss.client.lookup;
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.row.InternalRow;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.util.concurrent.CompletableFuture;
/**
- * The lookup-er is used to lookup row of a primary key table by primary key
or prefix key.
+ * The lookup-er is used to lookup row of a primary key table by primary key
or prefix key. The
+ * lookuper has retriable ability to handle transient errors during lookup
operations which is
+ * configured by {@link
org.apache.fluss.config.ConfigOptions#CLIENT_LOOKUP_MAX_RETRIES}.
+ *
+ * <p>Note: Lookuper instances are not thread-safe.
*
* @since 0.6
*/
@PublicEvolving
+@NotThreadSafe
public interface Lookuper {
/**
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
index 258d9b1c2..61645374f 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
@@ -32,6 +32,7 @@ import org.apache.fluss.types.DataType;
import org.apache.fluss.types.RowType;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
import java.util.ArrayList;
import java.util.Collections;
@@ -46,6 +47,7 @@ import static
org.apache.fluss.client.utils.ClientUtils.getPartitionId;
* An implementation of {@link Lookuper} that lookups by prefix key. A prefix
key is a prefix subset
* of the primary key.
*/
+@NotThreadSafe
class PrefixKeyLookuper implements Lookuper {
private final TableInfo tableInfo;
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
index 2a6233e46..2945f1ab4 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
@@ -32,6 +32,7 @@ import org.apache.fluss.types.DataType;
import org.apache.fluss.types.RowType;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
@@ -40,6 +41,7 @@ import static
org.apache.fluss.client.utils.ClientUtils.getPartitionId;
import static org.apache.fluss.utils.Preconditions.checkArgument;
/** An implementation of {@link Lookuper} that lookups by primary key. */
+@NotThreadSafe
class PrimaryKeyLookuper implements Lookuper {
private final TableInfo tableInfo;
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
index 336f588ea..bba239d1e 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
@@ -21,20 +21,41 @@ import
org.apache.fluss.client.metadata.TestingMetadataUpdater;
import org.apache.fluss.cluster.BucketLocation;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.NotLeaderOrFollowerException;
+import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.exception.TimeoutException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.rpc.messages.LookupRequest;
+import org.apache.fluss.rpc.messages.LookupResponse;
+import org.apache.fluss.rpc.messages.PbLookupRespForBucket;
+import org.apache.fluss.rpc.messages.PbPrefixLookupRespForBucket;
+import org.apache.fluss.rpc.messages.PrefixLookupRequest;
+import org.apache.fluss.rpc.messages.PrefixLookupResponse;
+import org.apache.fluss.rpc.protocol.ApiError;
+import org.apache.fluss.server.tablet.TestTabletServerGateway;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
+import java.time.Duration;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -46,17 +67,51 @@ public class LookupSenderTest {
private TestingMetadataUpdater metadataUpdater;
private LookupSender lookupSender;
+ private static final int MAX_RETRIES = 3;
+ private static final int MAX_INFLIGHT_REQUESTS = 10;
+ private static final TableBucket TABLE_BUCKET = new
TableBucket(DATA1_TABLE_ID_PK, 0);
+
+ private LookupQueue lookupQueue;
+ private Thread senderThread;
+ private ConfigurableTestTabletServerGateway gateway;
+
@BeforeEach
- public void setup() {
- metadataUpdater = initializeMetadataUpdater();
+ void setup() {
+ // create a configurable gateway for testing
+ gateway = new ConfigurableTestTabletServerGateway();
+
+ // build metadata updater with custom gateway using builder pattern
+ Map<TablePath, TableInfo> tableInfos = new HashMap<>();
+ tableInfos.put(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK);
+ metadataUpdater =
+ TestingMetadataUpdater.builder(tableInfos)
+ .withTabletServerGateway(1, gateway)
+ .build();
+
Configuration conf = new Configuration();
conf.set(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE, 5);
conf.set(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE, 10);
- lookupSender = new LookupSender(metadataUpdater, new
LookupQueue(conf), 5);
+ lookupQueue = new LookupQueue(conf);
+
+ lookupSender =
+ new LookupSender(metadataUpdater, lookupQueue,
MAX_INFLIGHT_REQUESTS, MAX_RETRIES);
+
+ senderThread = new Thread(lookupSender);
+ senderThread.start();
+ }
+
+ @AfterEach
+ void teardown() throws InterruptedException {
+ if (lookupSender != null) {
+ lookupSender.forceClose();
+ }
+ if (senderThread != null) {
+ senderThread.join(5000);
+ }
}
@Test
- void testSendLookupRequestWithNotLeaderOrFollowerException() {
+ void testSendLookupRequestWithNotLeaderOrFollowerException() throws
Exception {
assertThat(metadataUpdater.getBucketLocation(tb1))
.hasValue(
new BucketLocation(
@@ -65,26 +120,36 @@ public class LookupSenderTest {
1,
new int[] {1, 2, 3}));
- // send LookupRequest to serverId 1, which will respond with
NotLeaderOrFollowerException
- // as responseLogicId=1 do.
- metadataUpdater.setResponseLogicId(1, 1);
+ // Configure gateway to always return NotLeaderOrFollowerException for
all attempts
+ // (including retries)
+ gateway.setLookupHandler(
+ request ->
+ createFailedResponse(
+ request,
+ new NotLeaderOrFollowerException(
+ "mock not leader or follower
exception.")));
+
+ // send LookupRequest through the queue so that retry mechanism can
work
LookupQuery lookupQuery = new LookupQuery(tb1, new byte[0]);
CompletableFuture<byte[]> result = lookupQuery.future();
assertThat(result).isNotDone();
- lookupSender.sendLookups(1, LookupType.LOOKUP,
Collections.singletonList(lookupQuery));
+ lookupQueue.appendLookup(lookupQuery);
+
+ // Wait for all retries to complete and verify it eventually fails
+ assertThatThrownBy(() -> result.get(5, TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasMessageContaining("Leader not found after retry");
+
+ // Verify that retries happened (should be 1, because server meta
invalidated)
+ assertThat(lookupQuery.retries()).isEqualTo(1);
- assertThat(result.isCompletedExceptionally()).isTrue();
- assertThatThrownBy(result::get)
- .rootCause()
- .isInstanceOf(NotLeaderOrFollowerException.class)
- .hasMessage("mock not leader or follower exception.");
// When NotLeaderOrFollowerException is received, the bucketLocation
will be removed from
// metadata updater to trigger get the latest bucketLocation in next
lookup round.
assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
}
@Test
- void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() {
+ void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() throws
Exception {
assertThat(metadataUpdater.getBucketLocation(tb1))
.hasValue(
new BucketLocation(
@@ -93,27 +158,356 @@ public class LookupSenderTest {
1,
new int[] {1, 2, 3}));
- // send PrefixLookupRequest to serverId 1, which will respond with
- // NotLeaderOrFollowerException as responseLogicId=1 do.
- metadataUpdater.setResponseLogicId(1, 1);
+ // Configure gateway to always return NotLeaderOrFollowerException for
all attempts
+ // (including retries)
+ gateway.setPrefixLookupHandler(
+ request ->
+ createFailedPrefixLookupResponse(
+ request,
+ new NotLeaderOrFollowerException(
+ "mock not leader or follower
exception.")));
+
+ // send PrefixLookupRequest through the queue so that retry mechanism
can work
PrefixLookupQuery prefixLookupQuery = new PrefixLookupQuery(tb1, new
byte[0]);
CompletableFuture<List<byte[]>> future = prefixLookupQuery.future();
assertThat(future).isNotDone();
- lookupSender.sendLookups(
- 1, LookupType.PREFIX_LOOKUP,
Collections.singletonList(prefixLookupQuery));
-
- assertThat(future.isCompletedExceptionally()).isTrue();
- assertThatThrownBy(future::get)
- .rootCause()
- .isInstanceOf(NotLeaderOrFollowerException.class)
- .hasMessage("mock not leader or follower exception.");
+ lookupQueue.appendLookup(prefixLookupQuery);
+
+ // Wait for all retries to complete and verify it eventually fails
+ assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasMessageContaining("Leader not found after retry");
+
+ // Verify that retries happened (should be 1, because server meta
invalidated)
+ assertThat(prefixLookupQuery.retries()).isEqualTo(1);
+
// When NotLeaderOrFollowerException is received, the bucketLocation
will be removed from
// metadata updater to trigger get the latest bucketLocation in next
lookup round.
assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
}
- private TestingMetadataUpdater initializeMetadataUpdater() {
- return new TestingMetadataUpdater(
- Collections.singletonMap(DATA1_TABLE_PATH_PK,
DATA1_TABLE_INFO_PK));
+ @Test
+ void testRetriableExceptionTriggersRetry() throws Exception {
+ // setup: fail twice with retriable exception, then succeed
+ AtomicInteger attemptCount = new AtomicInteger(0);
+ gateway.setLookupHandler(
+ request -> {
+ int attempt = attemptCount.incrementAndGet();
+ if (attempt <= 2) {
+ // first two attempts fail with retriable exception
+ return createFailedResponse(
+ request, new TimeoutException("simulated
timeout"));
+ } else {
+ // third attempt succeeds
+ return createSuccessResponse(request,
"value".getBytes());
+ }
+ });
+
+ // execute: submit lookup
+ byte[] key = "key".getBytes();
+ LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+ lookupQueue.appendLookup(query);
+
+ // verify: eventually succeeds after retries
+ byte[] result = query.future().get(5, TimeUnit.SECONDS);
+ assertThat(result).isEqualTo("value".getBytes());
+ assertThat(attemptCount.get()).isEqualTo(3);
+ assertThat(query.retries()).isEqualTo(2); // retried 2 times
+ }
+
+ @Test
+ void testNonRetriableExceptionDoesNotRetry() throws Exception {
+ // setup: fail with non-retriable exception
+ gateway.setLookupHandler(
+ request ->
+ createFailedResponse(
+ request, new TableNotExistException("table not
found")));
+
+ // execute: submit lookup
+ byte[] key = "key".getBytes();
+ LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+ lookupQueue.appendLookup(query);
+
+ // verify: fails immediately without retry
+ assertThatThrownBy(() -> query.future().get(5, TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasRootCauseInstanceOf(TableNotExistException.class);
+ assertThat(query.retries()).isEqualTo(0); // no retries
+ }
+
+ @Test
+ void testMaxRetriesEnforced() throws Exception {
+ // setup: always fail with retriable exception
+ AtomicInteger attemptCount = new AtomicInteger(0);
+ gateway.setLookupHandler(
+ request -> {
+ attemptCount.incrementAndGet();
+ return createFailedResponse(request, new
TimeoutException("timeout"));
+ });
+
+ // execute: submit lookup
+ byte[] key = "key".getBytes();
+ LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+ lookupQueue.appendLookup(query);
+
+ // verify: eventually fails after max retries
+ assertThatThrownBy(() -> query.future().get(5, TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasRootCauseInstanceOf(TimeoutException.class);
+
+ // should attempt: 1 initial + MAX_RETRIES retries
+ assertThat(attemptCount.get()).isEqualTo(1 + MAX_RETRIES);
+ assertThat(query.retries()).isEqualTo(MAX_RETRIES);
+ }
+
+ @Test
+ void testRetryStopsIfFutureCompleted() throws Exception {
+ // setup: always fail with retriable exception
+ AtomicInteger attemptCount = new AtomicInteger(0);
+ gateway.setLookupHandler(
+ request -> {
+ int attempt = attemptCount.incrementAndGet();
+ if (attempt == 1) {
+ // first attempt fails
+ return createFailedResponse(request, new
TimeoutException("timeout"));
+ } else {
+ try {
+ // Avoid attempting again too quickly
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ // subsequent attempts should not happen if we
complete the future
+ throw new AssertionError(
+ "Should not retry after future is completed
externally");
+ }
+ });
+
+ // execute: submit lookup
+ byte[] key = "key".getBytes();
+ LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+ lookupQueue.appendLookup(query);
+
+ // complete the future externally before retry happens
+ waitUntil(() -> attemptCount.get() >= 1, Duration.ofSeconds(5), "first
attempt to be made");
+ query.future().complete("external".getBytes());
+
+ // verify: completed externally
+ byte[] result = query.future().get(1, TimeUnit.SECONDS);
+ assertThat(result).isEqualTo("external".getBytes());
+ // retries is less than 3, because we stop the query so it won't send
again.
+ assertThat(query.retries()).isGreaterThanOrEqualTo(0).isLessThan(3);
+ assertThat(attemptCount.get()).isGreaterThanOrEqualTo(1).isLessThan(4);
+ }
+
+ @Test
+ void testDifferentExceptionTypesHandledCorrectly() throws Exception {
+ // test multiple exception types
+ testException(new TimeoutException("timeout"), true, 3); // retriable,
should retry
+ testException(new InvalidTableException("invalid"), false, 0); //
non-retriable, no retry
+ testException(new TableNotExistException("not exist"), false, 0); //
non-retriable, no retry
+ }
+
+ @Test
+ void testPrefixLookupRetry() throws Exception {
+ // setup: fail twice with retriable exception, then succeed
+ AtomicInteger attemptCount = new AtomicInteger(0);
+ gateway.setPrefixLookupHandler(
+ request -> {
+ int attempt = attemptCount.incrementAndGet();
+ if (attempt <= 2) {
+ // first two attempts fail
+ return createFailedPrefixLookupResponse(
+ request, new TimeoutException("timeout"));
+ } else {
+ // third attempt succeeds
+ return createSuccessPrefixLookupResponse(request);
+ }
+ });
+
+ // execute: submit prefix lookup
+ byte[] prefixKey = "prefix".getBytes();
+ PrefixLookupQuery query = new PrefixLookupQuery(TABLE_BUCKET,
prefixKey);
+ lookupQueue.appendLookup(query);
+
+ // verify: eventually succeeds after retries
+ query.future().get(5, TimeUnit.SECONDS);
+ assertThat(attemptCount.get()).isEqualTo(3);
+ assertThat(query.retries()).isEqualTo(2);
+ }
+
+ @Test
+ void testMultipleConcurrentLookupsWithRetries() throws Exception {
+ // setup: first attempt fails, second succeeds
+ AtomicInteger attemptCount = new AtomicInteger(0);
+ gateway.setLookupHandler(
+ request -> {
+ int attempt = attemptCount.incrementAndGet();
+ if (attempt % 2 == 1) {
+ // odd attempts fail
+ return createFailedResponse(request, new
TimeoutException("timeout"));
+ } else {
+ // even attempts succeed
+ return createSuccessResponse(request, ("value" +
attempt).getBytes());
+ }
+ });
+
+ // execute: submit multiple lookups
+ LookupQuery query1 = new LookupQuery(TABLE_BUCKET, "key1".getBytes());
+ LookupQuery query2 = new LookupQuery(TABLE_BUCKET, "key2".getBytes());
+ LookupQuery query3 = new LookupQuery(TABLE_BUCKET, "key3".getBytes());
+
+ lookupQueue.appendLookup(query1);
+ lookupQueue.appendLookup(query2);
+ lookupQueue.appendLookup(query3);
+
+ // verify: all succeed after retries
+ assertThat(query1.future().get(5, TimeUnit.SECONDS)).isNotNull();
+ assertThat(query2.future().get(5, TimeUnit.SECONDS)).isNotNull();
+ assertThat(query3.future().get(5, TimeUnit.SECONDS)).isNotNull();
+ // Note: lookups are batched together, so attemptCount reflects batch
attempts, not
+ // individual lookups
+ assertThat(attemptCount.get())
+ .isGreaterThanOrEqualTo(2); // at least 1 failure + 1 success
for the batch
+ }
+
+ // Helper methods
+
+ private void testException(Exception exception, boolean shouldRetry, int
expectedRetries)
+ throws Exception {
+ // reset gateway
+ AtomicInteger attemptCount = new AtomicInteger(0);
+ gateway.setLookupHandler(
+ request -> {
+ attemptCount.incrementAndGet();
+ return createFailedResponse(request, exception);
+ });
+
+ // execute
+ byte[] key = ("key-" +
exception.getClass().getSimpleName()).getBytes();
+ LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+ lookupQueue.appendLookup(query);
+
+ // verify
+ assertThatThrownBy(() -> query.future().get(5, TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class);
+
+ if (shouldRetry) {
+ assertThat(attemptCount.get()).isEqualTo(1 + MAX_RETRIES);
+ assertThat(query.retries()).isEqualTo(expectedRetries);
+ } else {
+ assertThat(attemptCount.get()).isEqualTo(1); // only initial
attempt
+ assertThat(query.retries()).isEqualTo(expectedRetries);
+ }
+
+ // wait a bit to ensure no more attempts
+ Thread.sleep(200);
+ }
+
+ private CompletableFuture<LookupResponse> createSuccessResponse(
+ LookupRequest request, byte[] value) {
+ LookupResponse response = new LookupResponse();
+ PbLookupRespForBucket bucketResp = response.addBucketsResp();
+ bucketResp.setBucketId(TABLE_BUCKET.getBucket());
+ if (TABLE_BUCKET.getPartitionId() != null) {
+ bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId());
+ }
+ // Add value for each key in the request
+ int keyCount = request.getBucketsReqAt(0).getKeysCount();
+ for (int i = 0; i < keyCount; i++) {
+ bucketResp.addValue().setValues(value);
+ }
+ return CompletableFuture.completedFuture(response);
+ }
+
+ private CompletableFuture<LookupResponse> createFailedResponse(
+ LookupRequest request, Exception exception) {
+ LookupResponse response = new LookupResponse();
+ PbLookupRespForBucket bucketResp = response.addBucketsResp();
+ bucketResp.setBucketId(TABLE_BUCKET.getBucket());
+ if (TABLE_BUCKET.getPartitionId() != null) {
+ bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId());
+ }
+ ApiError error = ApiError.fromThrowable(exception);
+ bucketResp.setErrorCode(error.error().code());
+ bucketResp.setErrorMessage(error.formatErrMsg());
+ return CompletableFuture.completedFuture(response);
+ }
+
+ private CompletableFuture<PrefixLookupResponse>
createSuccessPrefixLookupResponse(
+ PrefixLookupRequest request) {
+ PrefixLookupResponse response = new PrefixLookupResponse();
+ // Create response for each prefix key in request
+ PbPrefixLookupRespForBucket bucketResp = response.addBucketsResp();
+ bucketResp.setBucketId(TABLE_BUCKET.getBucket());
+ if (TABLE_BUCKET.getPartitionId() != null) {
+ bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId());
+ }
+ // Add empty value list for each prefix key
+ int keyCount = request.getBucketsReqAt(0).getKeysCount();
+ for (int i = 0; i < keyCount; i++) {
+ bucketResp.addValueList(); // empty list is valid for prefix lookup
+ }
+ return CompletableFuture.completedFuture(response);
+ }
+
+ private CompletableFuture<PrefixLookupResponse>
createFailedPrefixLookupResponse(
+ PrefixLookupRequest request, Exception exception) {
+ PrefixLookupResponse response = new PrefixLookupResponse();
+ PbPrefixLookupRespForBucket bucketResp = response.addBucketsResp();
+ bucketResp.setBucketId(TABLE_BUCKET.getBucket());
+ if (TABLE_BUCKET.getPartitionId() != null) {
+ bucketResp.setPartitionId(TABLE_BUCKET.getPartitionId());
+ }
+ ApiError error = ApiError.fromThrowable(exception);
+ bucketResp.setErrorCode(error.error().code());
+ bucketResp.setErrorMessage(error.formatErrMsg());
+ return CompletableFuture.completedFuture(response);
+ }
+
+ /**
+ * A configurable {@link TabletServerGateway} for testing that allows
setting custom handlers
+ * for lookup operations.
+ */
+ private static class ConfigurableTestTabletServerGateway extends
TestTabletServerGateway {
+
+ private java.util.function.Function<LookupRequest,
CompletableFuture<LookupResponse>>
+ lookupHandler;
+ private java.util.function.Function<
+ PrefixLookupRequest,
CompletableFuture<PrefixLookupResponse>>
+ prefixLookupHandler;
+
+ public ConfigurableTestTabletServerGateway() {
+ super(false);
+ }
+
+ public void setLookupHandler(
+ java.util.function.Function<LookupRequest,
CompletableFuture<LookupResponse>>
+ handler) {
+ this.lookupHandler = handler;
+ }
+
+ public void setPrefixLookupHandler(
+ java.util.function.Function<
+ PrefixLookupRequest,
CompletableFuture<PrefixLookupResponse>>
+ handler) {
+ this.prefixLookupHandler = handler;
+ }
+
+ @Override
+ public CompletableFuture<LookupResponse> lookup(LookupRequest request)
{
+ if (lookupHandler != null) {
+ return lookupHandler.apply(request);
+ }
+ return CompletableFuture.completedFuture(new LookupResponse());
+ }
+
+ @Override
+ public CompletableFuture<PrefixLookupResponse>
prefixLookup(PrefixLookupRequest request) {
+ if (prefixLookupHandler != null) {
+ return prefixLookupHandler.apply(request);
+ }
+ return CompletableFuture.completedFuture(new
PrefixLookupResponse());
+ }
}
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index 38994cbca..6ca025f33 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -55,22 +55,72 @@ public class TestingMetadataUpdater extends MetadataUpdater
{
private final Map<Integer, TestTabletServerGateway> tabletServerGatewayMap;
public TestingMetadataUpdater(Map<TablePath, TableInfo> tableInfos) {
- this(COORDINATOR, Arrays.asList(NODE1, NODE2, NODE3), tableInfos);
+ this(COORDINATOR, Arrays.asList(NODE1, NODE2, NODE3), tableInfos,
null);
}
private TestingMetadataUpdater(
ServerNode coordinatorServer,
List<ServerNode> tabletServers,
- Map<TablePath, TableInfo> tableInfos) {
+ Map<TablePath, TableInfo> tableInfos,
+ Map<Integer, TestTabletServerGateway> customGateways) {
super(
RpcClient.create(
new Configuration(),
TestingClientMetricGroup.newInstance(), false),
Cluster.empty());
initializeCluster(coordinatorServer, tabletServers, tableInfos);
coordinatorGateway = new TestCoordinatorGateway();
- tabletServerGatewayMap = new HashMap<>();
- for (ServerNode tabletServer : tabletServers) {
- tabletServerGatewayMap.put(tabletServer.id(), new
TestTabletServerGateway(false));
+ if (customGateways != null) {
+ tabletServerGatewayMap = customGateways;
+ } else {
+ tabletServerGatewayMap = new HashMap<>();
+ for (ServerNode tabletServer : tabletServers) {
+ tabletServerGatewayMap.put(tabletServer.id(), new
TestTabletServerGateway(false));
+ }
+ }
+ }
+
+ /**
+ * Create a builder for constructing TestingMetadataUpdater with custom
gateways.
+ *
+ * @param tableInfos the table information map
+ * @return a builder instance
+ */
+ public static Builder builder(Map<TablePath, TableInfo> tableInfos) {
+ return new Builder(tableInfos);
+ }
+
+ /** Builder for TestingMetadataUpdater to support custom gateway
configuration. */
+ public static class Builder {
+ private final Map<TablePath, TableInfo> tableInfos;
+ private final Map<Integer, TestTabletServerGateway> customGateways =
new HashMap<>();
+
+ private Builder(Map<TablePath, TableInfo> tableInfos) {
+ this.tableInfos = tableInfos;
+ }
+
+ /**
+ * Set a custom gateway for a specific tablet server node.
+ *
+ * @param serverId the server id (1, 2, or 3 for default nodes)
+ * @param gateway the custom gateway
+ * @return this builder
+ */
+ public Builder withTabletServerGateway(int serverId,
TestTabletServerGateway gateway) {
+ customGateways.put(serverId, gateway);
+ return this;
+ }
+
+ /**
+ * Build the TestingMetadataUpdater instance.
+ *
+ * @return the configured TestingMetadataUpdater
+ */
+ public TestingMetadataUpdater build() {
+ return new TestingMetadataUpdater(
+ COORDINATOR,
+ Arrays.asList(NODE1, NODE2, NODE3),
+ tableInfos,
+ customGateways.isEmpty() ? null : customGateways);
}
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 6b045bb38..875e875d5 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1112,6 +1112,14 @@ public class ConfigOptions {
"The maximum time to wait for the lookup batch to
full, if this timeout is reached, "
+ "the lookup batch will be closed to
send.");
+ public static final ConfigOption<Integer> CLIENT_LOOKUP_MAX_RETRIES =
+ key("client.lookup.max-retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription(
+ "Setting a value greater than zero will cause the
client to resend any lookup request "
+ + "that fails with a potentially transient
error.");
+
public static final ConfigOption<Integer>
CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM =
key("client.scanner.remote-log.prefetch-num")
.intType()
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 92579e3ac..d3ddf29ac 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -148,7 +148,6 @@ public class FlinkTableFactory implements
DynamicTableSourceFactory, DynamicTabl
partitionKeyIndexes,
isStreamingMode,
startupOptions,
- tableOptions.get(LookupOptions.MAX_RETRIES),
tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC),
cache,
partitionDiscoveryIntervalMs,
@@ -244,6 +243,13 @@ public class FlinkTableFactory implements
DynamicTableSourceFactory, DynamicTabl
}
});
+ // map flink lookup.max-retries to client.lookup.max-retries
+ if (tableOptions.containsKey(LookupOptions.MAX_RETRIES.key())) {
+ flussConfig.setString(
+ ConfigOptions.CLIENT_LOOKUP_MAX_RETRIES.key(),
+ tableOptions.get(LookupOptions.MAX_RETRIES.key()));
+ }
+
// pass flink io tmp dir to fluss client.
flussConfig.setString(
ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR,
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index e7c7357e3..ef4b63812 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -127,7 +127,6 @@ public class FlinkTableSource
private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
// options for lookup source
- private final int lookupMaxRetryTimes;
private final boolean lookupAsync;
@Nullable private final LookupCache cache;
@@ -166,7 +165,6 @@ public class FlinkTableSource
int[] partitionKeyIndexes,
boolean streaming,
FlinkConnectorOptionsUtils.StartupOptions startupOptions,
- int lookupMaxRetryTimes,
boolean lookupAsync,
@Nullable LookupCache cache,
long scanPartitionDiscoveryIntervalMs,
@@ -183,7 +181,6 @@ public class FlinkTableSource
this.streaming = streaming;
this.startupOptions = checkNotNull(startupOptions, "startupOptions
must not be null");
- this.lookupMaxRetryTimes = lookupMaxRetryTimes;
this.lookupAsync = lookupAsync;
this.cache = cache;
@@ -253,7 +250,6 @@ public class FlinkTableSource
flussConfig,
tableOutputType,
primaryKeyIndexes,
- lookupMaxRetryTimes,
projectedFields);
} else if (limit > 0) {
results =
@@ -412,7 +408,6 @@ public class FlinkTableSource
flussConfig,
tablePath,
tableOutputType,
- lookupMaxRetryTimes,
lookupNormalizer,
projectedFields);
if (cache != null) {
@@ -426,7 +421,6 @@ public class FlinkTableSource
flussConfig,
tablePath,
tableOutputType,
- lookupMaxRetryTimes,
lookupNormalizer,
projectedFields);
if (cache != null) {
@@ -449,7 +443,6 @@ public class FlinkTableSource
partitionKeyIndexes,
streaming,
startupOptions,
- lookupMaxRetryTimes,
lookupAsync,
cache,
scanPartitionDiscoveryIntervalMs,
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
index 716a74199..e1906712a 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkAsyncLookupFunction.java
@@ -33,6 +33,7 @@ import
org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.utils.ExceptionUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
@@ -58,7 +59,6 @@ public class FlinkAsyncLookupFunction extends
AsyncLookupFunction {
private final Configuration flussConfig;
private final TablePath tablePath;
- private final int maxRetryTimes;
private final RowType flinkRowType;
private final LookupNormalizer lookupNormalizer;
@Nullable private final int[] projection;
@@ -73,12 +73,10 @@ public class FlinkAsyncLookupFunction extends
AsyncLookupFunction {
Configuration flussConfig,
TablePath tablePath,
RowType flinkRowType,
- int maxRetryTimes,
LookupNormalizer lookupNormalizer,
@Nullable int[] projection) {
this.flussConfig = flussConfig;
this.tablePath = tablePath;
- this.maxRetryTimes = maxRetryTimes;
this.flinkRowType = flinkRowType;
this.lookupNormalizer = lookupNormalizer;
this.projection = projection;
@@ -121,69 +119,35 @@ public class FlinkAsyncLookupFunction extends
AsyncLookupFunction {
RowData normalizedKeyRow = lookupNormalizer.normalizeLookupKey(keyRow);
RemainingFilter remainingFilter =
lookupNormalizer.createRemainingFilter(keyRow);
InternalRow flussKeyRow = lookupRow.replace(normalizedKeyRow);
- CompletableFuture<Collection<RowData>> future = new
CompletableFuture<>();
- // fetch result
- fetchResult(future, 0, flussKeyRow, remainingFilter);
- return future;
- }
- /**
- * Execute async fetch result .
- *
- * @param resultFuture The result or exception is returned.
- * @param currentRetry Current number of retries.
- * @param keyRow the key row to get.
- * @param remainingFilter the nullable remaining filter to filter the
result.
- */
- private void fetchResult(
- CompletableFuture<Collection<RowData>> resultFuture,
- int currentRetry,
- InternalRow keyRow,
- @Nullable RemainingFilter remainingFilter) {
- lookuper.lookup(keyRow)
+ // the retry mechanism is now handled by the underlying LookupClient
layer;
+ // we can't call lookuper.lookup() in the whenComplete callback because
the lookuper is not thread-safe.
+ CompletableFuture<Collection<RowData>> future = new
CompletableFuture<>();
+ lookuper.lookup(flussKeyRow)
.whenComplete(
(result, throwable) -> {
if (throwable != null) {
- handleLookupFailed(
- resultFuture,
- throwable,
- currentRetry,
- keyRow,
- remainingFilter);
+ if (ExceptionUtils.findThrowable(
+ throwable,
TableNotExistException.class)
+ .isPresent()) {
+ LOG.error("Table '{}' not found ",
tablePath, throwable);
+ future.completeExceptionally(
+ new RuntimeException(
+ "Fluss table '" +
tablePath + "' not found.",
+ throwable));
+ } else {
+ LOG.error("Fluss asyncLookup error",
throwable);
+ future.completeExceptionally(
+ new RuntimeException(
+ "Execution of Fluss
asyncLookup failed: "
+ +
throwable.getMessage(),
+ throwable));
+ }
} else {
- handleLookupSuccess(
- resultFuture, result.getRowList(),
remainingFilter);
+ handleLookupSuccess(future,
result.getRowList(), remainingFilter);
}
});
- }
-
- private void handleLookupFailed(
- CompletableFuture<Collection<RowData>> resultFuture,
- Throwable throwable,
- int currentRetry,
- InternalRow keyRow,
- @Nullable RemainingFilter remainingFilter) {
- if (throwable instanceof TableNotExistException) {
- LOG.error("Table '{}' not found ", tablePath, throwable);
- resultFuture.completeExceptionally(
- new RuntimeException("Fluss table '" + tablePath + "' not
found.", throwable));
- } else {
- LOG.error("Fluss asyncLookup error, retry times = {}",
currentRetry, throwable);
- if (currentRetry >= maxRetryTimes) {
- String exceptionMsg =
- String.format(
- "Execution of Fluss asyncLookup failed: %s,
retry times = %d.",
- throwable.getMessage(), currentRetry);
- resultFuture.completeExceptionally(new
RuntimeException(exceptionMsg, throwable));
- } else {
- try {
- Thread.sleep(1000L * currentRetry);
- } catch (InterruptedException e1) {
- resultFuture.completeExceptionally(e1);
- }
- fetchResult(resultFuture, currentRetry + 1, keyRow,
remainingFilter);
- }
- }
+ return future;
}
private void handleLookupSuccess(
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
index 480247f0f..79418f9b2 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunction.java
@@ -54,7 +54,6 @@ public class FlinkLookupFunction extends LookupFunction {
private final Configuration flussConfig;
private final TablePath tablePath;
- private final int maxRetryTimes;
private final RowType flinkRowType;
private final LookupNormalizer lookupNormalizer;
@Nullable private final int[] projection;
@@ -70,12 +69,10 @@ public class FlinkLookupFunction extends LookupFunction {
Configuration flussConfig,
TablePath tablePath,
RowType flinkRowType,
- int maxRetryTimes,
LookupNormalizer lookupNormalizer,
@Nullable int[] projection) {
this.flussConfig = flussConfig;
this.tablePath = tablePath;
- this.maxRetryTimes = maxRetryTimes;
this.flinkRowType = flinkRowType;
this.lookupNormalizer = lookupNormalizer;
this.projection = projection;
@@ -124,41 +121,28 @@ public class FlinkLookupFunction extends LookupFunction {
lookupNormalizer.createRemainingFilter(keyRow);
// wrap flink row as fluss row to lookup, the flink row has already
been in expected order.
InternalRow flussKeyRow = lookupRow.replace(normalizedKeyRow);
- for (int retry = 0; retry <= maxRetryTimes; retry++) {
- try {
- List<InternalRow> lookupRows =
lookuper.lookup(flussKeyRow).get().getRowList();
- if (lookupRows.isEmpty()) {
- return Collections.emptyList();
- }
- List<RowData> projectedRows = new ArrayList<>();
- for (InternalRow row : lookupRows) {
- if (row != null) {
- RowData flinkRow =
-
flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row));
- if (remainingFilter == null ||
remainingFilter.isMatch(flinkRow)) {
- projectedRows.add(flinkRow);
- }
- }
- }
- return projectedRows;
- } catch (Exception e) {
- LOG.error(String.format("Fluss lookup error, retry times =
%d", retry), e);
- if (retry >= maxRetryTimes) {
- String exceptionMsg =
- String.format(
- "Execution of Fluss lookup failed, retry
times = %d.", retry);
- throw new RuntimeException(exceptionMsg, e);
- }
- try {
- Thread.sleep(1000L * retry);
- } catch (InterruptedException interruptedException) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(interruptedException);
+ // the retry mechanism will be handled by the underlying LookupClient
layer
+ try {
+ List<InternalRow> lookupRows =
lookuper.lookup(flussKeyRow).get().getRowList();
+ if (lookupRows.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<RowData> projectedRows = new ArrayList<>();
+ for (InternalRow row : lookupRows) {
+ if (row != null) {
+ RowData flinkRow =
+
flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row));
+ if (remainingFilter == null ||
remainingFilter.isMatch(flinkRow)) {
+ projectedRows.add(flinkRow);
+ }
}
}
+ return projectedRows;
+ } catch (Exception e) {
+ LOG.error("Fluss lookup error", e);
+ throw new RuntimeException("Execution of Fluss lookup failed: " +
e.getMessage(), e);
}
- return Collections.emptyList();
}
private InternalRow maybeProject(InternalRow row) {
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
index e32e0752c..1d1f39853 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
@@ -262,7 +262,6 @@ public class PushdownUtils {
Configuration flussConfig,
RowType sourceOutputType,
int[] primaryKeyIndexes,
- int lookupMaxRetryTimes,
@Nullable int[] projectedFields) {
LookupNormalizer lookupNormalizer =
createPrimaryKeyLookupNormalizer(primaryKeyIndexes,
sourceOutputType);
@@ -271,7 +270,6 @@ public class PushdownUtils {
flussConfig,
tablePath,
sourceOutputType,
- lookupMaxRetryTimes,
lookupNormalizer,
projectedFields);
try {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
index a68350a0c..297c6a16d 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
@@ -23,7 +23,6 @@ import org.apache.fluss.flink.utils.FlinkConversions;
import org.apache.fluss.flink.utils.FlinkTestBase;
import org.apache.fluss.metadata.TablePath;
-import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.types.logical.RowType;
@@ -57,7 +56,6 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
clientConf,
tablePath,
flinkRowType,
- LookupOptions.MAX_RETRIES.defaultValue(),
createPrimaryKeyLookupNormalizer(new int[] {0},
flinkRowType),
null);
@@ -95,7 +93,6 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
clientConf,
tablePath,
flinkRowType,
- LookupOptions.MAX_RETRIES.defaultValue(),
createPrimaryKeyLookupNormalizer(new int[] {0},
flinkRowType),
null);
asyncLookupFunction.open(null);
diff --git a/website/docs/engine-flink/options.md
b/website/docs/engine-flink/options.md
index caddc0f58..e5ecc0ff5 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -109,16 +109,17 @@ See more details about [ALTER TABLE ...
SET](engine-flink/ddl.md#set-properties)
| Option | Type | Default |
Description
|
|------------------------------------------|------------|---------|-----------------------------------------------------------------------------------------------------------------------------|
| lookup.async | Boolean | true | Whether to
use asynchronous lookup. Asynchronous lookup has better throughput performance
than synchronous lookup. |
-| lookup.cache | Enum | NONE | The
caching strategy for this lookup table, including NONE, PARTIAL.
|
|
-| lookup.max-retries | Integer | 3 | The
maximum allowed retries if a lookup operation fails.
|
|
-| lookup.partial-cache.expire-after-access | Duration | (None) | Duration
to expire an entry in the cache after accessing.
|
|
-| lookup.partial-cache.expire-after-write | Duration | (None) | Duration
to expire an entry in the cache after writing.
|
|
-| lookup.partial-cache.cache-missing-key | Boolean | true | Whether to
store an empty value into the cache if the lookup key doesn't match any rows in
the table. |
|
-| lookup.partial-cache.max-rows | Long | (None) | The
maximum number of rows to store in the cache.
|
|
+| lookup.cache | Enum | NONE | The
caching strategy for this lookup table, including NONE, PARTIAL.
|
+| lookup.max-retries | Integer | 3 | The
maximum allowed retries if a lookup operation fails. Setting this value will
override option 'client.lookup.max-retries'.|
+| lookup.partial-cache.expire-after-access | Duration | (None) | Duration
to expire an entry in the cache after accessing.
|
+| lookup.partial-cache.expire-after-write | Duration | (None) | Duration
to expire an entry in the cache after writing.
|
+| lookup.partial-cache.cache-missing-key | Boolean | true | Whether to
store an empty value into the cache if the lookup key doesn't match any rows in
the table. |
+| lookup.partial-cache.max-rows | Long | (None) | The
maximum number of rows to store in the cache.
|
| client.lookup.queue-size | Integer | 25600 | The
maximum number of pending lookup operations.
|
| client.lookup.max-batch-size | Integer | 128 | The
maximum batch size of merging lookup operations to one lookup request.
|
| client.lookup.max-inflight-requests | Integer | 128 | The
maximum number of unacknowledged lookup requests for lookup operations.
|
| client.lookup.batch-timeout | Duration | 100ms | The
maximum time to wait for the lookup batch to full, if this timeout is reached,
the lookup batch will be closed to send. |
+| client.lookup.max-retries | Integer | 3 | Setting a
value greater than zero will cause the client to resend any lookup request that
fails with a potentially transient error. |
## Write Options