This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 72e4c4334 [log] Fix the recovery log failed situation because of 
OutOfOrderSequenceException cause by writerId expire (#1386)
72e4c4334 is described below

commit 72e4c43345ebca50db0e8b71aac4cfea2e029c0b
Author: yunhong <[email protected]>
AuthorDate: Tue Aug 12 19:47:50 2025 +0800

    [log] Fix the recovery log failed situation because of 
OutOfOrderSequenceException cause by writerId expire (#1386)
---
 .../com/alibaba/fluss/server/log/LogTablet.java    |  4 +-
 .../alibaba/fluss/server/log/WriterAppendInfo.java | 37 ++++++++++----
 .../fluss/server/log/WriterStateManager.java       |  4 ++
 .../alibaba/fluss/server/log/LogTabletTest.java    |  2 +-
 .../fluss/server/log/WriterStateManagerTest.java   | 56 ++++++++++++++++++----
 5 files changed, 82 insertions(+), 21 deletions(-)

diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java
index 490c02aa6..68231bc47 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java
@@ -1115,7 +1115,9 @@ public final class LogTablet {
         // update writers.
         WriterAppendInfo appendInfo =
                 writers.computeIfAbsent(writerId, id -> 
writerStateManager.prepareUpdate(writerId));
-        appendInfo.append(batch);
+        appendInfo.append(
+                batch,
+                
writerStateManager.isWriterInBatchExpired(System.currentTimeMillis(), batch));
     }
 
     static void rebuildWriterState(
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterAppendInfo.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterAppendInfo.java
index a27d5299a..ea755e6c8 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterAppendInfo.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterAppendInfo.java
@@ -21,6 +21,8 @@ import 
com.alibaba.fluss.exception.OutOfOrderSequenceException;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.record.LogRecordBatch;
 
+import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
+
 /**
  * This class is used to validate the records appended by a given writer 
before they are written to
  * log. It's initialized with writer's state after the last successful append.
@@ -42,35 +44,38 @@ public class WriterAppendInfo {
         return writerId;
     }
 
-    public void append(LogRecordBatch batch) {
+    public void append(LogRecordBatch batch, boolean isWriterInBatchExpired) {
         LogOffsetMetadata firstOffsetMetadata = new 
LogOffsetMetadata(batch.baseLogOffset());
         appendDataBatch(
                 batch.batchSequence(),
                 firstOffsetMetadata,
                 batch.lastLogOffset(),
-                System.currentTimeMillis()); // TODO, add timestamp to record 
batch.
+                isWriterInBatchExpired,
+                batch.commitTimestamp());
     }
 
     public void appendDataBatch(
             int batchSequence,
             LogOffsetMetadata firstOffsetMetadata,
             long lastOffset,
-            long lastTimestamp) {
-        maybeValidateDataBatch(batchSequence, lastOffset);
+            boolean isWriterInBatchExpired,
+            long batchTimestamp) {
+        maybeValidateDataBatch(batchSequence, isWriterInBatchExpired, 
lastOffset);
         updatedEntry.addBath(
                 batchSequence,
                 lastOffset,
                 (int) (lastOffset - firstOffsetMetadata.getMessageOffset()),
-                lastTimestamp);
+                batchTimestamp);
     }
 
-    private void maybeValidateDataBatch(int appendFirstSeq, long lastOffset) {
+    private void maybeValidateDataBatch(
+            int appendFirstSeq, boolean isWriterInBatchExpired, long 
lastOffset) {
         int currentLastSeq =
                 !updatedEntry.isEmpty()
                         ? updatedEntry.lastBatchSequence()
                         : currentEntry.lastBatchSequence();
         // must be in sequence, even for the first batch should start from 0
-        if (!inSequence(currentLastSeq, appendFirstSeq)) {
+        if (!inSequence(currentLastSeq, appendFirstSeq, 
isWriterInBatchExpired)) {
             throw new OutOfOrderSequenceException(
                     String.format(
                             "Out of order batch sequence for writer %s at 
offset %s in "
@@ -83,8 +88,22 @@ public class WriterAppendInfo {
         return updatedEntry;
     }
 
-    private boolean inSequence(int lastBatchSeq, int nextBatchSeq) {
-        return nextBatchSeq == lastBatchSeq + 1L
+    /**
+     * Check if the next batch sequence is in sequence with the last batch 
sequence. The following
+     * three scenarios will be judged as in sequence:
+     *
+     * <ul>
+     *   <li>If lastBatchSeq equals NO_BATCH_SEQUENCE, check whether the incoming batch is expired,
+     *       i.e. its commit timestamp is older than the writer expiration time. If it is, this is a
+     *       special case caused by writerId expiration; to keep follower sync correct, the batch is
+     *       still treated as in sequence.
+     *   <li>nextBatchSeq == lastBatchSeq + 1L
+     *   <li>lastBatchSeq reaches its maximum value
+     * </ul>
+     */
+    private boolean inSequence(int lastBatchSeq, int nextBatchSeq, boolean 
isWriterInBatchExpired) {
+        return (lastBatchSeq == NO_BATCH_SEQUENCE && isWriterInBatchExpired)
+                || nextBatchSeq == lastBatchSeq + 1L
                 || (nextBatchSeq == 0 && lastBatchSeq == Integer.MAX_VALUE);
     }
 }
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java
index 2d0c2b921..6163d86eb 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java
@@ -419,6 +419,10 @@ public class WriterStateManager {
         return currentTimeMs - writerStateEntry.lastBatchTimestamp() > 
writerExpirationMs;
     }
 
+    public boolean isWriterInBatchExpired(long currentTimeMs, LogRecordBatch 
recordBatch) {
+        return currentTimeMs - recordBatch.commitTimestamp() > 
writerExpirationMs;
+    }
+
     private static List<WriterStateEntry> readSnapshot(File file) {
         try {
             byte[] json = Files.readAllBytes(file.toPath());
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/log/LogTabletTest.java 
b/fluss-server/src/test/java/com/alibaba/fluss/server/log/LogTabletTest.java
index 4da484579..5b69d5065 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/log/LogTabletTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/log/LogTabletTest.java
@@ -411,7 +411,7 @@ final class LogTabletTest extends LogTestBase {
 
     @Test
     void testPeriodicWriterIdExpiration() throws Exception {
-        conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, 
Duration.ofMillis(3000));
+        conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, 
Duration.ofMillis(1000));
         conf.set(ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL, 
Duration.ofMillis(1000));
         long writerId1 = 23L;
         LogTablet log = createLogTablet(conf);
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/log/WriterStateManagerTest.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/log/WriterStateManagerTest.java
index 6ebe62978..d02138fbe 100644
--- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/log/WriterStateManagerTest.java
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/log/WriterStateManagerTest.java
@@ -21,6 +21,7 @@ import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.OutOfOrderSequenceException;
 import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.utils.clock.ManualClock;
 import com.alibaba.fluss.utils.types.Tuple2;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -124,14 +125,15 @@ public class WriterStateManagerTest {
     @Test
     void testPrepareUpdateDoesNotMutate() {
         WriterAppendInfo appendInfo = stateManager.prepareUpdate(writerId);
-        appendInfo.appendDataBatch(0, new LogOffsetMetadata(15L), 20L, 
System.currentTimeMillis());
+        appendInfo.appendDataBatch(
+                0, new LogOffsetMetadata(15L), 20L, false, 
System.currentTimeMillis());
         assertThat(stateManager.lastEntry(writerId)).isNotPresent();
         stateManager.update(appendInfo);
         assertThat(stateManager.lastEntry(writerId)).isPresent();
 
         WriterAppendInfo nextAppendInfo = stateManager.prepareUpdate(writerId);
         nextAppendInfo.appendDataBatch(
-                1, new LogOffsetMetadata(26L), 30L, 
System.currentTimeMillis());
+                1, new LogOffsetMetadata(26L), 30L, false, 
System.currentTimeMillis());
         assertThat(stateManager.lastEntry(writerId)).isPresent();
 
         WriterStateEntry lastEntry = stateManager.lastEntry(writerId).get();
@@ -179,8 +181,8 @@ public class WriterStateManagerTest {
 
     @Test
     void testRemoveExpiredWritersOnReload() throws IOException {
-        append(stateManager, writerId, 0, 0L, 0);
-        append(stateManager, writerId, 1, 1L, 1);
+        append(stateManager, writerId, 0, 0L, false, 0);
+        append(stateManager, writerId, 1, 1L, false, 1);
 
         stateManager.takeSnapshot();
         WriterStateManager recoveredMapping =
@@ -194,14 +196,14 @@ public class WriterStateManagerTest {
         // the writer mapping. If writing with the same writerId and non-zero 
batch sequence, the
         // OutOfOrderSequenceException will throw. If you want to continue to 
write, you need to get
         // a new writer id.
-        assertThatThrownBy(() -> append(recoveredMapping, writerId, 2, 2L, 
70001))
+        assertThatThrownBy(() -> append(recoveredMapping, writerId, 2, 2L, 
false, 3000L))
                 .isInstanceOf(OutOfOrderSequenceException.class)
                 .hasMessageContaining(
                         "Out of order batch sequence for writer 1 at offset 2 
in "
                                 + "table-bucket TableBucket{tableId=1001, 
bucket=0}"
                                 + " : 2 (incoming batch seq.), -1 (current 
batch seq.)");
 
-        append(recoveredMapping, 2L, 0, 2L, 70002);
+        append(recoveredMapping, 2L, 0, 2L, false, 70002);
 
         assertThat(recoveredMapping.activeWriters().size()).isEqualTo(1);
         
assertThat(recoveredMapping.activeWriters().values().iterator().next().lastBatchSequence())
@@ -209,6 +211,34 @@ public class WriterStateManagerTest {
         assertThat(recoveredMapping.mapEndOffset()).isEqualTo(3L);
     }
 
+    @Test
+    void testAppendAnExpiredBatchWithEmptyWriterStatus() throws Exception {
+        ManualClock clock = new ManualClock(5000L);
+
+        // 2 seconds to expire the writer.
+        conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, 
Duration.ofSeconds(2));
+        WriterStateManager stateManager1 =
+                new WriterStateManager(
+                        tableBucket,
+                        logDir,
+                        (int) 
conf.get(ConfigOptions.WRITER_ID_EXPIRATION_TIME).toMillis());
+
+        // If we try to append an expired batch with a non-zero batch sequence, no
+        // OutOfOrderSequenceException will be thrown.
+        append(stateManager1, 1L, 10, 10L, true, clock.milliseconds());
+        assertThat(stateManager1.activeWriters().size()).isEqualTo(1);
+        
assertThat(stateManager1.activeWriters().values().iterator().next().lastBatchSequence())
+                .isEqualTo(10);
+
+        // If we try to append a non-expired batch with a non-zero batch sequence, an
+        // OutOfOrderSequenceException will be thrown.
+        assertThatThrownBy(() -> append(stateManager1, 2L, 10, 10L, false, 
1000L))
+                .isInstanceOf(OutOfOrderSequenceException.class)
+                .hasMessageContaining(
+                        "Out of order batch sequence for writer 2 at offset 10 
in table-bucket "
+                                + "TableBucket{tableId=1001, bucket=0} : 10 
(incoming batch seq.), -1 (current batch seq.)");
+    }
+
     @Test
     void testDeleteSnapshotsBefore() throws IOException {
         append(stateManager, writerId, 0, 0L);
@@ -322,7 +352,7 @@ public class WriterStateManagerTest {
 
     @Test
     void testSkipSnapshotIfOffsetUnchanged() throws IOException {
-        append(stateManager, writerId, 0, 0L, 0L);
+        append(stateManager, writerId, 0, 0L, false, 0L);
 
         stateManager.takeSnapshot();
         
assertThat(Objects.requireNonNull(logDir.listFiles()).length).isEqualTo(1);
@@ -475,7 +505,7 @@ public class WriterStateManagerTest {
 
     private void append(
             WriterStateManager stateManager, long writerId, int batchSequence, 
long offset) {
-        append(stateManager, writerId, batchSequence, offset, 
System.currentTimeMillis());
+        append(stateManager, writerId, batchSequence, offset, false, 
System.currentTimeMillis());
     }
 
     private void append(
@@ -483,9 +513,15 @@ public class WriterStateManagerTest {
             long writerId,
             int batchSequence,
             long offset,
-            long timestamp) {
+            boolean isWriterInBatchExpired,
+            long lastTimestamp) {
         WriterAppendInfo appendInfo = stateManager.prepareUpdate(writerId);
-        appendInfo.appendDataBatch(batchSequence, new 
LogOffsetMetadata(offset), offset, timestamp);
+        appendInfo.appendDataBatch(
+                batchSequence,
+                new LogOffsetMetadata(offset),
+                offset,
+                isWriterInBatchExpired,
+                lastTimestamp);
         stateManager.update(appendInfo);
         stateManager.updateMapEndOffset(offset + 1);
     }

Reply via email to