This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c412c945 [client] LookupSender get table info from LookupQuery (#2093)
9c412c945 is described below

commit 9c412c94583167a7fa41f39011a91ca7088da67a
Author: yunhong <[email protected]>
AuthorDate: Thu Dec 4 11:36:20 2025 +0800

    [client] LookupSender get table info from LookupQuery (#2093)
---
 .../org/apache/fluss/client/admin/FlussAdmin.java  | 14 ++++++++--
 .../fluss/client/lookup/AbstractLookupQuery.java   |  9 +++++-
 .../apache/fluss/client/lookup/LookupClient.java   | 17 +++++++-----
 .../apache/fluss/client/lookup/LookupQuery.java    |  5 ++--
 .../apache/fluss/client/lookup/LookupSender.java   |  3 +-
 .../fluss/client/lookup/PrefixKeyLookuper.java     |  2 +-
 .../fluss/client/lookup/PrefixLookupQuery.java     |  5 ++--
 .../fluss/client/lookup/PrimaryKeyLookuper.java    |  2 +-
 .../fluss/client/metadata/MetadataUpdater.java     |  3 +-
 .../table/scanner/batch/LimitBatchScanner.java     |  2 +-
 .../fluss/client/lookup/LookupQueueTest.java       |  4 ++-
 .../fluss/client/lookup/LookupSenderTest.java      | 32 ++++++++++++----------
 .../client/table/scanner/log/LogFetcherTest.java   |  2 +-
 .../org/apache/fluss/client/write/SenderTest.java  | 14 ++++++----
 14 files changed, 71 insertions(+), 43 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 69070e01c..27124c70f 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -419,7 +419,12 @@ public class FlussAdmin implements Admin {
         }
         Map<Integer, ListOffsetsRequest> requestMap =
                 prepareListOffsetsRequests(
-                        metadataUpdater, tableInfo.getTableId(), partitionId, 
buckets, offsetSpec);
+                        metadataUpdater,
+                        tableInfo.getTableId(),
+                        partitionId,
+                        buckets,
+                        offsetSpec,
+                        tableInfo.getTablePath());
         Map<Integer, CompletableFuture<Long>> bucketToOffsetMap = 
MapUtils.newConcurrentHashMap();
         for (int bucket : buckets) {
             bucketToOffsetMap.put(bucket, new CompletableFuture<>());
@@ -536,10 +541,13 @@ public class FlussAdmin implements Admin {
             long tableId,
             @Nullable Long partitionId,
             Collection<Integer> buckets,
-            OffsetSpec offsetSpec) {
+            OffsetSpec offsetSpec,
+            TablePath tablePath) {
         Map<Integer, List<Integer>> nodeForBucketList = new HashMap<>();
         for (Integer bucketId : buckets) {
-            int leader = metadataUpdater.leaderFor(new TableBucket(tableId, 
partitionId, bucketId));
+            int leader =
+                    metadataUpdater.leaderFor(
+                            tablePath, new TableBucket(tableId, partitionId, 
bucketId));
             nodeForBucketList.computeIfAbsent(leader, k -> new 
ArrayList<>()).add(bucketId);
         }
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
index efaf0100a..d82e30e8a 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
@@ -19,6 +19,7 @@ package org.apache.fluss.client.lookup;
 
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -26,11 +27,13 @@ import java.util.concurrent.CompletableFuture;
 @Internal
 public abstract class AbstractLookupQuery<T> {
 
+    private final TablePath tablePath;
     private final TableBucket tableBucket;
     private final byte[] key;
     private int retries;
 
-    public AbstractLookupQuery(TableBucket tableBucket, byte[] key) {
+    public AbstractLookupQuery(TablePath tablePath, TableBucket tableBucket, 
byte[] key) {
+        this.tablePath = tablePath;
         this.tableBucket = tableBucket;
         this.key = key;
         this.retries = 0;
@@ -40,6 +43,10 @@ public abstract class AbstractLookupQuery<T> {
         return key;
     }
 
+    public TablePath tablePath() {
+        return tablePath;
+    }
+
     public TableBucket tableBucket() {
         return tableBucket;
     }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
index a974ba972..bcea2302c 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
@@ -22,6 +22,7 @@ import org.apache.fluss.client.metadata.MetadataUpdater;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
 
 import org.slf4j.Logger;
@@ -43,9 +44,9 @@ import java.util.concurrent.TimeUnit;
  * that is responsible for turning these lookup operations into network 
requests and transmitting
  * them to the cluster.
  *
- * <p>The {@link #lookup(TableBucket, byte[])} method is asynchronous, when 
called, it adds the
- * lookup operation to a queue of pending lookup operations and immediately 
returns. This allows the
- * lookup operations to batch together individual lookup operations for 
efficiency.
+ * <p>The {@link #lookup(TablePath, TableBucket, byte[])} method is 
asynchronous, when called, it
+ * adds the lookup operation to a queue of pending lookup operations and 
immediately returns. This
+ * allows the lookup operations to batch together individual lookup operations 
for efficiency.
  */
 @ThreadSafe
 @Internal
@@ -78,14 +79,16 @@ public class LookupClient {
         return Executors.newFixedThreadPool(1, new 
ExecutorThreadFactory(LOOKUP_THREAD_PREFIX));
     }
 
-    public CompletableFuture<byte[]> lookup(TableBucket tableBucket, byte[] 
keyBytes) {
-        LookupQuery lookup = new LookupQuery(tableBucket, keyBytes);
+    public CompletableFuture<byte[]> lookup(
+            TablePath tablePath, TableBucket tableBucket, byte[] keyBytes) {
+        LookupQuery lookup = new LookupQuery(tablePath, tableBucket, keyBytes);
         lookupQueue.appendLookup(lookup);
         return lookup.future();
     }
 
-    public CompletableFuture<List<byte[]>> prefixLookup(TableBucket 
tableBucket, byte[] keyBytes) {
-        PrefixLookupQuery prefixLookup = new PrefixLookupQuery(tableBucket, 
keyBytes);
+    public CompletableFuture<List<byte[]>> prefixLookup(
+            TablePath tablePath, TableBucket tableBucket, byte[] keyBytes) {
+        PrefixLookupQuery prefixLookup = new PrefixLookupQuery(tablePath, 
tableBucket, keyBytes);
         lookupQueue.appendLookup(prefixLookup);
         return prefixLookup.future();
     }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
index 0b21214d4..cc6e70e34 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
@@ -19,6 +19,7 @@ package org.apache.fluss.client.lookup;
 
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -31,8 +32,8 @@ public class LookupQuery extends AbstractLookupQuery<byte[]> {
 
     private final CompletableFuture<byte[]> future;
 
-    LookupQuery(TableBucket tableBucket, byte[] key) {
-        super(tableBucket, key);
+    LookupQuery(TablePath tablePath, TableBucket tableBucket, byte[] key) {
+        super(tablePath, tableBucket, key);
         this.future = new CompletableFuture<>();
     }
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
index a098b8532..087060ef5 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
@@ -157,9 +157,10 @@ class LookupSender implements Runnable {
             // lookup the leader node
             TableBucket tb = lookup.tableBucket();
             try {
-                leader = metadataUpdater.leaderFor(tb);
+                leader = metadataUpdater.leaderFor(lookup.tablePath(), tb);
             } catch (Exception e) {
                 // if leader is not found, re-enqueue the lookup to send again.
+                LOG.warn("Failed to lookup the leader for {} when lookup", tb, 
e);
                 reEnqueueLookup(lookup);
                 continue;
             }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
index cf658ce80..a2d1029d2 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
@@ -151,7 +151,7 @@ class PrefixKeyLookuper extends AbstractLookuper {
         CompletableFuture<LookupResult> lookupFuture = new 
CompletableFuture<>();
         TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), 
partitionId, bucketId);
         lookupClient
-                .prefixLookup(tableBucket, bucketKeyBytes)
+                .prefixLookup(tableInfo.getTablePath(), tableBucket, 
bucketKeyBytes)
                 .whenComplete(
                         (result, error) -> {
                             if (error != null) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixLookupQuery.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixLookupQuery.java
index 732dde6bf..5246aedca 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixLookupQuery.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixLookupQuery.java
@@ -19,6 +19,7 @@ package org.apache.fluss.client.lookup;
 
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -31,8 +32,8 @@ import java.util.concurrent.CompletableFuture;
 public class PrefixLookupQuery extends AbstractLookupQuery<List<byte[]>> {
     private final CompletableFuture<List<byte[]>> future;
 
-    PrefixLookupQuery(TableBucket tableBucket, byte[] prefixKey) {
-        super(tableBucket, prefixKey);
+    PrefixLookupQuery(TablePath tablePath, TableBucket tableBucket, byte[] 
prefixKey) {
+        super(tablePath, tableBucket, prefixKey);
         this.future = new CompletableFuture<>();
     }
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
index 755e454df..6c3264763 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
@@ -114,7 +114,7 @@ class PrimaryKeyLookuper extends AbstractLookuper {
         TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), 
partitionId, bucketId);
         CompletableFuture<LookupResult> lookupFuture = new 
CompletableFuture<>();
         lookupClient
-                .lookup(tableBucket, pkBytes)
+                .lookup(tableInfo.getTablePath(), tableBucket, pkBytes)
                 .whenComplete(
                         (result, error) -> {
                             if (error != null) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 0c61eb5fd..5f838ada8 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -98,11 +98,10 @@ public class MetadataUpdater {
         return cluster.getBucketLocation(tableBucket);
     }
 
-    public int leaderFor(TableBucket tableBucket) {
+    public int leaderFor(TablePath tablePath, TableBucket tableBucket) {
         Integer serverNode = cluster.leaderFor(tableBucket);
         if (serverNode == null) {
             for (int i = 0; i < MAX_RETRY_TIMES; i++) {
-                TablePath tablePath = 
cluster.getTablePathOrElseThrow(tableBucket.getTableId());
                 // check if bucket is for a partition
                 if (tableBucket.getPartitionId() != null) {
                     updateMetadata(
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
index 06c79fce8..f4a3be53f 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
@@ -105,7 +105,7 @@ public class LimitBatchScanner implements BatchScanner {
         }
 
         // because that rocksdb is not suitable to projection, thus do it in 
client.
-        int leader = metadataUpdater.leaderFor(tableBucket);
+        int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(), 
tableBucket);
         TabletServerGateway gateway = 
metadataUpdater.newTabletServerClientForNode(leader);
         if (gateway == null) {
             // TODO handle this exception, like retry.
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupQueueTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupQueueTest.java
index 016aced4b..ebe7b99b8 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupQueueTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupQueueTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
 
 import static 
org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT;
 import static 
org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link LookupQueue}. */
@@ -60,7 +61,8 @@ class LookupQueueTest {
 
     private static void appendLookups(LookupQueue queue, int count) {
         for (int i = 0; i < count; i++) {
-            queue.appendLookup(new LookupQuery(new TableBucket(1, 1), new 
byte[] {0}));
+            queue.appendLookup(
+                    new LookupQuery(DATA1_TABLE_PATH_PK, new TableBucket(1, 
1), new byte[] {0}));
         }
     }
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
index f56535b01..557a049a4 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
@@ -112,7 +112,7 @@ public class LookupSenderTest {
     }
 
     @Test
-    void testSendLookupRequestWithNotLeaderOrFollowerException() throws 
Exception {
+    void testSendLookupRequestWithNotLeaderOrFollowerException() {
         assertThat(metadataUpdater.getBucketLocation(tb1))
                 .hasValue(
                         new BucketLocation(
@@ -131,7 +131,7 @@ public class LookupSenderTest {
                                         "mock not leader or follower 
exception.")));
 
         // send LookupRequest through the queue so that retry mechanism can 
work
-        LookupQuery lookupQuery = new LookupQuery(tb1, new byte[0]);
+        LookupQuery lookupQuery = new LookupQuery(DATA1_TABLE_PATH_PK, tb1, 
new byte[0]);
         CompletableFuture<byte[]> result = lookupQuery.future();
         assertThat(result).isNotDone();
         lookupQueue.appendLookup(lookupQuery);
@@ -150,7 +150,7 @@ public class LookupSenderTest {
     }
 
     @Test
-    void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() throws 
Exception {
+    void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() {
         assertThat(metadataUpdater.getBucketLocation(tb1))
                 .hasValue(
                         new BucketLocation(
@@ -169,7 +169,8 @@ public class LookupSenderTest {
                                         "mock not leader or follower 
exception.")));
 
         // send PrefixLookupRequest through the queue so that retry mechanism 
can work
-        PrefixLookupQuery prefixLookupQuery = new PrefixLookupQuery(tb1, new 
byte[0]);
+        PrefixLookupQuery prefixLookupQuery =
+                new PrefixLookupQuery(DATA1_TABLE_PATH_PK, tb1, new byte[0]);
         CompletableFuture<List<byte[]>> future = prefixLookupQuery.future();
         assertThat(future).isNotDone();
         lookupQueue.appendLookup(prefixLookupQuery);
@@ -206,7 +207,7 @@ public class LookupSenderTest {
 
         // execute: submit lookup
         byte[] key = "key".getBytes();
-        LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+        LookupQuery query = new LookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET, 
key);
         lookupQueue.appendLookup(query);
 
         // verify: eventually succeeds after retries
@@ -217,7 +218,7 @@ public class LookupSenderTest {
     }
 
     @Test
-    void testNonRetriableExceptionDoesNotRetry() throws Exception {
+    void testNonRetriableExceptionDoesNotRetry() {
         // setup: fail with non-retriable exception
         gateway.setLookupHandler(
                 request ->
@@ -226,7 +227,7 @@ public class LookupSenderTest {
 
         // execute: submit lookup
         byte[] key = "key".getBytes();
-        LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+        LookupQuery query = new LookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET, 
key);
         lookupQueue.appendLookup(query);
 
         // verify: fails immediately without retry
@@ -237,7 +238,7 @@ public class LookupSenderTest {
     }
 
     @Test
-    void testMaxRetriesEnforced() throws Exception {
+    void testMaxRetriesEnforced() {
         // setup: always fail with retriable exception
         AtomicInteger attemptCount = new AtomicInteger(0);
         gateway.setLookupHandler(
@@ -248,7 +249,7 @@ public class LookupSenderTest {
 
         // execute: submit lookup
         byte[] key = "key".getBytes();
-        LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+        LookupQuery query = new LookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET, 
key);
         lookupQueue.appendLookup(query);
 
         // verify: eventually fails after max retries
@@ -286,7 +287,7 @@ public class LookupSenderTest {
 
         // execute: submit lookup
         byte[] key = "key".getBytes();
-        LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+        LookupQuery query = new LookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET, 
key);
         lookupQueue.appendLookup(query);
 
         // complete the future externally before retry happens
@@ -328,7 +329,8 @@ public class LookupSenderTest {
 
         // execute: submit prefix lookup
         byte[] prefixKey = "prefix".getBytes();
-        PrefixLookupQuery query = new PrefixLookupQuery(TABLE_BUCKET, 
prefixKey);
+        PrefixLookupQuery query =
+                new PrefixLookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET, 
prefixKey);
         lookupQueue.appendLookup(query);
 
         // verify: eventually succeeds after retries
@@ -354,9 +356,9 @@ public class LookupSenderTest {
                 });
 
         // execute: submit multiple lookups
-        LookupQuery query1 = new LookupQuery(TABLE_BUCKET, "key1".getBytes());
-        LookupQuery query2 = new LookupQuery(TABLE_BUCKET, "key2".getBytes());
-        LookupQuery query3 = new LookupQuery(TABLE_BUCKET, "key3".getBytes());
+        LookupQuery query1 = new LookupQuery(DATA1_TABLE_PATH_PK, 
TABLE_BUCKET, "key1".getBytes());
+        LookupQuery query2 = new LookupQuery(DATA1_TABLE_PATH_PK, 
TABLE_BUCKET, "key2".getBytes());
+        LookupQuery query3 = new LookupQuery(DATA1_TABLE_PATH_PK, 
TABLE_BUCKET, "key3".getBytes());
 
         lookupQueue.appendLookup(query1);
         lookupQueue.appendLookup(query2);
@@ -386,7 +388,7 @@ public class LookupSenderTest {
 
         // execute
         byte[] key = ("key-" + 
exception.getClass().getSimpleName()).getBytes();
-        LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+        LookupQuery query = new LookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET, 
key);
         lookupQueue.appendLookup(query);
 
         // verify
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
index e5a8c8ff0..0f65685c8 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -246,7 +246,7 @@ public class LogFetcherTest extends 
ClientToServerITCaseBase {
         MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf, 
rpcClient);
         
metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(DATA1_TABLE_PATH));
 
-        int leaderNode = metadataUpdater.leaderFor(tb0);
+        int leaderNode = metadataUpdater.leaderFor(DATA1_TABLE_PATH, tb0);
 
         // now, remove leader node, so that fetch destination
         // server node is null
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java 
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index 5618d224d..2d06057e8 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -658,7 +658,7 @@ final class SenderTest {
         CompletableFuture<Exception> future = new CompletableFuture<>();
         appendToAccumulator(tb1, row(1, "a"), future::complete);
 
-        int leaderNode = metadataUpdater.leaderFor(tb1);
+        int leaderNode = metadataUpdater.leaderFor(DATA1_TABLE_PATH, tb1);
         // now, remove leader node ,so that send destination
         // server node is null
         Cluster oldCluster = metadataUpdater.getCluster();
@@ -717,21 +717,24 @@ final class SenderTest {
     private ApiMessage getRequest(TableBucket tb, int index) {
         TestTabletServerGateway gateway =
                 (TestTabletServerGateway)
-                        
metadataUpdater.newTabletServerClientForNode(metadataUpdater.leaderFor(tb));
+                        metadataUpdater.newTabletServerClientForNode(
+                                metadataUpdater.leaderFor(DATA1_TABLE_PATH, 
tb));
         return gateway.getRequest(index);
     }
 
     private void finishProduceLogRequest(TableBucket tb, int index, 
ProduceLogResponse response) {
         TestTabletServerGateway gateway =
                 (TestTabletServerGateway)
-                        
metadataUpdater.newTabletServerClientForNode(metadataUpdater.leaderFor(tb));
+                        metadataUpdater.newTabletServerClientForNode(
+                                metadataUpdater.leaderFor(DATA1_TABLE_PATH, 
tb));
         gateway.response(index, response);
     }
 
     private int pendingRequestSize(TableBucket tb) {
         TestTabletServerGateway gateway =
                 (TestTabletServerGateway)
-                        
metadataUpdater.newTabletServerClientForNode(metadataUpdater.leaderFor(tb));
+                        metadataUpdater.newTabletServerClientForNode(
+                                metadataUpdater.leaderFor(DATA1_TABLE_PATH, 
tb));
         return gateway.pendingRequestSize();
     }
 
@@ -739,7 +742,8 @@ final class SenderTest {
             int batchSequence, TableBucket tb, int index, ProduceLogResponse 
response) {
         TestTabletServerGateway gateway =
                 (TestTabletServerGateway)
-                        
metadataUpdater.newTabletServerClientForNode(metadataUpdater.leaderFor(tb));
+                        metadataUpdater.newTabletServerClientForNode(
+                                metadataUpdater.leaderFor(DATA1_TABLE_PATH, 
tb));
         ApiMessage request = getRequest(tb1, index);
         assertThat(request).isInstanceOf(ProduceLogRequest.class);
         assertThat(hasIdempotentRecords(tb1, (ProduceLogRequest) 
request)).isTrue();

Reply via email to