This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new e47748b72 [Client] Fix LogFetcher doesn't update bucket metadata when receive NotLeaderOrFollowerException in bucket level (#1885)
e47748b72 is described below
commit e47748b72ee72445882dc08a2c3952fe7af84660
Author: yunhong <[email protected]>
AuthorDate: Thu Dec 18 18:11:53 2025 +0800
[Client] Fix LogFetcher doesn't update bucket metadata when receive NotLeaderOrFollowerException in bucket level (#1885)
---
.../fluss/client/metadata/ClientSchemaGetter.java | 2 +-
.../fluss/client/table/scanner/log/LogFetcher.java | 60 ++-
.../client/metadata/TestingClientSchemaGetter.java | 55 +++
.../client/metadata/TestingMetadataUpdater.java | 14 +-
.../{LogFetcherTest.java => LogFetcherITCase.java} | 6 +-
.../client/table/scanner/log/LogFetcherTest.java | 402 +++++----------------
.../server/tablet/TestTabletServerGateway.java | 56 +--
7 files changed, 210 insertions(+), 385 deletions(-)
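
In short: before this fix, the fetch-log response was only handled at the request level, so a per-bucket NotLeaderOrFollowerException left the stale leader cached and the client kept fetching from the wrong node. The fix checks each bucket's error code and invalidates only that bucket's cached metadata. A minimal sketch of the idea, with illustrative names (not the Fluss API):

    // Illustrative sketch: react to per-bucket fetch errors by dropping only
    // the failed bucket's cached leader, so the next round re-resolves it.
    import java.util.HashMap;
    import java.util.Map;

    class PerBucketErrorHandling {
        static final int NONE = 0;

        // bucketId -> leader server id; a stand-in for the client metadata cache.
        final Map<Integer, Integer> leaderByBucket = new HashMap<>();

        void onFetchResponse(Map<Integer, Integer> errorCodeByBucket) {
            for (Map.Entry<Integer, Integer> entry : errorCodeByBucket.entrySet()) {
                if (entry.getValue() != NONE) {
                    // Buckets that fetched successfully keep their cached leader.
                    leaderByBucket.remove(entry.getKey());
                }
            }
        }
    }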
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java
index 6b6a11d77..486f80744 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java
@@ -42,7 +42,7 @@ public class ClientSchemaGetter implements SchemaGetter {
private static final Logger LOG = LoggerFactory.getLogger(ClientSchemaGetter.class);
private final TablePath tablePath;
- private final Map<Integer, Schema> schemasById;
+ protected final Map<Integer, Schema> schemasById;
private final Admin admin;
private volatile SchemaInfo latestSchemaInfo;
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index b19347772..a97bffac0 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -26,8 +26,10 @@ import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.cluster.BucketLocation;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.ApiException;
import org.apache.fluss.exception.InvalidMetadataException;
import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.SchemaGetter;
@@ -48,6 +50,7 @@ import org.apache.fluss.rpc.messages.PbFetchLogReqForBucket;
import org.apache.fluss.rpc.messages.PbFetchLogReqForTable;
import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket;
import org.apache.fluss.rpc.messages.PbFetchLogRespForTable;
+import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.utils.IOUtils;
import org.apache.fluss.utils.Projection;
@@ -217,14 +220,27 @@ public class LogFetcher implements Closeable {
}
}
- if (isPartitioned && !partitionIds.isEmpty()) {
- metadataUpdater.updateMetadata(Collections.singleton(tablePath), null, partitionIds);
- } else if (needUpdate) {
- metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
+ try {
+ if (isPartitioned && !partitionIds.isEmpty()) {
+ metadataUpdater.updateMetadata(
+ Collections.singleton(tablePath), null, partitionIds);
+ } else if (needUpdate) {
+ metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
+ }
+ } catch (Exception e) {
+ if (e instanceof PartitionNotExistException) {
+ // ignore this exception; it probably happens because the partition was deleted.
+ // The fetcher can still work fine. A caller like Flink can remove the partition
+ // from the fetch list when it receives the exception.
+ LOG.warn("Received PartitionNotExistException when updating metadata, ignoring it", e);
+ } else {
+ throw e;
+ }
}
}
- private void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest) {
+ @VisibleForTesting
+ void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest) {
TableOrPartitions tableOrPartitionsInFetchRequest =
getTableOrPartitionsInFetchRequest(fetchLogRequest);
// TODO cache the tablet server gateway.
@@ -345,6 +361,14 @@
respForBucket.getBucketId());
FetchLogResultForBucket fetchResultForBucket =
getFetchLogResultForBucket(tb, tablePath, respForBucket);
+
+ // if the error code is not NONE, the fetch log request failed; we need to
+ // clear the table bucket metadata for InvalidMetadataException.
+ if (fetchResultForBucket.getErrorCode() != Errors.NONE.code()) {
+ ApiError error = ApiError.fromErrorMessage(respForBucket);
+ handleFetchLogExceptionForBucket(tb, destination, error);
+ }
+
Long fetchOffset = logScannerStatus.getBucketOffset(tb);
// if the offset is null, it means the bucket has been unsubscribed,
// we just set a Long.MAX_VALUE as the next fetch offset
@@ -387,6 +411,29 @@ public class LogFetcher implements Closeable {
}
}
+ private void handleFetchLogExceptionForBucket(TableBucket tb, int destination, ApiError error) {
+ ApiException exception = error.error().exception();
+ LOG.error("Failed to fetch log from node {} for bucket {}", destination, tb, exception);
+ if (exception instanceof InvalidMetadataException) {
+ LOG.warn(
+ "Invalid metadata error in fetch log request. "
+ + "Going to request metadata update.",
+ exception);
+ long tableId = tb.getTableId();
+ TableOrPartitions tableOrPartitions;
+ if (tb.getPartitionId() == null) {
+ tableOrPartitions = new TableOrPartitions(Collections.singleton(tableId), null);
+ } else {
+ tableOrPartitions =
+ new TableOrPartitions(
+ null,
+ Collections.singleton(
+ new TablePartition(tableId, tb.getPartitionId())));
+ }
+ invalidTableOrPartitions(tableOrPartitions);
+ }
+ }
+
private void pendRemoteFetches(
RemoteLogFetchInfo remoteLogFetchInfo, long firstFetchOffset, long highWatermark) {
checkNotNull(remoteLogFetchInfo);
@@ -417,7 +464,8 @@ public class LogFetcher implements Closeable {
}
}
- private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
+ @VisibleForTesting
+ Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
Map<Integer, List<PbFetchLogReqForBucket>> fetchLogReqForBuckets = new HashMap<>();
int readyForFetchCount = 0;
Long tableId = null;
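
The PartitionNotExistException branch above is deliberately tolerant: the fetcher keeps running, and the comment notes that a caller such as the Flink connector can drop the deleted partition from its fetch list. A hedged sketch of that caller-side reaction (the class and method names are hypothetical, not part of Fluss):

    // Hypothetical caller-side handling: unsubscribe a deleted partition
    // instead of failing the whole scanner.
    import java.util.HashSet;
    import java.util.Set;

    class PartitionSubscription {
        private final Set<Long> subscribedPartitionIds = new HashSet<>();

        void subscribe(long partitionId) {
            subscribedPartitionIds.add(partitionId);
        }

        // Called when a metadata update or fetch reports the partition is gone.
        void onPartitionNotExist(long partitionId) {
            // The next fetch round simply skips the deleted partition;
            // the remaining partitions keep streaming.
            subscribedPartitionIds.remove(partitionId);
        }

        Set<Long> currentSubscriptions() {
            return subscribedPartitionIds;
        }
    }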
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
new file mode 100644
index 000000000..a983eea11
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingClientSchemaGetter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.metadata;
+
+import org.apache.fluss.client.admin.FlussAdmin;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.RpcClient;
+import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Testing class for {@link ClientSchemaGetter}. */
+public class TestingClientSchemaGetter extends ClientSchemaGetter {
+ public TestingClientSchemaGetter(
+ TablePath tablePath,
+ SchemaInfo latestSchemaInfo,
+ TestingMetadataUpdater metadataUpdater) {
+ super(
+ tablePath,
+ latestSchemaInfo,
+ new FlussAdmin(
+ RpcClient.create(
new Configuration(), TestingClientMetricGroup.newInstance(), false),
+ metadataUpdater));
+ }
+
+ @Override
+ public Schema getSchema(int schemaId) {
+ return schemasById.get(schemaId);
+ }
+
+ @Override
+ public CompletableFuture<SchemaInfo> getSchemaInfoAsync(int schemaId) {
+ return CompletableFuture.completedFuture(
+ new SchemaInfo(schemasById.get(schemaId), schemaId));
+ }
+}
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index 1c8f825a9..206395135 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -42,13 +42,13 @@ import java.util.stream.Collectors;
/** Testing class for metadata updater. */
public class TestingMetadataUpdater extends MetadataUpdater {
- private static final ServerNode COORDINATOR =
+ public static final ServerNode COORDINATOR =
new ServerNode(0, "localhost", 90, ServerType.COORDINATOR);
- private static final ServerNode NODE1 =
+ public static final ServerNode NODE1 =
new ServerNode(1, "localhost", 90, ServerType.TABLET_SERVER, "rack1");
- private static final ServerNode NODE2 =
+ public static final ServerNode NODE2 =
new ServerNode(2, "localhost", 91, ServerType.TABLET_SERVER, "rack2");
- private static final ServerNode NODE3 =
+ public static final ServerNode NODE3 =
new ServerNode(3, "localhost", 92, ServerType.TABLET_SERVER, "rack3");
private final TestCoordinatorGateway coordinatorGateway;
@@ -63,7 +63,7 @@ public class TestingMetadataUpdater extends MetadataUpdater {
new Configuration());
}
- private TestingMetadataUpdater(
+ public TestingMetadataUpdater(
ServerNode coordinatorServer,
List<ServerNode> tabletServers,
Map<TablePath, TableInfo> tableInfos,
@@ -137,10 +137,6 @@ public class TestingMetadataUpdater extends MetadataUpdater {
this.cluster = cluster;
}
- public void setResponseLogicId(int serverId, int responseLogicId) {
- tabletServerGatewayMap.get(serverId).setResponseLogicId(responseLogicId);
- }
-
@Override
public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
Set<TablePath> needUpdateTablePaths =
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
similarity index 99%
copy from fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
copy to fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
index bb36cb7f1..b6817b327 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
@@ -67,8 +67,8 @@ import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObje
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link LogFetcher}. */
-public class LogFetcherTest extends ClientToServerITCaseBase {
+/** IT test for {@link LogFetcher}. */
+public class LogFetcherITCase extends ClientToServerITCaseBase {
private LogFetcher logFetcher;
private long tableId;
private final int bucketId0 = 0;
@@ -78,8 +78,6 @@ public class LogFetcherTest extends ClientToServerITCaseBase {
private MetadataUpdater metadataUpdater;
private ClientSchemaGetter clientSchemaGetter;
- // TODO covert this test to UT as kafka.
-
@BeforeEach
protected void setup() throws Exception {
super.setup();
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
index bb36cb7f1..f06f88614 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -17,96 +17,67 @@
package org.apache.fluss.client.table.scanner.log;
-import org.apache.fluss.client.admin.ClientToServerITCaseBase;
-import org.apache.fluss.client.admin.FlussAdmin;
import org.apache.fluss.client.metadata.ClientSchemaGetter;
-import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.client.metadata.TestingClientSchemaGetter;
+import org.apache.fluss.client.metadata.TestingMetadataUpdater;
import org.apache.fluss.client.metrics.TestingScannerMetricGroup;
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
-import org.apache.fluss.client.table.scanner.ScanRecord;
-import org.apache.fluss.cluster.Cluster;
-import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.BucketLocation;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.NotLeaderOrFollowerException;
+import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.SchemaInfo;
import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TableChange;
-import org.apache.fluss.metadata.TableInfo;
-import org.apache.fluss.record.MemoryLogRecords;
-import org.apache.fluss.row.GenericRow;
-import org.apache.fluss.rpc.RpcClient;
-import org.apache.fluss.rpc.gateway.TabletServerGateway;
-import org.apache.fluss.rpc.messages.PbProduceLogRespForBucket;
-import org.apache.fluss.rpc.messages.ProduceLogResponse;
-import org.apache.fluss.testutils.DataTestUtils;
-import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
+import org.apache.fluss.rpc.messages.FetchLogRequest;
+import org.apache.fluss.rpc.messages.FetchLogResponse;
+import org.apache.fluss.rpc.protocol.ApiError;
+import org.apache.fluss.server.entity.FetchReqInfo;
+import org.apache.fluss.server.tablet.TestTabletServerGateway;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.time.Duration;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
-import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
-import static org.apache.fluss.record.TestData.DATA1;
+import static org.apache.fluss.client.metadata.TestingMetadataUpdater.NODE1;
+import static org.apache.fluss.client.metadata.TestingMetadataUpdater.NODE2;
+import static org.apache.fluss.client.metadata.TestingMetadataUpdater.NODE3;
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
-import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
-import static org.apache.fluss.record.TestData.DATA2;
-import static org.apache.fluss.record.TestData.DATA2_ROW_TYPE;
-import static org.apache.fluss.record.TestData.DATA2_SCHEMA;
-import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
-import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
-import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getFetchLogData;
+import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeFetchLogResponse;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link LogFetcher}. */
-public class LogFetcherTest extends ClientToServerITCaseBase {
- private LogFetcher logFetcher;
- private long tableId;
- private final int bucketId0 = 0;
- private final int bucketId1 = 1;
- private LogScannerStatus logScannerStatus;
- private FlussAdmin admin;
- private MetadataUpdater metadataUpdater;
- private ClientSchemaGetter clientSchemaGetter;
+/** Unit test for {@link LogFetcher}. */
+public class LogFetcherTest {
+ private final TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 0);
- // TODO covert this test to UT as kafka.
+ private TestingMetadataUpdater metadataUpdater;
+ private LogFetcher logFetcher = null;
- @BeforeEach
- protected void setup() throws Exception {
- super.setup();
-
- // We create table data1NonPkTablePath previously.
- tableId = createTable(DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR, false);
- FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+ // TODO Add more unit tests like Kafka's.
- RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
- metadataUpdater = new MetadataUpdater(clientConf, rpcClient);
- metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(DATA1_TABLE_PATH));
-
- Map<TableBucket, Long> scanBuckets = new HashMap<>();
- // add bucket 0 and bucket 1 to log scanner status.
- scanBuckets.put(new TableBucket(tableId, bucketId0), 0L);
- scanBuckets.put(new TableBucket(tableId, bucketId1), 0L);
- logScannerStatus = new LogScannerStatus();
- logScannerStatus.assignScanBuckets(scanBuckets);
- admin = new FlussAdmin(rpcClient, metadataUpdater);
- clientSchemaGetter =
- new ClientSchemaGetter(DATA1_TABLE_PATH, new SchemaInfo(DATA1_SCHEMA, 1), admin);
+ @BeforeEach
+ public void setup() {
+ metadataUpdater = initializeMetadataUpdater();
+ ClientSchemaGetter clientSchemaGetter =
+ new TestingClientSchemaGetter(
+ DATA1_TABLE_PATH, new SchemaInfo(DATA1_SCHEMA, 0), metadataUpdater);
+ LogScannerStatus logScannerStatus = initializeLogScannerStatus();
logFetcher =
new LogFetcher(
DATA1_TABLE_INFO,
null,
logScannerStatus,
- clientConf,
+ new Configuration(),
metadataUpdater,
TestingScannerMetricGroup.newInstance(),
new RemoteFileDownloader(1),
@@ -114,259 +85,68 @@ public class LogFetcherTest extends ClientToServerITCaseBase {
}
@Test
- void testFetchWithSchemaChange() throws Exception {
- // add one batch records to tb0.
- TableBucket tb0 = new TableBucket(tableId, bucketId0);
- addRecordsToBucket(tb0, genMemoryLogRecordsByObject(DATA1), 0L);
-
- // add new column(which equals to DATA2_ROW_TYPE)
- admin.alterTable(
- DATA1_TABLE_PATH,
- Collections.singletonList(
- TableChange.addColumn(
- "c",
- DataTypes.STRING(),
- null,
- TableChange.ColumnPosition.last())),
- false)
- .get();
- // add one batch records with new schema to tb0.
- addRecordsToBucket(
- tb0,
- genMemoryLogRecordsByObject(DATA2_ROW_TYPE, 2, CURRENT_LOG_MAGIC_VALUE, DATA2),
- 10L);
-
- // Read data with old schema, thus DATA2 will be truncated as DATA1
- List<GenericRow> expectedRows =
- DATA1.stream().map(DataTestUtils::row).collect(Collectors.toList());
- expectedRows.addAll(DATA1.stream().map(DataTestUtils::row).collect(Collectors.toList()));
- logFetcher.sendFetches();
- // The fetcher is async to fetch data, so we need to wait the result write to the
- // logFetchBuffer.
- retry(
- Duration.ofMinutes(1),
- () -> {
- assertThat(logFetcher.hasAvailableFetches()).isTrue();
- assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
- });
- Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
- assertThat(records.size()).isEqualTo(1);
- List<ScanRecord> scanRecords = records.get(tb0);
- assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
- .isEqualTo(expectedRows);
-
- // read data with new schema, thus DATA2 will be appended with null value.
- expectedRows =
- DATA1.stream()
- .map(row -> DataTestUtils.row(row[0], row[1], null))
- .collect(Collectors.toList());
- expectedRows.addAll(DATA2.stream().map(DataTestUtils::row).collect(Collectors.toList()));
- logScannerStatus.assignScanBuckets(Collections.singletonMap(tb0, 0L));
- LogFetcher newSchemaLogFetcher =
- new LogFetcher(
- new TableInfo(
- DATA1_TABLE_INFO.getTablePath(),
- tableId,
- 2,
- DATA2_SCHEMA,
- DATA1_TABLE_INFO.getBucketKeys(),
- DATA1_TABLE_INFO.getPartitionKeys(),
- DATA1_TABLE_INFO.getNumBuckets(),
- DATA1_TABLE_INFO.getProperties(),
- DATA1_TABLE_INFO.getCustomProperties(),
- DATA1_TABLE_INFO.getComment().orElse(null),
- DATA1_TABLE_INFO.getCreatedTime(),
- DATA1_TABLE_INFO.getModifiedTime()),
- null,
- logScannerStatus,
- clientConf,
- metadataUpdater,
- TestingScannerMetricGroup.newInstance(),
- new RemoteFileDownloader(1),
- clientSchemaGetter);
- newSchemaLogFetcher.sendFetches();
- // The fetcher is async to fetch data, so we need to wait the result write to the
- // logFetchBuffer.
- retry(
- Duration.ofMinutes(1),
- () -> {
- assertThat(newSchemaLogFetcher.hasAvailableFetches()).isTrue();
- assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2);
- });
- records = newSchemaLogFetcher.collectFetch();
- assertThat(records.size()).isEqualTo(1);
- assertThat(records.get(tb0)).hasSize(20);
- scanRecords = records.get(tb0);
- assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
- .isEqualTo(expectedRows);
- newSchemaLogFetcher.close();
- }
-
- @Test
- void testFetch() throws Exception {
- // add one batch records to tb0.
- TableBucket tb0 = new TableBucket(tableId, bucketId0);
- addRecordsToBucket(tb0, genMemoryLogRecordsByObject(DATA1), 0L);
-
- // add one batch records to tb1.
- TableBucket tb1 = new TableBucket(tableId, bucketId1);
- addRecordsToBucket(tb1, genMemoryLogRecordsByObject(DATA1), 0L);
-
- assertThat(logFetcher.hasAvailableFetches()).isFalse();
- // collect fetch will be empty while no available fetch.
- assertThat(logFetcher.collectFetch()).isEmpty();
-
- // send fetcher to fetch data.
- logFetcher.sendFetches();
- // The fetcher is async to fetch data, so we need to wait the result write to the
- // logFetchBuffer.
- retry(
- Duration.ofMinutes(1),
- () -> {
- assertThat(logFetcher.hasAvailableFetches()).isTrue();
- assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
- });
-
- Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
- assertThat(records.size()).isEqualTo(2);
- assertThat(records.get(tb0).size()).isEqualTo(10);
- assertThat(records.get(tb1).size()).isEqualTo(10);
-
- // after collect fetch, the fetcher is empty.
- assertThat(logFetcher.hasAvailableFetches()).isFalse();
- assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(0);
- }
-
- @Test
- void testFetchWhenDestinationIsNullInMetadata() throws Exception {
- TableBucket tb0 = new TableBucket(tableId, bucketId0);
- addRecordsToBucket(tb0, genMemoryLogRecordsByObject(DATA1), 0L);
-
- RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
- MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf, rpcClient);
- metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(DATA1_TABLE_PATH));
-
- int leaderNode = metadataUpdater.leaderFor(DATA1_TABLE_PATH, tb0);
-
- // now, remove leader nodd ,so that fetch destination
- // server node is null
- Cluster oldCluster = metadataUpdater.getCluster();
- Map<Integer, ServerNode> aliveTabletServersById =
- new HashMap<>(oldCluster.getAliveTabletServers());
- aliveTabletServersById.remove(leaderNode);
- Cluster newCluster =
- new Cluster(
- aliveTabletServersById,
- oldCluster.getCoordinatorServer(),
- oldCluster.getBucketLocationsByPath(),
- oldCluster.getTableIdByPath(),
- oldCluster.getPartitionIdByPath());
- metadataUpdater = new MetadataUpdater(rpcClient, clientConf, newCluster);
-
- LogScannerStatus logScannerStatus = new LogScannerStatus();
- logScannerStatus.assignScanBuckets(Collections.singletonMap(tb0, 0L));
-
- ClientSchemaGetter clientSchemaGetter =
- new ClientSchemaGetter(
- DATA1_TABLE_PATH,
- new SchemaInfo(DATA1_SCHEMA, 1),
- new FlussAdmin(rpcClient, metadataUpdater));
-
- LogFetcher logFetcher =
- new LogFetcher(
- DATA1_TABLE_INFO,
- null,
- logScannerStatus,
- clientConf,
- metadataUpdater,
- TestingScannerMetricGroup.newInstance(),
- new RemoteFileDownloader(1),
- clientSchemaGetter);
-
- // send fetches to fetch data, should have no available fetch.
- logFetcher.sendFetches();
- assertThat(logFetcher.hasAvailableFetches()).isFalse();
-
- // then fetches again, should have available fetch.
- // first send fetch is for update metadata
- logFetcher.sendFetches();
- // second send fetch will do real fetch data
- logFetcher.sendFetches();
- retry(
- Duration.ofMinutes(1),
- () -> {
- assertThat(logFetcher.hasAvailableFetches()).isTrue();
- assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(1);
- });
- Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
- assertThat(records.size()).isEqualTo(1);
- assertThat(records.get(tb0).size()).isEqualTo(10);
+ void sendFetchRequestWithNotLeaderOrFollowerException() {
+ Map<Integer, FetchLogRequest> requestMap = logFetcher.prepareFetchLogRequests();
+ Set<Integer> serverSet = requestMap.keySet();
+ assertThat(serverSet).containsExactlyInAnyOrder(1);
+
+ assertThat(metadataUpdater.getBucketLocation(tb1))
+ .hasValue(
+ new BucketLocation(
+ PhysicalTablePath.of(DATA1_TABLE_PATH),
+ tb1,
+ 1,
+ new int[] {1, 2, 3}));
+
+ // send fetchLogRequest to serverId 1, which is mocked to respond with
+ // NotLeaderOrFollowerException.
+ logFetcher.sendFetchRequest(1, requestMap.get(1));
+
+ // When NotLeaderOrFollowerException is received, the bucketLocation is removed from the
+ // metadata updater to trigger getting the latest bucketLocation in the next fetch round.
+ assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
}
- @Test
- void testFetchWithInvalidTableOrPartitions() throws Exception {
- MetadataUpdater metadataUpdater1 =
- new MetadataUpdater(clientConf, FLUSS_CLUSTER_EXTENSION.getRpcClient());
- ClientSchemaGetter clientSchemaGetter =
- new ClientSchemaGetter(
- DATA1_TABLE_PATH,
- new SchemaInfo(DATA1_SCHEMA, 1),
- new FlussAdmin(FLUSS_CLUSTER_EXTENSION.getRpcClient(), metadataUpdater1));
- logFetcher =
- new LogFetcher(
- DATA1_TABLE_INFO,
- null,
- logScannerStatus,
- clientConf,
- metadataUpdater1,
- TestingScannerMetricGroup.newInstance(),
- new RemoteFileDownloader(1),
- clientSchemaGetter);
-
- ExecutorService executor = Executors.newSingleThreadExecutor();
- Future<?> future =
- executor.submit(
- () -> {
- // If this test blocked, please checking whether it was blocked with
- // the same reason as https://github.com/apache/fluss/pull/1666
- for (int i = 0; i < 1000; i++) {
- logFetcher.sendFetches();
- logFetcher.invalidTableOrPartitions(
- new LogFetcher.TableOrPartitions(
- Collections.singleton(tableId), null));
- }
- });
+ private TestingMetadataUpdater initializeMetadataUpdater() {
- future.get(30, TimeUnit.SECONDS);
- assertThat(future.isDone()).isTrue();
- executor.shutdownNow();
+ return new TestingMetadataUpdater(
+ TestingMetadataUpdater.COORDINATOR,
+ Arrays.asList(NODE1, NODE2, NODE3),
+ Collections.singletonMap(DATA1_TABLE_PATH, DATA1_TABLE_INFO),
+ Collections.singletonMap(1, new TestingTabletServerGateway()),
+ new Configuration());
}
- private void addRecordsToBucket(
- TableBucket tableBucket, MemoryLogRecords logRecords, long expectedBaseOffset)
- throws Exception {
- int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tableBucket);
- TabletServerGateway leaderGateWay = FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
- assertProduceLogResponse(
- leaderGateWay
- .produceLog(
- newProduceLogRequest(
- tableBucket.getTableId(),
- tableBucket.getBucket(),
- -1, // need ack, so we can make sure every batch is acked.
- logRecords))
- .get(),
- tableBucket.getBucket(),
- expectedBaseOffset);
+ private LogScannerStatus initializeLogScannerStatus() {
+ Map<TableBucket, Long> scanBucketAndOffsets = new HashMap<>();
+ scanBucketAndOffsets.put(tb1, 0L);
+ LogScannerStatus status = new LogScannerStatus();
+ status.assignScanBuckets(scanBucketAndOffsets);
+ return status;
}
- private static void assertProduceLogResponse(
- ProduceLogResponse produceLogResponse, int bucketId, Long baseOffset) {
- assertThat(produceLogResponse.getBucketsRespsCount()).isEqualTo(1);
- PbProduceLogRespForBucket produceLogRespForBucket =
- produceLogResponse.getBucketsRespsList().get(0);
- assertThat(produceLogRespForBucket.getBucketId()).isEqualTo(bucketId);
- assertThat(produceLogRespForBucket.getBaseOffset()).isEqualTo(baseOffset);
+ private static class TestingTabletServerGateway extends TestTabletServerGateway {
+
+ public TestingTabletServerGateway() {
+ super(false, Collections.emptySet());
+ }
+
+ @Override
+ public CompletableFuture<FetchLogResponse> fetchLog(FetchLogRequest request) {
+ Map<TableBucket, FetchReqInfo> fetchLogData = getFetchLogData(request);
+ Map<TableBucket, FetchLogResultForBucket> resultForBucketMap = new HashMap<>();
+ // return with NotLeaderOrFollowerException.
+ fetchLogData.forEach(
+ (tableBucket, fetchData) -> {
+ FetchLogResultForBucket fetchLogResultForBucket =
+ new FetchLogResultForBucket(
+ tableBucket,
+ ApiError.fromThrowable(
+ new NotLeaderOrFollowerException(
+ "mock fetchLog fail for not leader or follower exception.")));
+ resultForBucketMap.put(tableBucket, fetchLogResultForBucket);
+ });
+ return CompletableFuture.completedFuture(makeFetchLogResponse(resultForBucketMap));
+ }
}
}
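
The unit test drives the error path by subclassing TestTabletServerGateway so that every fetchLog answers each bucket with NotLeaderOrFollowerException. The same stub pattern can exercise other error types; a hedged sketch reusing the helpers shown in the diff above (the subclass itself is hypothetical, and LeaderNotAvailableException, imported by LogFetcher in this commit, is assumed to take the same per-bucket error path):

    // Hypothetical variant of the stub above: answer every bucket with
    // LeaderNotAvailableException instead of NotLeaderOrFollowerException.
    private static class LeaderNotAvailableGateway extends TestTabletServerGateway {

        public LeaderNotAvailableGateway() {
            super(false, Collections.emptySet());
        }

        @Override
        public CompletableFuture<FetchLogResponse> fetchLog(FetchLogRequest request) {
            Map<TableBucket, FetchLogResultForBucket> resultForBucketMap = new HashMap<>();
            getFetchLogData(request)
                    .forEach(
                            (tableBucket, fetchData) ->
                                    resultForBucketMap.put(
                                            tableBucket,
                                            new FetchLogResultForBucket(
                                                    tableBucket,
                                                    ApiError.fromThrowable(
                                                            new LeaderNotAvailableException(
                                                                    "mock leader not available.")))));
            return CompletableFuture.completedFuture(makeFetchLogResponse(resultForBucketMap));
        }
    }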
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
index 5750404b5..8ebcc5748 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
@@ -17,12 +17,9 @@
package org.apache.fluss.server.tablet;
-import org.apache.fluss.exception.NotLeaderOrFollowerException;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
-import org.apache.fluss.rpc.entity.LookupResultForBucket;
-import org.apache.fluss.rpc.entity.PrefixLookupResultForBucket;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.ApiMessage;
import org.apache.fluss.rpc.messages.ApiVersionsRequest;
@@ -90,7 +87,6 @@ import org.apache.fluss.rpc.messages.TableExistsRequest;
import org.apache.fluss.rpc.messages.TableExistsResponse;
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
import org.apache.fluss.rpc.messages.UpdateMetadataResponse;
-import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.ApiKeys;
import org.apache.fluss.server.entity.FetchReqInfo;
import org.apache.fluss.utils.types.Tuple2;
@@ -109,10 +105,6 @@ import java.util.concurrent.atomic.AtomicLong;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getFetchLogData;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeFetchLogResponse;
-import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeLookupResponse;
-import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makePrefixLookupResponse;
-import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toLookupData;
-import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toPrefixLookupData;
/** A {@link TabletServerGateway} for test purpose. */
public class TestTabletServerGateway implements TabletServerGateway {
@@ -120,9 +112,6 @@ public class TestTabletServerGateway implements TabletServerGateway {
private final boolean alwaysFail;
private final AtomicLong writerId = new AtomicLong(0);
- /** The id to define the response logic. */
- private int responseLogicId;
-
// Use concurrent queue for storing request and related completable future response so that
// requests may be queried from a different thread.
private final ConcurrentLinkedDeque<Tuple2<ApiMessage, CompletableFuture<?>>> requests =
@@ -131,7 +120,6 @@ public class TestTabletServerGateway implements TabletServerGateway {
public TestTabletServerGateway(boolean alwaysFail, Set<ApiKeys> ignoreApiKeys) {
this.alwaysFail = alwaysFail;
- this.responseLogicId = 0;
this.ignoreApiKeys = ignoreApiKeys;
}
@@ -201,48 +189,12 @@ public class TestTabletServerGateway implements TabletServerGateway {
@Override
public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
- Map<TableBucket, List<byte[]>> lookupData = toLookupData(request);
- Map<TableBucket, LookupResultForBucket> errorResponseMap = new HashMap<>();
- if (responseLogicId == 1) {
- // return with NotLeaderOrFollowerException.
- lookupData
- .keySet()
- .forEach(
- tb ->
- errorResponseMap.put(
- tb,
- new LookupResultForBucket(
- tb,
- ApiError.fromThrowable(
- new NotLeaderOrFollowerException(
- "mock not leader or follower exception."))));
- return CompletableFuture.completedFuture(makeLookupResponse(errorResponseMap));
- } else {
- return null;
- }
+ return null;
}
@Override
public CompletableFuture<PrefixLookupResponse> prefixLookup(PrefixLookupRequest request) {
- Map<TableBucket, List<byte[]>> prefixLookupData = toPrefixLookupData(request);
- Map<TableBucket, PrefixLookupResultForBucket> errorResponseMap = new HashMap<>();
- if (responseLogicId == 1) {
- // return with NotLeaderOrFollowerException.
- prefixLookupData
- .keySet()
- .forEach(
- tb ->
- errorResponseMap.put(
- tb,
- new PrefixLookupResultForBucket(
- tb,
- ApiError.fromThrowable(
- new NotLeaderOrFollowerException(
- "mock not leader or follower exception."))));
- return CompletableFuture.completedFuture(makePrefixLookupResponse(errorResponseMap));
- } else {
- throw new UnsupportedOperationException();
- }
+ return null;
}
@Override
@@ -442,10 +394,6 @@ public class TestTabletServerGateway implements TabletServerGateway {
}
}
- public void setResponseLogicId(int responseLogicId) {
- this.responseLogicId = responseLogicId;
- }
-
private StopReplicaResponse mockStopReplicaResponse(
StopReplicaRequest stopReplicaRequest,
@Nullable Integer errCode,