This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 72e4c4334 [log] Fix the recovery log failed situation because of
OutOfOrderSequenceException cause by writerId expire (#1386)
72e4c4334 is described below
commit 72e4c43345ebca50db0e8b71aac4cfea2e029c0b
Author: yunhong <[email protected]>
AuthorDate: Tue Aug 12 19:47:50 2025 +0800
[log] Fix the recovery log failed situation because of
OutOfOrderSequenceException cause by writerId expire (#1386)
---
.../com/alibaba/fluss/server/log/LogTablet.java | 4 +-
.../alibaba/fluss/server/log/WriterAppendInfo.java | 37 ++++++++++----
.../fluss/server/log/WriterStateManager.java | 4 ++
.../alibaba/fluss/server/log/LogTabletTest.java | 2 +-
.../fluss/server/log/WriterStateManagerTest.java | 56 ++++++++++++++++++----
5 files changed, 82 insertions(+), 21 deletions(-)
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java
index 490c02aa6..68231bc47 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java
@@ -1115,7 +1115,9 @@ public final class LogTablet {
// update writers.
WriterAppendInfo appendInfo =
writers.computeIfAbsent(writerId, id ->
writerStateManager.prepareUpdate(writerId));
- appendInfo.append(batch);
+ appendInfo.append(
+ batch,
+
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch));
}
static void rebuildWriterState(
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterAppendInfo.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterAppendInfo.java
index a27d5299a..ea755e6c8 100644
---
a/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterAppendInfo.java
+++
b/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterAppendInfo.java
@@ -21,6 +21,8 @@ import
com.alibaba.fluss.exception.OutOfOrderSequenceException;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.LogRecordBatch;
+import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
+
/**
* This class is used to validate the records appended by a given writer
before they are written to
* log. It's initialized with writer's state after the last successful append.
@@ -42,35 +44,38 @@ public class WriterAppendInfo {
return writerId;
}
- public void append(LogRecordBatch batch) {
+ public void append(LogRecordBatch batch, boolean isWriterInBatchExpired) {
LogOffsetMetadata firstOffsetMetadata = new
LogOffsetMetadata(batch.baseLogOffset());
appendDataBatch(
batch.batchSequence(),
firstOffsetMetadata,
batch.lastLogOffset(),
- System.currentTimeMillis()); // TODO, add timestamp to record
batch.
+ isWriterInBatchExpired,
+ batch.commitTimestamp());
}
public void appendDataBatch(
int batchSequence,
LogOffsetMetadata firstOffsetMetadata,
long lastOffset,
- long lastTimestamp) {
- maybeValidateDataBatch(batchSequence, lastOffset);
+ boolean isWriterInBatchExpired,
+ long batchTimestamp) {
+ maybeValidateDataBatch(batchSequence, isWriterInBatchExpired,
lastOffset);
updatedEntry.addBath(
batchSequence,
lastOffset,
(int) (lastOffset - firstOffsetMetadata.getMessageOffset()),
- lastTimestamp);
+ batchTimestamp);
}
- private void maybeValidateDataBatch(int appendFirstSeq, long lastOffset) {
+ private void maybeValidateDataBatch(
+ int appendFirstSeq, boolean isWriterInBatchExpired, long
lastOffset) {
int currentLastSeq =
!updatedEntry.isEmpty()
? updatedEntry.lastBatchSequence()
: currentEntry.lastBatchSequence();
// must be in sequence, even for the first batch should start from 0
- if (!inSequence(currentLastSeq, appendFirstSeq)) {
+ if (!inSequence(currentLastSeq, appendFirstSeq,
isWriterInBatchExpired)) {
throw new OutOfOrderSequenceException(
String.format(
"Out of order batch sequence for writer %s at
offset %s in "
@@ -83,8 +88,22 @@ public class WriterAppendInfo {
return updatedEntry;
}
- private boolean inSequence(int lastBatchSeq, int nextBatchSeq) {
- return nextBatchSeq == lastBatchSeq + 1L
+ /**
+ * Check if the next batch sequence is in sequence with the last batch
sequence. The following
+ * three scenarios will be judged as in sequence:
+ *
+ * <ul>
+ * <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, we need to check
whether the committed
+ * timestamp of the next batch under the current writerId has
expired. If it has expired,
+ * we consider this a special case caused by writerId expiration.
In this case, to ensure
+ * the correctness of follower sync, we still treat it as in
sequence.
+ * <li>nextBatchSeq == lastBatchSeq + 1L
+ * <li>lastBatchSeq is Integer.MAX_VALUE and nextBatchSeq wraps around to 0
+ * </ul>
+ */
+ private boolean inSequence(int lastBatchSeq, int nextBatchSeq, boolean
isWriterInBatchExpired) {
+ return (lastBatchSeq == NO_BATCH_SEQUENCE && isWriterInBatchExpired)
+ || nextBatchSeq == lastBatchSeq + 1L
|| (nextBatchSeq == 0 && lastBatchSeq == Integer.MAX_VALUE);
}
}
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java
index 2d0c2b921..6163d86eb 100644
---
a/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java
+++
b/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java
@@ -419,6 +419,10 @@ public class WriterStateManager {
return currentTimeMs - writerStateEntry.lastBatchTimestamp() >
writerExpirationMs;
}
+ public boolean isWriterInBatchExpired(long currentTimeMs, LogRecordBatch
recordBatch) {
+ return currentTimeMs - recordBatch.commitTimestamp() >
writerExpirationMs;
+ }
+
private static List<WriterStateEntry> readSnapshot(File file) {
try {
byte[] json = Files.readAllBytes(file.toPath());
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/log/LogTabletTest.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/log/LogTabletTest.java
index 4da484579..5b69d5065 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/log/LogTabletTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/log/LogTabletTest.java
@@ -411,7 +411,7 @@ final class LogTabletTest extends LogTestBase {
@Test
void testPeriodicWriterIdExpiration() throws Exception {
- conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME,
Duration.ofMillis(3000));
+ conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME,
Duration.ofMillis(1000));
conf.set(ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL,
Duration.ofMillis(1000));
long writerId1 = 23L;
LogTablet log = createLogTablet(conf);
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/log/WriterStateManagerTest.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/log/WriterStateManagerTest.java
index 6ebe62978..d02138fbe 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/log/WriterStateManagerTest.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/log/WriterStateManagerTest.java
@@ -21,6 +21,7 @@ import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.OutOfOrderSequenceException;
import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.utils.clock.ManualClock;
import com.alibaba.fluss.utils.types.Tuple2;
import org.junit.jupiter.api.BeforeEach;
@@ -124,14 +125,15 @@ public class WriterStateManagerTest {
@Test
void testPrepareUpdateDoesNotMutate() {
WriterAppendInfo appendInfo = stateManager.prepareUpdate(writerId);
- appendInfo.appendDataBatch(0, new LogOffsetMetadata(15L), 20L,
System.currentTimeMillis());
+ appendInfo.appendDataBatch(
+ 0, new LogOffsetMetadata(15L), 20L, false,
System.currentTimeMillis());
assertThat(stateManager.lastEntry(writerId)).isNotPresent();
stateManager.update(appendInfo);
assertThat(stateManager.lastEntry(writerId)).isPresent();
WriterAppendInfo nextAppendInfo = stateManager.prepareUpdate(writerId);
nextAppendInfo.appendDataBatch(
- 1, new LogOffsetMetadata(26L), 30L,
System.currentTimeMillis());
+ 1, new LogOffsetMetadata(26L), 30L, false,
System.currentTimeMillis());
assertThat(stateManager.lastEntry(writerId)).isPresent();
WriterStateEntry lastEntry = stateManager.lastEntry(writerId).get();
@@ -179,8 +181,8 @@ public class WriterStateManagerTest {
@Test
void testRemoveExpiredWritersOnReload() throws IOException {
- append(stateManager, writerId, 0, 0L, 0);
- append(stateManager, writerId, 1, 1L, 1);
+ append(stateManager, writerId, 0, 0L, false, 0);
+ append(stateManager, writerId, 1, 1L, false, 1);
stateManager.takeSnapshot();
WriterStateManager recoveredMapping =
@@ -194,14 +196,14 @@ public class WriterStateManagerTest {
// the writer mapping. If writing with the same writerId and non-zero
batch sequence, the
// OutOfOrderSequenceException will throw. If you want to continue to
write, you need to get
// a new writer id.
- assertThatThrownBy(() -> append(recoveredMapping, writerId, 2, 2L,
70001))
+ assertThatThrownBy(() -> append(recoveredMapping, writerId, 2, 2L,
false, 3000L))
.isInstanceOf(OutOfOrderSequenceException.class)
.hasMessageContaining(
"Out of order batch sequence for writer 1 at offset 2
in "
+ "table-bucket TableBucket{tableId=1001,
bucket=0}"
+ " : 2 (incoming batch seq.), -1 (current
batch seq.)");
- append(recoveredMapping, 2L, 0, 2L, 70002);
+ append(recoveredMapping, 2L, 0, 2L, false, 70002);
assertThat(recoveredMapping.activeWriters().size()).isEqualTo(1);
assertThat(recoveredMapping.activeWriters().values().iterator().next().lastBatchSequence())
@@ -209,6 +211,34 @@ public class WriterStateManagerTest {
assertThat(recoveredMapping.mapEndOffset()).isEqualTo(3L);
}
+ @Test
+ void testAppendAnExpiredBatchWithEmptyWriterStatus() throws Exception {
+ ManualClock clock = new ManualClock(5000L);
+
+ // 2 seconds to expire the writer.
+ conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME,
Duration.ofSeconds(2));
+ WriterStateManager stateManager1 =
+ new WriterStateManager(
+ tableBucket,
+ logDir,
+ (int)
conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+
+ // If we try to append an expired batch with a non-zero batch sequence,
the
+ // OutOfOrderSequenceException will not be thrown.
+ append(stateManager1, 1L, 10, 10L, true, clock.milliseconds());
+ assertThat(stateManager1.activeWriters().size()).isEqualTo(1);
+
assertThat(stateManager1.activeWriters().values().iterator().next().lastBatchSequence())
+ .isEqualTo(10);
+
+ // If we try to append a non-expired batch with a non-zero batch
sequence, the
+ // OutOfOrderSequenceException will be thrown.
+ assertThatThrownBy(() -> append(stateManager1, 2L, 10, 10L, false,
1000L))
+ .isInstanceOf(OutOfOrderSequenceException.class)
+ .hasMessageContaining(
+ "Out of order batch sequence for writer 2 at offset 10
in table-bucket "
+ + "TableBucket{tableId=1001, bucket=0} : 10
(incoming batch seq.), -1 (current batch seq.)");
+ }
+
@Test
void testDeleteSnapshotsBefore() throws IOException {
append(stateManager, writerId, 0, 0L);
@@ -322,7 +352,7 @@ public class WriterStateManagerTest {
@Test
void testSkipSnapshotIfOffsetUnchanged() throws IOException {
- append(stateManager, writerId, 0, 0L, 0L);
+ append(stateManager, writerId, 0, 0L, false, 0L);
stateManager.takeSnapshot();
assertThat(Objects.requireNonNull(logDir.listFiles()).length).isEqualTo(1);
@@ -475,7 +505,7 @@ public class WriterStateManagerTest {
private void append(
WriterStateManager stateManager, long writerId, int batchSequence,
long offset) {
- append(stateManager, writerId, batchSequence, offset,
System.currentTimeMillis());
+ append(stateManager, writerId, batchSequence, offset, false,
System.currentTimeMillis());
}
private void append(
@@ -483,9 +513,15 @@ public class WriterStateManagerTest {
long writerId,
int batchSequence,
long offset,
- long timestamp) {
+ boolean isWriterInBatchExpired,
+ long lastTimestamp) {
WriterAppendInfo appendInfo = stateManager.prepareUpdate(writerId);
- appendInfo.appendDataBatch(batchSequence, new
LogOffsetMetadata(offset), offset, timestamp);
+ appendInfo.appendDataBatch(
+ batchSequence,
+ new LogOffsetMetadata(offset),
+ offset,
+ isWriterInBatchExpired,
+ lastTimestamp);
stateManager.update(appendInfo);
stateManager.updateMapEndOffset(offset + 1);
}