This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 183c21315 [Client] Fix LookupSender doesn't update bucket metadata 
when receive NotLeaderOrFollowerException in bucket level (#1928)
183c21315 is described below

commit 183c2131545ec3198bc79a6816ccdfbcb9c9c06e
Author: yunhong <[email protected]>
AuthorDate: Mon Nov 10 10:13:14 2025 +0800

    [Client] Fix LookupSender doesn't update bucket metadata when receive 
NotLeaderOrFollowerException in bucket level (#1928)
---
 .../apache/fluss/client/lookup/LookupSender.java   | 106 +++++++++++++-----
 .../fluss/client/lookup/LookupSenderTest.java      | 119 +++++++++++++++++++++
 .../client/metadata/TestingMetadataUpdater.java    |   4 +
 .../server/tablet/TestTabletServerGateway.java     |  56 +++++++++-
 4 files changed, 259 insertions(+), 26 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
index 87fa0dff1..0f249213a 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
@@ -18,10 +18,15 @@
 package org.apache.fluss.client.lookup;
 
 import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.exception.ApiException;
 import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.InvalidMetadataException;
 import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePartition;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.LookupRequest;
 import org.apache.fluss.rpc.messages.LookupResponse;
@@ -36,10 +41,14 @@ import org.apache.fluss.utils.types.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.stream.Collectors;
 
@@ -145,7 +154,8 @@ class LookupSender implements Runnable {
         return lookupBatchesByLeader;
     }
 
-    private void sendLookups(
+    @VisibleForTesting
+    void sendLookups(
             int destination, LookupType lookupType, 
List<AbstractLookupQuery<?>> lookupBatches) {
         TabletServerGateway gateway = 
metadataUpdater.newTabletServerClientForNode(destination);
         if (gateway == null) {
@@ -155,16 +165,16 @@ class LookupSender implements Runnable {
         }
 
         if (lookupType == LookupType.LOOKUP) {
-            sendLookupRequest(gateway, lookupBatches);
+            sendLookupRequest(destination, gateway, lookupBatches);
         } else if (lookupType == LookupType.PREFIX_LOOKUP) {
-            sendPrefixLookupRequest(gateway, lookupBatches);
+            sendPrefixLookupRequest(destination, gateway, lookupBatches);
         } else {
             throw new IllegalArgumentException("Unsupported lookup type: " + 
lookupType);
         }
     }
 
     private void sendLookupRequest(
-            TabletServerGateway gateway, List<AbstractLookupQuery<?>> lookups) 
{
+            int destination, TabletServerGateway gateway, 
List<AbstractLookupQuery<?>> lookups) {
         // table id -> (bucket -> lookups)
         Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new 
HashMap<>();
         for (AbstractLookupQuery<?> abstractLookupQuery : lookups) {
@@ -180,6 +190,7 @@ class LookupSender implements Runnable {
         lookupByTableId.forEach(
                 (tableId, lookupsByBucket) ->
                         sendLookupRequestAndHandleResponse(
+                                destination,
                                 gateway,
                                 makeLookupRequest(tableId, 
lookupsByBucket.values()),
                                 tableId,
@@ -187,7 +198,9 @@ class LookupSender implements Runnable {
     }
 
     private void sendPrefixLookupRequest(
-            TabletServerGateway gateway, List<AbstractLookupQuery<?>> 
prefixLookups) {
+            int destination,
+            TabletServerGateway gateway,
+            List<AbstractLookupQuery<?>> prefixLookups) {
         // table id -> (bucket -> lookups)
         Map<Long, Map<TableBucket, PrefixLookupBatch>> lookupByTableId = new 
HashMap<>();
         for (AbstractLookupQuery<?> abstractLookupQuery : prefixLookups) {
@@ -203,6 +216,7 @@ class LookupSender implements Runnable {
         lookupByTableId.forEach(
                 (tableId, prefixLookupBatch) ->
                         sendPrefixLookupRequestAndHandleResponse(
+                                destination,
                                 gateway,
                                 makePrefixLookupRequest(tableId, 
prefixLookupBatch.values()),
                                 tableId,
@@ -210,6 +224,7 @@ class LookupSender implements Runnable {
     }
 
     private void sendLookupRequestAndHandleResponse(
+            int destination,
             TabletServerGateway gateway,
             LookupRequest lookupRequest,
             long tableId,
@@ -224,7 +239,8 @@ class LookupSender implements Runnable {
                 .thenAccept(
                         lookupResponse -> {
                             try {
-                                handleLookupResponse(tableId, lookupResponse, 
lookupsByBucket);
+                                handleLookupResponse(
+                                        tableId, destination, lookupResponse, 
lookupsByBucket);
                             } finally {
                                 maxInFlightReuqestsSemaphore.release();
                             }
@@ -232,7 +248,7 @@ class LookupSender implements Runnable {
                 .exceptionally(
                         e -> {
                             try {
-                                handleLookupRequestException(e, 
lookupsByBucket);
+                                handleLookupRequestException(e, destination, 
lookupsByBucket);
                                 return null;
                             } finally {
                                 maxInFlightReuqestsSemaphore.release();
@@ -241,6 +257,7 @@ class LookupSender implements Runnable {
     }
 
     private void sendPrefixLookupRequestAndHandleResponse(
+            int destination,
             TabletServerGateway gateway,
             PrefixLookupRequest prefixLookupRequest,
             long tableId,
@@ -256,7 +273,10 @@ class LookupSender implements Runnable {
                         prefixLookupResponse -> {
                             try {
                                 handlePrefixLookupResponse(
-                                        tableId, prefixLookupResponse, 
lookupsByBucket);
+                                        tableId,
+                                        destination,
+                                        prefixLookupResponse,
+                                        lookupsByBucket);
                             } finally {
                                 maxInFlightReuqestsSemaphore.release();
                             }
@@ -264,7 +284,7 @@ class LookupSender implements Runnable {
                 .exceptionally(
                         e -> {
                             try {
-                                handlePrefixLookupException(e, 
lookupsByBucket);
+                                handlePrefixLookupException(e, destination, 
lookupsByBucket);
                                 return null;
                             } finally {
                                 maxInFlightReuqestsSemaphore.release();
@@ -274,6 +294,7 @@ class LookupSender implements Runnable {
 
     private void handleLookupResponse(
             long tableId,
+            int destination,
             LookupResponse lookupResponse,
             Map<TableBucket, LookupBatch> lookupsByBucket) {
         for (PbLookupRespForBucket pbLookupRespForBucket : 
lookupResponse.getBucketsRespsList()) {
@@ -288,10 +309,7 @@ class LookupSender implements Runnable {
             if (pbLookupRespForBucket.hasErrorCode()) {
                 // TODO for re-triable error, we should retry here instead of 
throwing exception.
                 ApiError error = 
ApiError.fromErrorMessage(pbLookupRespForBucket);
-                LOG.warn(
-                        "Get error lookup response on table bucket {}, fail. 
Error: {}",
-                        tableBucket,
-                        error.formatErrMsg());
+                handleLookupExceptionForBucket(tableBucket, destination, 
error, "lookup");
                 lookupBatch.completeExceptionally(error.exception());
             } else {
                 List<byte[]> byteValues =
@@ -312,6 +330,7 @@ class LookupSender implements Runnable {
 
     private void handlePrefixLookupResponse(
             long tableId,
+            int destination,
             PrefixLookupResponse prefixLookupResponse,
             Map<TableBucket, PrefixLookupBatch> prefixLookupsByBucket) {
         for (PbPrefixLookupRespForBucket pbRespForBucket :
@@ -328,10 +347,7 @@ class LookupSender implements Runnable {
             if (pbRespForBucket.hasErrorCode()) {
                 // TODO for re-triable error, we should retry here instead of 
throwing exception.
                 ApiError error = ApiError.fromErrorMessage(pbRespForBucket);
-                LOG.warn(
-                        "Get error prefix lookup response on table bucket {}, 
fail. Error: {}",
-                        tableBucket,
-                        error.formatErrMsg());
+                handleLookupExceptionForBucket(tableBucket, destination, 
error, "prefixLookup");
                 prefixLookupBatch.completeExceptionally(error.exception());
             } else {
                 List<List<byte[]>> result = new 
ArrayList<>(pbRespForBucket.getValueListsCount());
@@ -349,24 +365,22 @@ class LookupSender implements Runnable {
     }
 
     private void handleLookupRequestException(
-            Throwable t, Map<TableBucket, LookupBatch> lookupsByBucket) {
+            Throwable t, int destination, Map<TableBucket, LookupBatch> 
lookupsByBucket) {
         ApiError error = ApiError.fromThrowable(t);
         for (LookupBatch lookupBatch : lookupsByBucket.values()) {
             // TODO for re-triable error, we should retry here instead of 
throwing exception.
-            LOG.warn(
-                    "Get error lookup response on table bucket {}, fail. 
Error: {}",
-                    lookupBatch.tableBucket(),
-                    error.formatErrMsg());
+            handleLookupExceptionForBucket(lookupBatch.tableBucket(), 
destination, error, "lookup");
             lookupBatch.completeExceptionally(error.exception());
         }
     }
 
     private void handlePrefixLookupException(
-            Throwable t, Map<TableBucket, PrefixLookupBatch> lookupsByBucket) {
+            Throwable t, int destination, Map<TableBucket, PrefixLookupBatch> 
lookupsByBucket) {
         ApiError error = ApiError.fromThrowable(t);
         // TODO If error, we need to retry send the request instead of throw 
exception.
-        LOG.warn("Get error prefix lookup response. Error: {}", 
error.formatErrMsg());
         for (PrefixLookupBatch lookupBatch : lookupsByBucket.values()) {
+            handleLookupExceptionForBucket(
+                    lookupBatch.tableBucket(), destination, error, 
"prefixLookup");
             lookupBatch.completeExceptionally(error.exception());
         }
     }
@@ -382,4 +396,48 @@ class LookupSender implements Runnable {
         lookupQueue.close();
         running = false;
     }
+
+    private void handleLookupExceptionForBucket(
+            TableBucket tb, int destination, ApiError error, String 
lookupType) {
+        ApiException exception = error.error().exception();
+        LOG.error(
+                "Failed to {} from node {} for bucket {}", lookupType, 
destination, tb, exception);
+        if (exception instanceof InvalidMetadataException) {
+            LOG.warn(
+                    "Invalid metadata error in {} request. Going to request 
metadata update.",
+                    lookupType,
+                    exception);
+            long tableId = tb.getTableId();
+            TableOrPartitions tableOrPartitions;
+            if (tb.getPartitionId() == null) {
+                tableOrPartitions = new 
TableOrPartitions(Collections.singleton(tableId), null);
+            } else {
+                tableOrPartitions =
+                        new TableOrPartitions(
+                                null,
+                                Collections.singleton(
+                                        new TablePartition(tableId, 
tb.getPartitionId())));
+            }
+            invalidTableOrPartitions(tableOrPartitions);
+        }
+    }
+
+    /** A helper class to hold table ids or table partitions. */
+    private static class TableOrPartitions {
+        private final @Nullable Set<Long> tableIds;
+        private final @Nullable Set<TablePartition> tablePartitions;
+
+        TableOrPartitions(
+                @Nullable Set<Long> tableIds, @Nullable Set<TablePartition> 
tablePartitions) {
+            this.tableIds = tableIds;
+            this.tablePartitions = tablePartitions;
+        }
+    }
+
+    private void invalidTableOrPartitions(TableOrPartitions tableOrPartitions) 
{
+        Set<PhysicalTablePath> physicalTablePaths =
+                metadataUpdater.getPhysicalTablePathByIds(
+                        tableOrPartitions.tableIds, 
tableOrPartitions.tablePartitions);
+        metadataUpdater.invalidPhysicalTableBucketMeta(physicalTablePaths);
+    }
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
new file mode 100644
index 000000000..336f588ea
--- /dev/null
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.lookup;
+
+import org.apache.fluss.client.metadata.TestingMetadataUpdater;
+import org.apache.fluss.cluster.BucketLocation;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.NotLeaderOrFollowerException;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link LookupSender}. */
+public class LookupSenderTest {
+
+    private final TableBucket tb1 = new TableBucket(DATA1_TABLE_ID_PK, 0);
+
+    private TestingMetadataUpdater metadataUpdater;
+    private LookupSender lookupSender;
+
+    @BeforeEach
+    public void setup() {
+        metadataUpdater = initializeMetadataUpdater();
+        Configuration conf = new Configuration();
+        conf.set(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE, 5);
+        conf.set(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE, 10);
+        lookupSender = new LookupSender(metadataUpdater, new 
LookupQueue(conf), 5);
+    }
+
+    @Test
+    void testSendLookupRequestWithNotLeaderOrFollowerException() {
+        assertThat(metadataUpdater.getBucketLocation(tb1))
+                .hasValue(
+                        new BucketLocation(
+                                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                                tb1,
+                                1,
+                                new int[] {1, 2, 3}));
+
+        // send LookupRequest to serverId 1, which will respond with 
NotLeaderOrFollowerException
+        // as responseLogicId=1 do.
+        metadataUpdater.setResponseLogicId(1, 1);
+        LookupQuery lookupQuery = new LookupQuery(tb1, new byte[0]);
+        CompletableFuture<byte[]> result = lookupQuery.future();
+        assertThat(result).isNotDone();
+        lookupSender.sendLookups(1, LookupType.LOOKUP, 
Collections.singletonList(lookupQuery));
+
+        assertThat(result.isCompletedExceptionally()).isTrue();
+        assertThatThrownBy(result::get)
+                .rootCause()
+                .isInstanceOf(NotLeaderOrFollowerException.class)
+                .hasMessage("mock not leader or follower exception.");
+        // When NotLeaderOrFollowerException is received, the bucketLocation 
will be removed from
+        // metadata updater to trigger get the latest bucketLocation in next 
lookup round.
+        assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
+    }
+
+    @Test
+    void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() {
+        assertThat(metadataUpdater.getBucketLocation(tb1))
+                .hasValue(
+                        new BucketLocation(
+                                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                                tb1,
+                                1,
+                                new int[] {1, 2, 3}));
+
+        // send PrefixLookupRequest to serverId 1, which will respond with
+        // NotLeaderOrFollowerException as responseLogicId=1 do.
+        metadataUpdater.setResponseLogicId(1, 1);
+        PrefixLookupQuery prefixLookupQuery = new PrefixLookupQuery(tb1, new 
byte[0]);
+        CompletableFuture<List<byte[]>> future = prefixLookupQuery.future();
+        assertThat(future).isNotDone();
+        lookupSender.sendLookups(
+                1, LookupType.PREFIX_LOOKUP, 
Collections.singletonList(prefixLookupQuery));
+
+        assertThat(future.isCompletedExceptionally()).isTrue();
+        assertThatThrownBy(future::get)
+                .rootCause()
+                .isInstanceOf(NotLeaderOrFollowerException.class)
+                .hasMessage("mock not leader or follower exception.");
+        // When NotLeaderOrFollowerException is received, the bucketLocation 
will be removed from
+        // metadata updater to trigger get the latest bucketLocation in next 
lookup round.
+        assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
+    }
+
+    private TestingMetadataUpdater initializeMetadataUpdater() {
+        return new TestingMetadataUpdater(
+                Collections.singletonMap(DATA1_TABLE_PATH_PK, 
DATA1_TABLE_INFO_PK));
+    }
+}
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index 4420a2ebb..38994cbca 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -78,6 +78,10 @@ public class TestingMetadataUpdater extends MetadataUpdater {
         this.cluster = cluster;
     }
 
+    public void setResponseLogicId(int serverId, int responseLogicId) {
+        
tabletServerGatewayMap.get(serverId).setResponseLogicId(responseLogicId);
+    }
+
     @Override
     public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
         Set<TablePath> needUpdateTablePaths =
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
index 337aea70b..637e909f6 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
@@ -17,9 +17,12 @@
 
 package org.apache.fluss.server.tablet;
 
+import org.apache.fluss.exception.NotLeaderOrFollowerException;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
+import org.apache.fluss.rpc.entity.LookupResultForBucket;
+import org.apache.fluss.rpc.entity.PrefixLookupResultForBucket;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.ApiMessage;
 import org.apache.fluss.rpc.messages.ApiVersionsRequest;
@@ -87,6 +90,7 @@ import org.apache.fluss.rpc.messages.TableExistsRequest;
 import org.apache.fluss.rpc.messages.TableExistsResponse;
 import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
 import org.apache.fluss.rpc.messages.UpdateMetadataResponse;
+import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.server.entity.FetchReqInfo;
 import org.apache.fluss.utils.types.Tuple2;
 
@@ -104,6 +108,10 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getFetchLogData;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeFetchLogResponse;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeLookupResponse;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makePrefixLookupResponse;
+import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toLookupData;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toPrefixLookupData;
 
 /** A {@link TabletServerGateway} for test purpose. */
 public class TestTabletServerGateway implements TabletServerGateway {
@@ -111,6 +119,9 @@ public class TestTabletServerGateway implements 
TabletServerGateway {
     private final boolean alwaysFail;
     private final AtomicLong writerId = new AtomicLong(0);
 
+    /** The id to define the response logic. */
+    private int responseLogicId;
+
     // Use concurrent queue for storing request and related completable future 
response so that
     // requests may be queried from a different thread.
     private final Queue<Tuple2<ApiMessage, CompletableFuture<?>>> requests =
@@ -118,6 +129,7 @@ public class TestTabletServerGateway implements 
TabletServerGateway {
 
     public TestTabletServerGateway(boolean alwaysFail) {
         this.alwaysFail = alwaysFail;
+        this.responseLogicId = 0;
     }
 
     @Override
@@ -182,12 +194,48 @@ public class TestTabletServerGateway implements 
TabletServerGateway {
 
     @Override
     public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
-        return null;
+        Map<TableBucket, List<byte[]>> lookupData = toLookupData(request);
+        Map<TableBucket, LookupResultForBucket> errorResponseMap = new 
HashMap<>();
+        if (responseLogicId == 1) {
+            // return with NotLeaderOrFollowerException.
+            lookupData
+                    .keySet()
+                    .forEach(
+                            tb ->
+                                    errorResponseMap.put(
+                                            tb,
+                                            new LookupResultForBucket(
+                                                    tb,
+                                                    ApiError.fromThrowable(
+                                                            new 
NotLeaderOrFollowerException(
+                                                                    "mock not 
leader or follower exception.")))));
+            return 
CompletableFuture.completedFuture(makeLookupResponse(errorResponseMap));
+        } else {
+            return null;
+        }
     }
 
     @Override
     public CompletableFuture<PrefixLookupResponse> 
prefixLookup(PrefixLookupRequest request) {
-        throw new UnsupportedOperationException();
+        Map<TableBucket, List<byte[]>> prefixLookupData = 
toPrefixLookupData(request);
+        Map<TableBucket, PrefixLookupResultForBucket> errorResponseMap = new 
HashMap<>();
+        if (responseLogicId == 1) {
+            // return with NotLeaderOrFollowerException.
+            prefixLookupData
+                    .keySet()
+                    .forEach(
+                            tb ->
+                                    errorResponseMap.put(
+                                            tb,
+                                            new PrefixLookupResultForBucket(
+                                                    tb,
+                                                    ApiError.fromThrowable(
+                                                            new 
NotLeaderOrFollowerException(
+                                                                    "mock not 
leader or follower exception.")))));
+            return 
CompletableFuture.completedFuture(makePrefixLookupResponse(errorResponseMap));
+        } else {
+            throw new UnsupportedOperationException();
+        }
     }
 
     @Override
@@ -387,6 +435,10 @@ public class TestTabletServerGateway implements 
TabletServerGateway {
         }
     }
 
+    public void setResponseLogicId(int responseLogicId) {
+        this.responseLogicId = responseLogicId;
+    }
+
     private StopReplicaResponse mockStopReplicaResponse(
             StopReplicaRequest stopReplicaRequest,
             @Nullable Integer errCode,

Reply via email to