This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 6567deb00 [server] Fix ReplicaFetcherThread keeps throwing
OutOfOrderSequenceException because of writer id expire (#1639)
6567deb00 is described below
commit 6567deb001e03cdf5a42a05ae8c5ee92735a25f0
Author: Liebing <[email protected]>
AuthorDate: Mon Sep 15 10:16:13 2025 +0800
[server] Fix ReplicaFetcherThread keeps throwing
OutOfOrderSequenceException because of writer id expire (#1639)
---
.../org/apache/fluss/server/log/LogTablet.java | 17 +++--
.../apache/fluss/server/log/WriterAppendInfo.java | 67 ++++++++++++++----
.../fluss/server/log/WriterStateManagerTest.java | 5 +-
.../apache/fluss/server/replica/ReplicaTest.java | 38 ++++++++++
.../fluss/server/replica/ReplicaTestBase.java | 2 +
.../replica/fetcher/ReplicaFetcherThreadTest.java | 80 +++++++++++++++++++++-
6 files changed, 185 insertions(+), 24 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 8eb19362f..fc0e7f27f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -640,7 +640,7 @@ public final class LogTablet {
// now that we have valid records, offsets assigned, we need to
validate the idempotent
// state of the writers and collect some metadata.
Either<WriterStateEntry.BatchMetadata,
Collection<WriterAppendInfo>> validateResult =
- analyzeAndValidateWriterState(validRecords);
+ analyzeAndValidateWriterState(validRecords,
appendAsLeader);
if (validateResult.isLeft()) {
// have duplicated batch metadata, skip the append and update
append info.
@@ -1003,7 +1003,7 @@ public final class LogTablet {
/** Returns either the duplicated batch metadata (left) or the updated
writers (right). */
private Either<WriterStateEntry.BatchMetadata,
Collection<WriterAppendInfo>>
- analyzeAndValidateWriterState(MemoryLogRecords records) {
+ analyzeAndValidateWriterState(MemoryLogRecords records, boolean
isAppendAsLeader) {
Map<Long, WriterAppendInfo> updatedWriters = new HashMap<>();
for (LogRecordBatch batch : records.batches()) {
@@ -1020,14 +1020,15 @@ public final class LogTablet {
}
// update write append info.
- updateWriterAppendInfo(writerStateManager, batch,
updatedWriters);
+ updateWriterAppendInfo(writerStateManager, batch,
updatedWriters, isAppendAsLeader);
}
}
return Either.right(updatedWriters.values());
}
- void removeExpiredWriter(long currentTimeMs) {
+ @VisibleForTesting
+ public void removeExpiredWriter(long currentTimeMs) {
synchronized (lock) {
writerStateManager.removeExpiredWriters(currentTimeMs);
}
@@ -1107,14 +1108,16 @@ public final class LogTablet {
private static void updateWriterAppendInfo(
WriterStateManager writerStateManager,
LogRecordBatch batch,
- Map<Long, WriterAppendInfo> writers) {
+ Map<Long, WriterAppendInfo> writers,
+ boolean isAppendAsLeader) {
long writerId = batch.writerId();
// update writers.
WriterAppendInfo appendInfo =
writers.computeIfAbsent(writerId, id ->
writerStateManager.prepareUpdate(writerId));
appendInfo.append(
batch,
-
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch));
+
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch),
+ isAppendAsLeader);
}
static void rebuildWriterState(
@@ -1227,7 +1230,7 @@ public final class LogTablet {
Map<Long, WriterAppendInfo> loadedWriters = new HashMap<>();
for (LogRecordBatch batch : records.batches()) {
if (batch.hasWriterId()) {
- updateWriterAppendInfo(writerStateManager, batch,
loadedWriters);
+ updateWriterAppendInfo(writerStateManager, batch,
loadedWriters, true);
}
}
loadedWriters.values().forEach(writerStateManager::update);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
index cd625b155..cc11cf17f 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
@@ -44,13 +44,15 @@ public class WriterAppendInfo {
return writerId;
}
- public void append(LogRecordBatch batch, boolean isWriterInBatchExpired) {
+ public void append(
+ LogRecordBatch batch, boolean isWriterInBatchExpired, boolean
isAppendAsLeader) {
LogOffsetMetadata firstOffsetMetadata = new
LogOffsetMetadata(batch.baseLogOffset());
appendDataBatch(
batch.batchSequence(),
firstOffsetMetadata,
batch.lastLogOffset(),
isWriterInBatchExpired,
+ isAppendAsLeader,
batch.commitTimestamp());
}
@@ -59,8 +61,9 @@ public class WriterAppendInfo {
LogOffsetMetadata firstOffsetMetadata,
long lastOffset,
boolean isWriterInBatchExpired,
+ boolean isAppendAsLeader,
long batchTimestamp) {
- maybeValidateDataBatch(batchSequence, isWriterInBatchExpired,
lastOffset);
+ maybeValidateDataBatch(batchSequence, isWriterInBatchExpired,
lastOffset, isAppendAsLeader);
updatedEntry.addBath(
batchSequence,
lastOffset,
@@ -69,13 +72,16 @@ public class WriterAppendInfo {
}
private void maybeValidateDataBatch(
- int appendFirstSeq, boolean isWriterInBatchExpired, long
lastOffset) {
+ int appendFirstSeq,
+ boolean isWriterInBatchExpired,
+ long lastOffset,
+ boolean isAppendAsLeader) {
int currentLastSeq =
!updatedEntry.isEmpty()
? updatedEntry.lastBatchSequence()
: currentEntry.lastBatchSequence();
// must be in sequence, even for the first batch should start from 0
- if (!inSequence(currentLastSeq, appendFirstSeq,
isWriterInBatchExpired)) {
+ if (!inSequence(currentLastSeq, appendFirstSeq,
isWriterInBatchExpired, isAppendAsLeader)) {
throw new OutOfOrderSequenceException(
String.format(
"Out of order batch sequence for writer %s at
offset %s in "
@@ -93,16 +99,53 @@ public class WriterAppendInfo {
* three scenarios will be judged as in sequence:
*
* <ul>
- * <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, we need to check
whether the committed
- * timestamp of the next batch under the current writerId has
expired. If it has expired,
- * we consider this a special case caused by writerId expiration,
for this case, to ensure
- * the correctness of follower sync, we still treat it as in
sequence.
- * <li>nextBatchSeq == lastBatchSeq + 1L
- * <li>lastBatchSeq reaches its maximum value
+ * <li>1. If lastBatchSeq equals NO_BATCH_SEQUENCE, the following two
scenarios will be judged
+ * as in sequence:
+ * <ul>
+ * <li>1.1 If the committed timestamp of the next batch under the
current writerId has
+ * expired, we consider this a special case caused by writerId
expiration, for this
+ * case, to ensure the correctness of follower sync, we still
treat it as in
+ * sequence.
+ * <li>1.2 If the append request is from the follower, we consider
this a special
+ * case caused by inconsistent expiration of writerId between
the leader and
+ * follower. To prevent continuous fetch failures on the
follower side, we still
+ * treat it as in sequence.
+ * </ul>
+ * <li>2. nextBatchSeq == lastBatchSeq + 1L
+ * <li>3. lastBatchSeq reaches its maximum value
* </ul>
+ *
+ * <p>For case 1.2, here is a detailed example: The expiration of a writer
is triggered
+ * asynchronously by the {@code PeriodicWriterIdExpirationCheck} thread at
intervals defined by
+ * {@code server.writer-id.expiration-check-interval}, which can result in
slight differences in
+ * the actual expiration times of the same writer on the leader replica
and follower replicas.
+ * This slight difference leads to a dreadful corner case. Imagine the
following scenario (set
+ * {@code server.writer-id.expiration-check-interval}: 10min, {@code
+ * server.writer-id.expiration-time}: 12h):
+ *
+ * <pre>{@code
+ * Step Time Action of Leader Action of
Follower
+ * 1 00:03:38 receive batch 0 of writer 101
+ * 2 00:03:38 fetch batch 0
of writer 101
+ * 3 12:05:00 remove state of
writer 101
+ * 4 12:10:02 receive batch 1 of writer 101
+ * 5 12:10:02 fetch batch 0
of writer 101
+ * 6 12:11:00 remove state of writer 101
+ * }</pre>
+ *
+ * <p>In step 3, the follower removes the state of writer 101 first, since
it has been more than
+ * 12 hours since writer 101's last batch write, making it safe to remove.
However, since the
+ * expiration of writer 101 has not yet occurred on the leader, and a new
batch 1 is received at
+ * this time, it is successfully written on the leader. At this point, the
fetcher pulls batch 1
+ * from the leader, but since the state of writer 101 has already been
cleaned up, an {@link
+ * OutOfOrderSequenceException} will occur during the write if we don't
treat it as in sequence.
*/
- private boolean inSequence(int lastBatchSeq, int nextBatchSeq, boolean
isWriterInBatchExpired) {
- return (lastBatchSeq == NO_BATCH_SEQUENCE && isWriterInBatchExpired)
+ private boolean inSequence(
+ int lastBatchSeq,
+ int nextBatchSeq,
+ boolean isWriterInBatchExpired,
+ boolean isAppendAsLeader) {
+ return (lastBatchSeq == NO_BATCH_SEQUENCE && (isWriterInBatchExpired
|| !isAppendAsLeader))
|| nextBatchSeq == lastBatchSeq + 1L
|| (nextBatchSeq == 0 && lastBatchSeq == Integer.MAX_VALUE);
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/log/WriterStateManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/log/WriterStateManagerTest.java
index b0ac3cd2b..94b2ccc5c 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/log/WriterStateManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/log/WriterStateManagerTest.java
@@ -126,14 +126,14 @@ public class WriterStateManagerTest {
void testPrepareUpdateDoesNotMutate() {
WriterAppendInfo appendInfo = stateManager.prepareUpdate(writerId);
appendInfo.appendDataBatch(
- 0, new LogOffsetMetadata(15L), 20L, false,
System.currentTimeMillis());
+ 0, new LogOffsetMetadata(15L), 20L, false, true,
System.currentTimeMillis());
assertThat(stateManager.lastEntry(writerId)).isNotPresent();
stateManager.update(appendInfo);
assertThat(stateManager.lastEntry(writerId)).isPresent();
WriterAppendInfo nextAppendInfo = stateManager.prepareUpdate(writerId);
nextAppendInfo.appendDataBatch(
- 1, new LogOffsetMetadata(26L), 30L, false,
System.currentTimeMillis());
+ 1, new LogOffsetMetadata(26L), 30L, false, true,
System.currentTimeMillis());
assertThat(stateManager.lastEntry(writerId)).isPresent();
WriterStateEntry lastEntry = stateManager.lastEntry(writerId).get();
@@ -521,6 +521,7 @@ public class WriterStateManagerTest {
new LogOffsetMetadata(offset),
offset,
isWriterInBatchExpired,
+ true,
lastTimestamp);
stateManager.update(appendInfo);
stateManager.updateMapEndOffset(offset + 1);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index 045bd3525..bc1365ab1 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -18,6 +18,7 @@
package org.apache.fluss.server.replica;
import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.exception.OutOfOrderSequenceException;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
@@ -48,6 +49,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -76,10 +78,12 @@ import static
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecor
import static org.apache.fluss.testutils.DataTestUtils.genKvRecordBatch;
import static org.apache.fluss.testutils.DataTestUtils.genKvRecords;
import static
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
+import static
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithWriterId;
import static org.apache.fluss.testutils.DataTestUtils.getKeyValuePairs;
import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link Replica}. */
final class ReplicaTest extends ReplicaTestBase {
@@ -130,6 +134,40 @@ final class ReplicaTest extends ReplicaTestBase {
assertLogRecordsEquals(DATA1_ROW_TYPE,
logReadInfo.getFetchedData().getRecords(), DATA1);
}
+ @Test
+ void testAppendRecordsWithOutOfOrderBatchSequence() throws Exception {
+ Replica logReplica =
+ makeLogReplica(DATA1_PHYSICAL_TABLE_PATH, new
TableBucket(DATA1_TABLE_ID, 1));
+ makeLogReplicaAsLeader(logReplica);
+
+ long writerId = 101L;
+
+ // 1. append a batch with batchSequence = 0
+
logReplica.appendRecordsToLeader(genMemoryLogRecordsWithWriterId(DATA1,
writerId, 0, 0), 0);
+
+ // manual advance time and remove expired writer, the state of writer
101 will be removed
+ manualClock.advanceTime(Duration.ofHours(12));
+ manualClock.advanceTime(Duration.ofSeconds(1));
+
assertThat(logReplica.getLogTablet().writerStateManager().activeWriters().size())
+ .isEqualTo(1);
+
logReplica.getLogTablet().removeExpiredWriter(manualClock.milliseconds());
+
assertThat(logReplica.getLogTablet().writerStateManager().activeWriters().size())
+ .isEqualTo(0);
+
+ // 2. try to append an out-of-order batch as leader, will throw
+ // OutOfOrderSequenceException
+ assertThatThrownBy(
+ () ->
+ logReplica.appendRecordsToLeader(
+ genMemoryLogRecordsWithWriterId(DATA1,
writerId, 2, 10), 0))
+ .isInstanceOf(OutOfOrderSequenceException.class);
+ assertThat(logReplica.getLocalLogEndOffset()).isEqualTo(10);
+
+ // 3. try to append an out-of-order batch as follower
+
logReplica.appendRecordsToFollower(genMemoryLogRecordsWithWriterId(DATA1,
writerId, 2, 10));
+ assertThat(logReplica.getLocalLogEndOffset()).isEqualTo(20);
+ }
+
@Test
void testPartialPutRecordsToLeader() throws Exception {
Replica kvReplica =
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 36862c4e5..88e7ec065 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -174,6 +174,8 @@ public class ReplicaTestBase {
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE,
MemorySize.parse("512b"));
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE,
MemorySize.parse("1kb"));
+ conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME,
Duration.ofHours(12));
+
scheduler = new FlussScheduler(2);
scheduler.startup();
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
index a982ff058..38c4e075c 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
@@ -46,7 +46,7 @@ import org.apache.fluss.server.zk.data.LeaderAndIsr;
import org.apache.fluss.server.zk.data.TableRegistration;
import org.apache.fluss.testutils.common.AllCallbackWrapper;
import org.apache.fluss.utils.clock.Clock;
-import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.clock.ManualClock;
import org.apache.fluss.utils.concurrent.FlussScheduler;
import org.apache.fluss.utils.concurrent.Scheduler;
@@ -86,6 +86,7 @@ public class ReplicaFetcherThreadTest {
new AllCallbackWrapper<>(new ZooKeeperExtension());
private static ZooKeeperClient zkClient;
+ private ManualClock manualClock;
private @TempDir File tempDir;
private TableBucket tb;
private final int leaderServerId = 1;
@@ -106,6 +107,7 @@ public class ReplicaFetcherThreadTest {
@BeforeEach
public void setup() throws Exception {
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
+ manualClock = new ManualClock(System.currentTimeMillis());
Configuration conf = new Configuration();
tb = new TableBucket(DATA1_TABLE_ID, 0);
leaderRM = createReplicaManager(leaderServerId);
@@ -277,6 +279,77 @@ public class ReplicaFetcherThreadTest {
() ->
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(120L));
}
+ @Test
+ void testAppendAsFollowerThrowOutOfOrderSequenceException() throws
Exception {
+ Replica followerReplica = followerRM.getReplicaOrException(tb);
+
+ long writerId = 101;
+ CompletableFuture<List<ProduceLogResultForBucket>> future;
+
+ // 1. append 2 batches to leader with (writerId=101L, batchSequence=0
offset=0L)
+ future = new CompletableFuture<>();
+ leaderRM.appendRecordsToLog(
+ 1000,
+ 1,
+ Collections.singletonMap(
+ tb, genMemoryLogRecordsWithWriterId(DATA1, writerId,
0, 0)),
+ future::complete);
+ assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 0L, 10L));
+
+ // 2. append the first batch to follower with (writerId=101L,
batchSequence=0 offset=0L) to
+ // mock
+ // follower has already fetched one batch.
+ followerReplica.appendRecordsToFollower(
+ genMemoryLogRecordsWithWriterId(DATA1, writerId, 0, 0));
+ assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(10L);
+
+ // advance time
+ manualClock.advanceTime(Duration.ofHours(13));
+
+ // 3. append the second batch to leader with (writerId=101L,
batchSequence=1 offset=10L)
+ future = new CompletableFuture<>();
+ leaderRM.appendRecordsToLog(
+ 1000,
+ 1,
+ Collections.singletonMap(
+ tb, genMemoryLogRecordsWithWriterId(DATA1, writerId,
1, 0)),
+ future::complete);
+ assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 10L, 20L));
+
+ // 3. mock remove expired writer, writerId=101 will be removed.
+
assertThat(followerReplica.getLogTablet().writerStateManager().activeWriters().size())
+ .isEqualTo(1);
+
followerReplica.getLogTablet().removeExpiredWriter(manualClock.milliseconds());
+
assertThat(followerReplica.getLogTablet().writerStateManager().activeWriters().size())
+ .isEqualTo(0);
+
+ // 4. begin fetcher thread.
+ followerFetcher.addBuckets(
+ Collections.singletonMap(
+ tb,
+ new InitialFetchStatus(
+ DATA1_TABLE_ID, DATA1_TABLE_PATH, leader.id(),
10L)));
+ followerFetcher.start();
+ // fetcher will force append the second batch to follower
+ retry(
+ Duration.ofSeconds(20),
+ () ->
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(20L));
+
+ // 5. mock new batch to leader with (writerId=101L, batchSequence=2
offset=20L)
+ future = new CompletableFuture<>();
+ leaderRM.appendRecordsToLog(
+ 1000,
+ 1,
+ Collections.singletonMap(
+ tb, genMemoryLogRecordsWithWriterId(DATA1, writerId,
2, 0)),
+ future::complete);
+ assertThat(future.get()).containsOnly(new
ProduceLogResultForBucket(tb, 20L, 30L));
+ // now fetcher will work well since the state of writerId=101 is
established
+ retry(
+ Duration.ofSeconds(20),
+ () ->
assertThat(followerReplica.getLocalLogEndOffset()).isEqualTo(30L));
+ }
+
private void registerTableInZkClient() throws Exception {
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
zkClient.registerTable(
@@ -319,6 +392,7 @@ public class ReplicaFetcherThreadTest {
private ReplicaManager createReplicaManager(int serverId) throws Exception
{
Configuration conf = new Configuration();
conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath() +
"/server-" + serverId);
+ conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME,
Duration.ofHours(12));
Scheduler scheduler = new FlussScheduler(2);
scheduler.startup();
@@ -327,7 +401,7 @@ public class ReplicaFetcherThreadTest {
conf,
zkClient,
scheduler,
- SystemClock.getInstance(),
+ manualClock,
TestingMetricGroups.TABLET_SERVER_METRICS);
logManager.startup();
ReplicaManager replicaManager =
@@ -341,7 +415,7 @@ public class ReplicaFetcherThreadTest {
new TabletServerMetadataCache(new
MetadataManager(null, conf), null),
RpcClient.create(conf,
TestingClientMetricGroup.newInstance(), false),
TestingMetricGroups.TABLET_SERVER_METRICS,
- SystemClock.getInstance());
+ manualClock);
replicaManager.startup();
return replicaManager;
}