This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 9c412c945 [client] LookupSender get table info from LookupQuery (#2093)
9c412c945 is described below
commit 9c412c94583167a7fa41f39011a91ca7088da67a
Author: yunhong <[email protected]>
AuthorDate: Thu Dec 4 11:36:20 2025 +0800
[client] LookupSender get table info from LookupQuery (#2093)
---
.../org/apache/fluss/client/admin/FlussAdmin.java | 14 ++++++++--
.../fluss/client/lookup/AbstractLookupQuery.java | 9 +++++-
.../apache/fluss/client/lookup/LookupClient.java | 17 +++++++-----
.../apache/fluss/client/lookup/LookupQuery.java | 5 ++--
.../apache/fluss/client/lookup/LookupSender.java | 3 +-
.../fluss/client/lookup/PrefixKeyLookuper.java | 2 +-
.../fluss/client/lookup/PrefixLookupQuery.java | 5 ++--
.../fluss/client/lookup/PrimaryKeyLookuper.java | 2 +-
.../fluss/client/metadata/MetadataUpdater.java | 3 +-
.../table/scanner/batch/LimitBatchScanner.java | 2 +-
.../fluss/client/lookup/LookupQueueTest.java | 4 ++-
.../fluss/client/lookup/LookupSenderTest.java | 32 ++++++++++++----------
.../client/table/scanner/log/LogFetcherTest.java | 2 +-
.../org/apache/fluss/client/write/SenderTest.java | 14 ++++++----
14 files changed, 71 insertions(+), 43 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 69070e01c..27124c70f 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -419,7 +419,12 @@ public class FlussAdmin implements Admin {
}
Map<Integer, ListOffsetsRequest> requestMap =
prepareListOffsetsRequests(
- metadataUpdater, tableInfo.getTableId(), partitionId,
buckets, offsetSpec);
+ metadataUpdater,
+ tableInfo.getTableId(),
+ partitionId,
+ buckets,
+ offsetSpec,
+ tableInfo.getTablePath());
Map<Integer, CompletableFuture<Long>> bucketToOffsetMap =
MapUtils.newConcurrentHashMap();
for (int bucket : buckets) {
bucketToOffsetMap.put(bucket, new CompletableFuture<>());
@@ -536,10 +541,13 @@ public class FlussAdmin implements Admin {
long tableId,
@Nullable Long partitionId,
Collection<Integer> buckets,
- OffsetSpec offsetSpec) {
+ OffsetSpec offsetSpec,
+ TablePath tablePath) {
Map<Integer, List<Integer>> nodeForBucketList = new HashMap<>();
for (Integer bucketId : buckets) {
- int leader = metadataUpdater.leaderFor(new TableBucket(tableId,
partitionId, bucketId));
+ int leader =
+ metadataUpdater.leaderFor(
+ tablePath, new TableBucket(tableId, partitionId,
bucketId));
nodeForBucketList.computeIfAbsent(leader, k -> new
ArrayList<>()).add(bucketId);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
index efaf0100a..d82e30e8a 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java
@@ -19,6 +19,7 @@ package org.apache.fluss.client.lookup;
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
import java.util.concurrent.CompletableFuture;
@@ -26,11 +27,13 @@ import java.util.concurrent.CompletableFuture;
@Internal
public abstract class AbstractLookupQuery<T> {
+ private final TablePath tablePath;
private final TableBucket tableBucket;
private final byte[] key;
private int retries;
- public AbstractLookupQuery(TableBucket tableBucket, byte[] key) {
+ public AbstractLookupQuery(TablePath tablePath, TableBucket tableBucket,
byte[] key) {
+ this.tablePath = tablePath;
this.tableBucket = tableBucket;
this.key = key;
this.retries = 0;
@@ -40,6 +43,10 @@ public abstract class AbstractLookupQuery<T> {
return key;
}
+ public TablePath tablePath() {
+ return tablePath;
+ }
+
public TableBucket tableBucket() {
return tableBucket;
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
index a974ba972..bcea2302c 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java
@@ -22,6 +22,7 @@ import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
@@ -43,9 +44,9 @@ import java.util.concurrent.TimeUnit;
* that is responsible for turning these lookup operations into network
requests and transmitting
* them to the cluster.
*
- * <p>The {@link #lookup(TableBucket, byte[])} method is asynchronous, when
called, it adds the
- * lookup operation to a queue of pending lookup operations and immediately
returns. This allows the
- * lookup operations to batch together individual lookup operations for
efficiency.
+ * <p>The {@link #lookup(TablePath, TableBucket, byte[])} method is
asynchronous, when called, it
+ * adds the lookup operation to a queue of pending lookup operations and
immediately returns. This
+ * allows the lookup operations to batch together individual lookup operations
for efficiency.
*/
@ThreadSafe
@Internal
@@ -78,14 +79,16 @@ public class LookupClient {
return Executors.newFixedThreadPool(1, new
ExecutorThreadFactory(LOOKUP_THREAD_PREFIX));
}
- public CompletableFuture<byte[]> lookup(TableBucket tableBucket, byte[]
keyBytes) {
- LookupQuery lookup = new LookupQuery(tableBucket, keyBytes);
+ public CompletableFuture<byte[]> lookup(
+ TablePath tablePath, TableBucket tableBucket, byte[] keyBytes) {
+ LookupQuery lookup = new LookupQuery(tablePath, tableBucket, keyBytes);
lookupQueue.appendLookup(lookup);
return lookup.future();
}
- public CompletableFuture<List<byte[]>> prefixLookup(TableBucket
tableBucket, byte[] keyBytes) {
- PrefixLookupQuery prefixLookup = new PrefixLookupQuery(tableBucket,
keyBytes);
+ public CompletableFuture<List<byte[]>> prefixLookup(
+ TablePath tablePath, TableBucket tableBucket, byte[] keyBytes) {
+ PrefixLookupQuery prefixLookup = new PrefixLookupQuery(tablePath,
tableBucket, keyBytes);
lookupQueue.appendLookup(prefixLookup);
return prefixLookup.future();
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
index 0b21214d4..cc6e70e34 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java
@@ -19,6 +19,7 @@ package org.apache.fluss.client.lookup;
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
import java.util.concurrent.CompletableFuture;
@@ -31,8 +32,8 @@ public class LookupQuery extends AbstractLookupQuery<byte[]> {
private final CompletableFuture<byte[]> future;
- LookupQuery(TableBucket tableBucket, byte[] key) {
- super(tableBucket, key);
+ LookupQuery(TablePath tablePath, TableBucket tableBucket, byte[] key) {
+ super(tablePath, tableBucket, key);
this.future = new CompletableFuture<>();
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
index a098b8532..087060ef5 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java
@@ -157,9 +157,10 @@ class LookupSender implements Runnable {
// lookup the leader node
TableBucket tb = lookup.tableBucket();
try {
- leader = metadataUpdater.leaderFor(tb);
+ leader = metadataUpdater.leaderFor(lookup.tablePath(), tb);
} catch (Exception e) {
// if leader is not found, re-enqueue the lookup to send again.
+ LOG.warn("Failed to lookup the leader for {} when lookup", tb,
e);
reEnqueueLookup(lookup);
continue;
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
index cf658ce80..a2d1029d2 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java
@@ -151,7 +151,7 @@ class PrefixKeyLookuper extends AbstractLookuper {
CompletableFuture<LookupResult> lookupFuture = new
CompletableFuture<>();
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(),
partitionId, bucketId);
lookupClient
- .prefixLookup(tableBucket, bucketKeyBytes)
+ .prefixLookup(tableInfo.getTablePath(), tableBucket,
bucketKeyBytes)
.whenComplete(
(result, error) -> {
if (error != null) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixLookupQuery.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixLookupQuery.java
index 732dde6bf..5246aedca 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixLookupQuery.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixLookupQuery.java
@@ -19,6 +19,7 @@ package org.apache.fluss.client.lookup;
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -31,8 +32,8 @@ import java.util.concurrent.CompletableFuture;
public class PrefixLookupQuery extends AbstractLookupQuery<List<byte[]>> {
private final CompletableFuture<List<byte[]>> future;
- PrefixLookupQuery(TableBucket tableBucket, byte[] prefixKey) {
- super(tableBucket, prefixKey);
+ PrefixLookupQuery(TablePath tablePath, TableBucket tableBucket, byte[]
prefixKey) {
+ super(tablePath, tableBucket, prefixKey);
this.future = new CompletableFuture<>();
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
index 755e454df..6c3264763 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
@@ -114,7 +114,7 @@ class PrimaryKeyLookuper extends AbstractLookuper {
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(),
partitionId, bucketId);
CompletableFuture<LookupResult> lookupFuture = new
CompletableFuture<>();
lookupClient
- .lookup(tableBucket, pkBytes)
+ .lookup(tableInfo.getTablePath(), tableBucket, pkBytes)
.whenComplete(
(result, error) -> {
if (error != null) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 0c61eb5fd..5f838ada8 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -98,11 +98,10 @@ public class MetadataUpdater {
return cluster.getBucketLocation(tableBucket);
}
- public int leaderFor(TableBucket tableBucket) {
+ public int leaderFor(TablePath tablePath, TableBucket tableBucket) {
Integer serverNode = cluster.leaderFor(tableBucket);
if (serverNode == null) {
for (int i = 0; i < MAX_RETRY_TIMES; i++) {
- TablePath tablePath =
cluster.getTablePathOrElseThrow(tableBucket.getTableId());
// check if bucket is for a partition
if (tableBucket.getPartitionId() != null) {
updateMetadata(
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
index 06c79fce8..f4a3be53f 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
@@ -105,7 +105,7 @@ public class LimitBatchScanner implements BatchScanner {
}
// because rocksdb is not suitable for projection, do it in the
client.
- int leader = metadataUpdater.leaderFor(tableBucket);
+ int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(),
tableBucket);
TabletServerGateway gateway =
metadataUpdater.newTabletServerClientForNode(leader);
if (gateway == null) {
// TODO handle this exception, like retry.
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupQueueTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupQueueTest.java
index 016aced4b..ebe7b99b8 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupQueueTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupQueueTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
import static
org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT;
import static
org.apache.fluss.config.ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link LookupQueue}. */
@@ -60,7 +61,8 @@ class LookupQueueTest {
private static void appendLookups(LookupQueue queue, int count) {
for (int i = 0; i < count; i++) {
- queue.appendLookup(new LookupQuery(new TableBucket(1, 1), new
byte[] {0}));
+ queue.appendLookup(
+ new LookupQuery(DATA1_TABLE_PATH_PK, new TableBucket(1,
1), new byte[] {0}));
}
}
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
index f56535b01..557a049a4 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java
@@ -112,7 +112,7 @@ public class LookupSenderTest {
}
@Test
- void testSendLookupRequestWithNotLeaderOrFollowerException() throws
Exception {
+ void testSendLookupRequestWithNotLeaderOrFollowerException() {
assertThat(metadataUpdater.getBucketLocation(tb1))
.hasValue(
new BucketLocation(
@@ -131,7 +131,7 @@ public class LookupSenderTest {
"mock not leader or follower
exception.")));
// send LookupRequest through the queue so that retry mechanism can
work
- LookupQuery lookupQuery = new LookupQuery(tb1, new byte[0]);
+ LookupQuery lookupQuery = new LookupQuery(DATA1_TABLE_PATH_PK, tb1,
new byte[0]);
CompletableFuture<byte[]> result = lookupQuery.future();
assertThat(result).isNotDone();
lookupQueue.appendLookup(lookupQuery);
@@ -150,7 +150,7 @@ public class LookupSenderTest {
}
@Test
- void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() throws
Exception {
+ void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() {
assertThat(metadataUpdater.getBucketLocation(tb1))
.hasValue(
new BucketLocation(
@@ -169,7 +169,8 @@ public class LookupSenderTest {
"mock not leader or follower
exception.")));
// send PrefixLookupRequest through the queue so that retry mechanism
can work
- PrefixLookupQuery prefixLookupQuery = new PrefixLookupQuery(tb1, new
byte[0]);
+ PrefixLookupQuery prefixLookupQuery =
+ new PrefixLookupQuery(DATA1_TABLE_PATH_PK, tb1, new byte[0]);
CompletableFuture<List<byte[]>> future = prefixLookupQuery.future();
assertThat(future).isNotDone();
lookupQueue.appendLookup(prefixLookupQuery);
@@ -206,7 +207,7 @@ public class LookupSenderTest {
// execute: submit lookup
byte[] key = "key".getBytes();
- LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+ LookupQuery query = new LookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET,
key);
lookupQueue.appendLookup(query);
// verify: eventually succeeds after retries
@@ -217,7 +218,7 @@ public class LookupSenderTest {
}
@Test
- void testNonRetriableExceptionDoesNotRetry() throws Exception {
+ void testNonRetriableExceptionDoesNotRetry() {
// setup: fail with non-retriable exception
gateway.setLookupHandler(
request ->
@@ -226,7 +227,7 @@ public class LookupSenderTest {
// execute: submit lookup
byte[] key = "key".getBytes();
- LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+ LookupQuery query = new LookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET,
key);
lookupQueue.appendLookup(query);
// verify: fails immediately without retry
@@ -237,7 +238,7 @@ public class LookupSenderTest {
}
@Test
- void testMaxRetriesEnforced() throws Exception {
+ void testMaxRetriesEnforced() {
// setup: always fail with retriable exception
AtomicInteger attemptCount = new AtomicInteger(0);
gateway.setLookupHandler(
@@ -248,7 +249,7 @@ public class LookupSenderTest {
// execute: submit lookup
byte[] key = "key".getBytes();
- LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+ LookupQuery query = new LookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET,
key);
lookupQueue.appendLookup(query);
// verify: eventually fails after max retries
@@ -286,7 +287,7 @@ public class LookupSenderTest {
// execute: submit lookup
byte[] key = "key".getBytes();
- LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+ LookupQuery query = new LookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET,
key);
lookupQueue.appendLookup(query);
// complete the future externally before retry happens
@@ -328,7 +329,8 @@ public class LookupSenderTest {
// execute: submit prefix lookup
byte[] prefixKey = "prefix".getBytes();
- PrefixLookupQuery query = new PrefixLookupQuery(TABLE_BUCKET,
prefixKey);
+ PrefixLookupQuery query =
+ new PrefixLookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET,
prefixKey);
lookupQueue.appendLookup(query);
// verify: eventually succeeds after retries
@@ -354,9 +356,9 @@ public class LookupSenderTest {
});
// execute: submit multiple lookups
- LookupQuery query1 = new LookupQuery(TABLE_BUCKET, "key1".getBytes());
- LookupQuery query2 = new LookupQuery(TABLE_BUCKET, "key2".getBytes());
- LookupQuery query3 = new LookupQuery(TABLE_BUCKET, "key3".getBytes());
+ LookupQuery query1 = new LookupQuery(DATA1_TABLE_PATH_PK,
TABLE_BUCKET, "key1".getBytes());
+ LookupQuery query2 = new LookupQuery(DATA1_TABLE_PATH_PK,
TABLE_BUCKET, "key2".getBytes());
+ LookupQuery query3 = new LookupQuery(DATA1_TABLE_PATH_PK,
TABLE_BUCKET, "key3".getBytes());
lookupQueue.appendLookup(query1);
lookupQueue.appendLookup(query2);
@@ -386,7 +388,7 @@ public class LookupSenderTest {
// execute
byte[] key = ("key-" +
exception.getClass().getSimpleName()).getBytes();
- LookupQuery query = new LookupQuery(TABLE_BUCKET, key);
+ LookupQuery query = new LookupQuery(DATA1_TABLE_PATH_PK, TABLE_BUCKET,
key);
lookupQueue.appendLookup(query);
// verify
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
index e5a8c8ff0..0f65685c8 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -246,7 +246,7 @@ public class LogFetcherTest extends
ClientToServerITCaseBase {
MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf,
rpcClient);
metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(DATA1_TABLE_PATH));
- int leaderNode = metadataUpdater.leaderFor(tb0);
+ int leaderNode = metadataUpdater.leaderFor(DATA1_TABLE_PATH, tb0);
// now, remove leader node, so that fetch destination
// server node is null
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index 5618d224d..2d06057e8 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -658,7 +658,7 @@ final class SenderTest {
CompletableFuture<Exception> future = new CompletableFuture<>();
appendToAccumulator(tb1, row(1, "a"), future::complete);
- int leaderNode = metadataUpdater.leaderFor(tb1);
+ int leaderNode = metadataUpdater.leaderFor(DATA1_TABLE_PATH, tb1);
// now, remove leader node, so that send destination
// server node is null
Cluster oldCluster = metadataUpdater.getCluster();
@@ -717,21 +717,24 @@ final class SenderTest {
private ApiMessage getRequest(TableBucket tb, int index) {
TestTabletServerGateway gateway =
(TestTabletServerGateway)
-
metadataUpdater.newTabletServerClientForNode(metadataUpdater.leaderFor(tb));
+ metadataUpdater.newTabletServerClientForNode(
+ metadataUpdater.leaderFor(DATA1_TABLE_PATH,
tb));
return gateway.getRequest(index);
}
private void finishProduceLogRequest(TableBucket tb, int index,
ProduceLogResponse response) {
TestTabletServerGateway gateway =
(TestTabletServerGateway)
-
metadataUpdater.newTabletServerClientForNode(metadataUpdater.leaderFor(tb));
+ metadataUpdater.newTabletServerClientForNode(
+ metadataUpdater.leaderFor(DATA1_TABLE_PATH,
tb));
gateway.response(index, response);
}
private int pendingRequestSize(TableBucket tb) {
TestTabletServerGateway gateway =
(TestTabletServerGateway)
-
metadataUpdater.newTabletServerClientForNode(metadataUpdater.leaderFor(tb));
+ metadataUpdater.newTabletServerClientForNode(
+ metadataUpdater.leaderFor(DATA1_TABLE_PATH,
tb));
return gateway.pendingRequestSize();
}
@@ -739,7 +742,8 @@ final class SenderTest {
int batchSequence, TableBucket tb, int index, ProduceLogResponse
response) {
TestTabletServerGateway gateway =
(TestTabletServerGateway)
-
metadataUpdater.newTabletServerClientForNode(metadataUpdater.leaderFor(tb));
+ metadataUpdater.newTabletServerClientForNode(
+ metadataUpdater.leaderFor(DATA1_TABLE_PATH,
tb));
ApiMessage request = getRequest(tb1, index);
assertThat(request).isInstanceOf(ProduceLogRequest.class);
assertThat(hasIdempotentRecords(tb1, (ProduceLogRequest)
request)).isTrue();