This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 183c21315 [Client] Fix LookupSender doesn't update bucket metadata
when receive NotLeaderOrFollowerException in bucket level (#1928)
183c21315 is described below
commit 183c2131545ec3198bc79a6816ccdfbcb9c9c06e
Author: yunhong <[email protected]>
AuthorDate: Mon Nov 10 10:13:14 2025 +0800
[Client] Fix LookupSender doesn't update bucket metadata when receive
NotLeaderOrFollowerException in bucket level (#1928)
---
.../apache/fluss/client/lookup/LookupSender.java | 106 +++++++++++++-----
.../fluss/client/lookup/LookupSenderTest.java | 119 +++++++++++++++++++++
.../client/metadata/TestingMetadataUpdater.java | 4 +
.../server/tablet/TestTabletServerGateway.java | 56 +++++++++-
4 files changed, 259 insertions(+), 26 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
index 87fa0dff1..0f249213a 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
@@ -18,10 +18,15 @@
package org.apache.fluss.client.lookup;
import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.exception.ApiException;
import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.InvalidMetadataException;
import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePartition;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.LookupRequest;
import org.apache.fluss.rpc.messages.LookupResponse;
@@ -36,10 +41,14 @@ import org.apache.fluss.utils.types.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
@@ -145,7 +154,8 @@ class LookupSender implements Runnable {
return lookupBatchesByLeader;
}
- private void sendLookups(
+ @VisibleForTesting
+ void sendLookups(
int destination, LookupType lookupType,
List<AbstractLookupQuery<?>> lookupBatches) {
TabletServerGateway gateway =
metadataUpdater.newTabletServerClientForNode(destination);
if (gateway == null) {
@@ -155,16 +165,16 @@ class LookupSender implements Runnable {
}
if (lookupType == LookupType.LOOKUP) {
- sendLookupRequest(gateway, lookupBatches);
+ sendLookupRequest(destination, gateway, lookupBatches);
} else if (lookupType == LookupType.PREFIX_LOOKUP) {
- sendPrefixLookupRequest(gateway, lookupBatches);
+ sendPrefixLookupRequest(destination, gateway, lookupBatches);
} else {
throw new IllegalArgumentException("Unsupported lookup type: " +
lookupType);
}
}
private void sendLookupRequest(
- TabletServerGateway gateway, List<AbstractLookupQuery<?>> lookups)
{
+ int destination, TabletServerGateway gateway,
List<AbstractLookupQuery<?>> lookups) {
// table id -> (bucket -> lookups)
Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new
HashMap<>();
for (AbstractLookupQuery<?> abstractLookupQuery : lookups) {
@@ -180,6 +190,7 @@ class LookupSender implements Runnable {
lookupByTableId.forEach(
(tableId, lookupsByBucket) ->
sendLookupRequestAndHandleResponse(
+ destination,
gateway,
makeLookupRequest(tableId,
lookupsByBucket.values()),
tableId,
@@ -187,7 +198,9 @@ class LookupSender implements Runnable {
}
private void sendPrefixLookupRequest(
- TabletServerGateway gateway, List<AbstractLookupQuery<?>>
prefixLookups) {
+ int destination,
+ TabletServerGateway gateway,
+ List<AbstractLookupQuery<?>> prefixLookups) {
// table id -> (bucket -> lookups)
Map<Long, Map<TableBucket, PrefixLookupBatch>> lookupByTableId = new
HashMap<>();
for (AbstractLookupQuery<?> abstractLookupQuery : prefixLookups) {
@@ -203,6 +216,7 @@ class LookupSender implements Runnable {
lookupByTableId.forEach(
(tableId, prefixLookupBatch) ->
sendPrefixLookupRequestAndHandleResponse(
+ destination,
gateway,
makePrefixLookupRequest(tableId,
prefixLookupBatch.values()),
tableId,
@@ -210,6 +224,7 @@ class LookupSender implements Runnable {
}
private void sendLookupRequestAndHandleResponse(
+ int destination,
TabletServerGateway gateway,
LookupRequest lookupRequest,
long tableId,
@@ -224,7 +239,8 @@ class LookupSender implements Runnable {
.thenAccept(
lookupResponse -> {
try {
- handleLookupResponse(tableId, lookupResponse,
lookupsByBucket);
+ handleLookupResponse(
+ tableId, destination, lookupResponse,
lookupsByBucket);
} finally {
maxInFlightReuqestsSemaphore.release();
}
@@ -232,7 +248,7 @@ class LookupSender implements Runnable {
.exceptionally(
e -> {
try {
- handleLookupRequestException(e,
lookupsByBucket);
+ handleLookupRequestException(e, destination,
lookupsByBucket);
return null;
} finally {
maxInFlightReuqestsSemaphore.release();
@@ -241,6 +257,7 @@ class LookupSender implements Runnable {
}
private void sendPrefixLookupRequestAndHandleResponse(
+ int destination,
TabletServerGateway gateway,
PrefixLookupRequest prefixLookupRequest,
long tableId,
@@ -256,7 +273,10 @@ class LookupSender implements Runnable {
prefixLookupResponse -> {
try {
handlePrefixLookupResponse(
- tableId, prefixLookupResponse,
lookupsByBucket);
+ tableId,
+ destination,
+ prefixLookupResponse,
+ lookupsByBucket);
} finally {
maxInFlightReuqestsSemaphore.release();
}
@@ -264,7 +284,7 @@ class LookupSender implements Runnable {
.exceptionally(
e -> {
try {
- handlePrefixLookupException(e,
lookupsByBucket);
+ handlePrefixLookupException(e, destination,
lookupsByBucket);
return null;
} finally {
maxInFlightReuqestsSemaphore.release();
@@ -274,6 +294,7 @@ class LookupSender implements Runnable {
private void handleLookupResponse(
long tableId,
+ int destination,
LookupResponse lookupResponse,
Map<TableBucket, LookupBatch> lookupsByBucket) {
for (PbLookupRespForBucket pbLookupRespForBucket :
lookupResponse.getBucketsRespsList()) {
@@ -288,10 +309,7 @@ class LookupSender implements Runnable {
if (pbLookupRespForBucket.hasErrorCode()) {
// TODO for re-triable error, we should retry here instead of
throwing exception.
ApiError error =
ApiError.fromErrorMessage(pbLookupRespForBucket);
- LOG.warn(
- "Get error lookup response on table bucket {}, fail.
Error: {}",
- tableBucket,
- error.formatErrMsg());
+ handleLookupExceptionForBucket(tableBucket, destination,
error, "lookup");
lookupBatch.completeExceptionally(error.exception());
} else {
List<byte[]> byteValues =
@@ -312,6 +330,7 @@ class LookupSender implements Runnable {
private void handlePrefixLookupResponse(
long tableId,
+ int destination,
PrefixLookupResponse prefixLookupResponse,
Map<TableBucket, PrefixLookupBatch> prefixLookupsByBucket) {
for (PbPrefixLookupRespForBucket pbRespForBucket :
@@ -328,10 +347,7 @@ class LookupSender implements Runnable {
if (pbRespForBucket.hasErrorCode()) {
// TODO for re-triable error, we should retry here instead of
throwing exception.
ApiError error = ApiError.fromErrorMessage(pbRespForBucket);
- LOG.warn(
- "Get error prefix lookup response on table bucket {},
fail. Error: {}",
- tableBucket,
- error.formatErrMsg());
+ handleLookupExceptionForBucket(tableBucket, destination,
error, "prefixLookup");
prefixLookupBatch.completeExceptionally(error.exception());
} else {
List<List<byte[]>> result = new
ArrayList<>(pbRespForBucket.getValueListsCount());
@@ -349,24 +365,22 @@ class LookupSender implements Runnable {
}
private void handleLookupRequestException(
- Throwable t, Map<TableBucket, LookupBatch> lookupsByBucket) {
+ Throwable t, int destination, Map<TableBucket, LookupBatch>
lookupsByBucket) {
ApiError error = ApiError.fromThrowable(t);
for (LookupBatch lookupBatch : lookupsByBucket.values()) {
// TODO for re-triable error, we should retry here instead of
throwing exception.
- LOG.warn(
- "Get error lookup response on table bucket {}, fail.
Error: {}",
- lookupBatch.tableBucket(),
- error.formatErrMsg());
+ handleLookupExceptionForBucket(lookupBatch.tableBucket(),
destination, error, "lookup");
lookupBatch.completeExceptionally(error.exception());
}
}
private void handlePrefixLookupException(
- Throwable t, Map<TableBucket, PrefixLookupBatch> lookupsByBucket) {
+ Throwable t, int destination, Map<TableBucket, PrefixLookupBatch>
lookupsByBucket) {
ApiError error = ApiError.fromThrowable(t);
// TODO If error, we need to retry send the request instead of throw
exception.
- LOG.warn("Get error prefix lookup response. Error: {}",
error.formatErrMsg());
for (PrefixLookupBatch lookupBatch : lookupsByBucket.values()) {
+ handleLookupExceptionForBucket(
+ lookupBatch.tableBucket(), destination, error,
"prefixLookup");
lookupBatch.completeExceptionally(error.exception());
}
}
@@ -382,4 +396,48 @@ class LookupSender implements Runnable {
lookupQueue.close();
running = false;
}
+
+    /**
+     * Logs a failed {@code lookupType} request for bucket {@code tb}. When the error is an
+     * {@link InvalidMetadataException} (e.g. {@code NotLeaderOrFollowerException}), it also
+     * invalidates the cached bucket metadata of the bucket's table, or of its partition for
+     * partitioned tables, so the next lookup round resolves the bucket location again.
+     *
+     * @param tb the bucket whose lookup failed
+     * @param destination id of the tablet server the request was sent to
+     * @param error the error reported for this bucket, or for the whole request
+     * @param lookupType name of the lookup kind; only used in log messages
+     */
+    private void handleLookupExceptionForBucket(
+            TableBucket tb, int destination, ApiError error, String lookupType) {
+        // ApiError#exception() keeps the error message reported for the bucket, which is the
+        // same exception the callers complete the batch with. error.error().exception() would
+        // rebuild a generic exception from the error code alone.
+        ApiException exception = error.exception();
+        if (exception instanceof InvalidMetadataException) {
+            // Stale metadata (e.g. leader moved): log once at WARN and refresh metadata instead
+            // of additionally reporting the same failure at ERROR level.
+            LOG.warn(
+                    "Failed to {} from node {} for bucket {} due to invalid metadata, "
+                            + "going to request metadata update. Error: {}",
+                    lookupType,
+                    destination,
+                    tb,
+                    error.formatErrMsg());
+            long tableId = tb.getTableId();
+            TableOrPartitions tableOrPartitions;
+            if (tb.getPartitionId() == null) {
+                // Non-partitioned table: invalidate the whole table's bucket metadata.
+                tableOrPartitions = new TableOrPartitions(Collections.singleton(tableId), null);
+            } else {
+                // Partitioned table: only the affected partition needs to be refreshed.
+                tableOrPartitions =
+                        new TableOrPartitions(
+                                null,
+                                Collections.singleton(
+                                        new TablePartition(tableId, tb.getPartitionId())));
+            }
+            invalidTableOrPartitions(tableOrPartitions);
+        } else {
+            LOG.error(
+                    "Failed to {} from node {} for bucket {}. Error: {}",
+                    lookupType,
+                    destination,
+                    tb,
+                    error.formatErrMsg());
+        }
+    }
+
+    /**
+     * Value holder naming what should have its bucket metadata invalidated: a set of table ids
+     * and/or a set of table partitions. Either set may be {@code null}.
+     */
+    private static class TableOrPartitions {
+        @Nullable private final Set<Long> tableIds;
+        @Nullable private final Set<TablePartition> tablePartitions;
+
+        TableOrPartitions(@Nullable Set<Long> ids, @Nullable Set<TablePartition> partitions) {
+            tableIds = ids;
+            tablePartitions = partitions;
+        }
+    }
+
+    /**
+     * Resolves the given table ids / table partitions to their physical table paths through the
+     * metadata updater and invalidates the cached bucket metadata of those paths.
+     */
+    private void invalidTableOrPartitions(TableOrPartitions tableOrPartitions) {
+        metadataUpdater.invalidPhysicalTableBucketMeta(
+                metadataUpdater.getPhysicalTablePathByIds(
+                        tableOrPartitions.tableIds, tableOrPartitions.tablePartitions));
+    }
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
new file mode 100644
index 000000000..336f588ea
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.lookup;
+
+import org.apache.fluss.client.metadata.TestingMetadataUpdater;
+import org.apache.fluss.cluster.BucketLocation;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.NotLeaderOrFollowerException;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link LookupSender}. */
+public class LookupSenderTest {
+
+    /** Message of the exception the testing tablet server returns for response logic id 1. */
+    private static final String MOCK_ERROR_MESSAGE = "mock not leader or follower exception.";
+
+    private final TableBucket tb1 = new TableBucket(DATA1_TABLE_ID_PK, 0);
+
+    private TestingMetadataUpdater metadataUpdater;
+    private LookupSender lookupSender;
+
+    @BeforeEach
+    public void setup() {
+        metadataUpdater = initializeMetadataUpdater();
+        Configuration conf = new Configuration();
+        conf.set(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE, 5);
+        conf.set(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE, 10);
+        lookupSender = new LookupSender(metadataUpdater, new LookupQueue(conf), 5);
+    }
+
+    @Test
+    void testSendLookupRequestWithNotLeaderOrFollowerException() {
+        assertInitialBucketLocation(tb1);
+
+        // Make server 1 answer every bucket with NotLeaderOrFollowerException
+        // (response logic id 1 of the testing tablet server gateway).
+        metadataUpdater.setResponseLogicId(1, 1);
+        LookupQuery lookupQuery = new LookupQuery(tb1, new byte[0]);
+        CompletableFuture<byte[]> result = lookupQuery.future();
+        assertThat(result).isNotDone();
+        lookupSender.sendLookups(1, LookupType.LOOKUP, Collections.singletonList(lookupQuery));
+
+        assertFailedWithNotLeaderOrFollower(result);
+        // On NotLeaderOrFollowerException the stale bucket location must be removed from the
+        // metadata updater, so the next lookup round fetches the latest bucket location.
+        assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
+    }
+
+    @Test
+    void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() {
+        assertInitialBucketLocation(tb1);
+
+        // Make server 1 answer every bucket with NotLeaderOrFollowerException
+        // (response logic id 1 of the testing tablet server gateway).
+        metadataUpdater.setResponseLogicId(1, 1);
+        PrefixLookupQuery prefixLookupQuery = new PrefixLookupQuery(tb1, new byte[0]);
+        CompletableFuture<List<byte[]>> future = prefixLookupQuery.future();
+        assertThat(future).isNotDone();
+        lookupSender.sendLookups(
+                1, LookupType.PREFIX_LOOKUP, Collections.singletonList(prefixLookupQuery));
+
+        assertFailedWithNotLeaderOrFollower(future);
+        // On NotLeaderOrFollowerException the stale bucket location must be removed from the
+        // metadata updater, so the next lookup round fetches the latest bucket location.
+        assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
+    }
+
+    /** Asserts that the metadata updater still holds the initial location of {@code tb}. */
+    private void assertInitialBucketLocation(TableBucket tb) {
+        assertThat(metadataUpdater.getBucketLocation(tb))
+                .hasValue(
+                        new BucketLocation(
+                                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                                tb,
+                                1,
+                                new int[] {1, 2, 3}));
+    }
+
+    /** Asserts that {@code future} failed with the mocked NotLeaderOrFollowerException. */
+    private static void assertFailedWithNotLeaderOrFollower(CompletableFuture<?> future) {
+        assertThat(future).isCompletedExceptionally();
+        assertThatThrownBy(future::get)
+                .rootCause()
+                .isInstanceOf(NotLeaderOrFollowerException.class)
+                .hasMessage(MOCK_ERROR_MESSAGE);
+    }
+
+    private TestingMetadataUpdater initializeMetadataUpdater() {
+        return new TestingMetadataUpdater(
+                Collections.singletonMap(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK));
+    }
+}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index 4420a2ebb..38994cbca 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -78,6 +78,10 @@ public class TestingMetadataUpdater extends MetadataUpdater {
this.cluster = cluster;
}
+    /**
+     * Sets the response logic id of the testing tablet server gateway registered for {@code
+     * serverId}.
+     *
+     * @param serverId id of the tablet server whose gateway should change behavior
+     * @param responseLogicId the response logic id forwarded to that gateway
+     * @throws IllegalArgumentException if no gateway is registered for {@code serverId}
+     */
+    public void setResponseLogicId(int serverId, int responseLogicId) {
+        // Fail with a descriptive message instead of an opaque NullPointerException when a test
+        // targets a server that is not part of the testing cluster.
+        if (!tabletServerGatewayMap.containsKey(serverId)) {
+            throw new IllegalArgumentException(
+                    "No testing tablet server gateway registered for server id " + serverId);
+        }
+        tabletServerGatewayMap.get(serverId).setResponseLogicId(responseLogicId);
+    }
+
@Override
public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
Set<TablePath> needUpdateTablePaths =
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
index 337aea70b..637e909f6 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
@@ -17,9 +17,12 @@
package org.apache.fluss.server.tablet;
+import org.apache.fluss.exception.NotLeaderOrFollowerException;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
+import org.apache.fluss.rpc.entity.LookupResultForBucket;
+import org.apache.fluss.rpc.entity.PrefixLookupResultForBucket;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.ApiMessage;
import org.apache.fluss.rpc.messages.ApiVersionsRequest;
@@ -87,6 +90,7 @@ import org.apache.fluss.rpc.messages.TableExistsRequest;
import org.apache.fluss.rpc.messages.TableExistsResponse;
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
import org.apache.fluss.rpc.messages.UpdateMetadataResponse;
+import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.server.entity.FetchReqInfo;
import org.apache.fluss.utils.types.Tuple2;
@@ -104,6 +108,10 @@ import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.getFetchLogData;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeFetchLogResponse;
+import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeLookupResponse;
+import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makePrefixLookupResponse;
+import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toLookupData;
+import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toPrefixLookupData;
/** A {@link TabletServerGateway} for test purpose. */
public class TestTabletServerGateway implements TabletServerGateway {
@@ -111,6 +119,9 @@ public class TestTabletServerGateway implements
TabletServerGateway {
private final boolean alwaysFail;
private final AtomicLong writerId = new AtomicLong(0);
+ /** The id to define the response logic. */
+ private int responseLogicId;
+
// Use concurrent queue for storing request and related completable future
response so that
// requests may be queried from a different thread.
private final Queue<Tuple2<ApiMessage, CompletableFuture<?>>> requests =
@@ -118,6 +129,7 @@ public class TestTabletServerGateway implements
TabletServerGateway {
public TestTabletServerGateway(boolean alwaysFail) {
this.alwaysFail = alwaysFail;
+ this.responseLogicId = 0;
}
@Override
@@ -182,12 +194,48 @@ public class TestTabletServerGateway implements
TabletServerGateway {
@Override
public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
- return null;
+ Map<TableBucket, List<byte[]>> lookupData = toLookupData(request);
+ Map<TableBucket, LookupResultForBucket> errorResponseMap = new
HashMap<>();
+ if (responseLogicId == 1) {
+ // return with NotLeaderOrFollowerException.
+ lookupData
+ .keySet()
+ .forEach(
+ tb ->
+ errorResponseMap.put(
+ tb,
+ new LookupResultForBucket(
+ tb,
+ ApiError.fromThrowable(
+ new
NotLeaderOrFollowerException(
+ "mock not
leader or follower exception.")))));
+ return
CompletableFuture.completedFuture(makeLookupResponse(errorResponseMap));
+ } else {
+ return null;
+ }
}
@Override
public CompletableFuture<PrefixLookupResponse>
prefixLookup(PrefixLookupRequest request) {
- throw new UnsupportedOperationException();
+ Map<TableBucket, List<byte[]>> prefixLookupData =
toPrefixLookupData(request);
+ Map<TableBucket, PrefixLookupResultForBucket> errorResponseMap = new
HashMap<>();
+ if (responseLogicId == 1) {
+ // return with NotLeaderOrFollowerException.
+ prefixLookupData
+ .keySet()
+ .forEach(
+ tb ->
+ errorResponseMap.put(
+ tb,
+ new PrefixLookupResultForBucket(
+ tb,
+ ApiError.fromThrowable(
+ new
NotLeaderOrFollowerException(
+ "mock not
leader or follower exception.")))));
+ return
CompletableFuture.completedFuture(makePrefixLookupResponse(errorResponseMap));
+ } else {
+ throw new UnsupportedOperationException();
+ }
}
@Override
@@ -387,6 +435,10 @@ public class TestTabletServerGateway implements
TabletServerGateway {
}
}
+    /**
+     * Selects how {@code lookup} and {@code prefixLookup} respond. With {@code 1}, every
+     * requested bucket fails with a {@link NotLeaderOrFollowerException}. Any other value keeps
+     * the default behavior.
+     */
+    public void setResponseLogicId(int logicId) {
+        responseLogicId = logicId;
+    }
+
private StopReplicaResponse mockStopReplicaResponse(
StopReplicaRequest stopReplicaRequest,
@Nullable Integer errCode,