This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new a25c64bdd [log] Bump LogRecordBatch's CURRENT_LOG_MAGIC_VALUE to V1 to 
support leaderEpoch (#778)
a25c64bdd is described below

commit a25c64bddf3e331977661be1ed7fb286cfd31bbc
Author: yunhong <[email protected]>
AuthorDate: Wed Sep 10 13:50:34 2025 +0800

    [log] Bump LogRecordBatch's CURRENT_LOG_MAGIC_VALUE to V1 to support 
leaderEpoch (#778)
    
    Co-authored-by: Jark Wu <[email protected]>
---
 .../fluss/client/write/IdempotenceBucketEntry.java |   2 +-
 .../fluss/client/write/IdempotenceManager.java     |   8 +-
 .../fluss/client/write/RecordAccumulator.java      |   8 +-
 .../org/apache/fluss/client/write/WriteBatch.java  |   4 +-
 .../org/apache/fluss/client/write/WriteRecord.java |   5 +-
 .../scanner/log/DefaultCompletedFetchTest.java     |  50 ++--
 .../client/write/IndexedLogWriteBatchTest.java     |   5 +-
 .../fluss/client/write/RecordAccumulatorTest.java  |   8 +-
 .../org/apache/fluss/client/write/SenderTest.java  |   4 +-
 .../apache/fluss/record/DefaultLogRecordBatch.java | 150 +++++------
 .../apache/fluss/record/FileLogInputStream.java    |  22 +-
 .../org/apache/fluss/record/FileLogProjection.java |  89 +++++--
 .../org/apache/fluss/record/IndexedLogRecord.java  |   2 +-
 .../apache/fluss/record/KvRecordBatchBuilder.java  |   6 +-
 .../org/apache/fluss/record/LogRecordBatch.java    |  21 +-
 .../apache/fluss/record/LogRecordBatchFormat.java  | 292 +++++++++++++++++++++
 .../org/apache/fluss/record/MemoryLogRecords.java  |   9 +-
 .../fluss/record/MemoryLogRecordsArrowBuilder.java |  44 ++--
 .../record/MemoryLogRecordsIndexedBuilder.java     |  34 ++-
 .../fluss/record/MemorySegmentLogInputStream.java  |   8 +-
 .../fluss/record/DefaultLogRecordBatchTest.java    |  27 +-
 .../fluss/record/FileLogInputStreamTest.java       |  26 +-
 .../apache/fluss/record/FileLogProjectionTest.java | 129 ++++++++-
 .../org/apache/fluss/record/KvRecordTestUtils.java |   4 +-
 .../fluss/record/LogRecordBatchFormatTest.java     |  81 ++++++
 .../record/MemoryLogRecordsArrowBuilderTest.java   |  69 +++--
 .../record/MemorySegmentLogInputStreamTest.java    |  13 +-
 .../java/org/apache/fluss/record/TestData.java     |   4 +-
 .../fluss/row/arrow/ArrowReaderWriterTest.java     |   9 +-
 .../org/apache/fluss/testutils/DataTestUtils.java  |  36 ++-
 .../org/apache/fluss/server/log/LogSegment.java    |   5 +-
 .../apache/fluss/server/log/WriterAppendInfo.java  |   2 +-
 .../apache/fluss/server/log/WriterStateEntry.java  |   2 +-
 .../fluss/server/log/WriterStateManager.java       |   3 +-
 .../org/apache/fluss/server/kv/KvTabletTest.java   |   6 +-
 .../apache/fluss/server/replica/ReplicaTest.java   |   6 +-
 .../fluss/server/tablet/TabletServiceITCase.java   |  30 ++-
 37 files changed, 937 insertions(+), 286 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java
index f48377aa5..c2426b20e 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java
@@ -29,7 +29,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.function.Consumer;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 
 /** Entry to store the idempotence information of each table-bucket. */
 @Internal
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
index 7b5605263..5593c1999 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
@@ -25,7 +25,6 @@ import org.apache.fluss.exception.UnknownWriterIdException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.InitWriterRequest;
 import org.apache.fluss.rpc.protocol.Errors;
@@ -40,7 +39,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -182,10 +182,10 @@ public class IdempotenceManager {
      */
     synchronized int firstInFlightBatchSequence(TableBucket tableBucket) {
         if (!hasInflightBatches(tableBucket)) {
-            return LogRecordBatch.NO_BATCH_SEQUENCE;
+            return NO_BATCH_SEQUENCE;
         }
         WriteBatch batch = nextBatchBySequence(tableBucket);
-        return batch == null ? LogRecordBatch.NO_BATCH_SEQUENCE : 
batch.batchSequence();
+        return batch == null ? NO_BATCH_SEQUENCE : batch.batchSequence();
     }
 
     synchronized void handleCompletedBatch(ReadyWriteBatch readyWriteBatch) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 6164d6ecf..2cfd74da3 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -32,7 +32,6 @@ import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metrics.MetricNames;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.row.arrow.ArrowWriter;
 import org.apache.fluss.row.arrow.ArrowWriterPool;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
@@ -61,7 +60,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
@@ -757,7 +757,7 @@ public final class RecordAccumulator {
             // flight request count to 1.
             int firstInFlightSequence = 
idempotenceManager.firstInFlightBatchSequence(tableBucket);
             boolean isFirstInFlightBatch =
-                    firstInFlightSequence == LogRecordBatch.NO_BATCH_SEQUENCE
+                    firstInFlightSequence == NO_BATCH_SEQUENCE
                             || (first.hasBatchSequence()
                                     && first.batchSequence() == 
firstInFlightSequence);
 
@@ -824,7 +824,7 @@ public final class RecordAccumulator {
             Deque<WriteBatch> deque, WriteBatch batch, TableBucket 
tableBucket) {
         // When we are re-enqueue and have enabled idempotence, the 
re-enqueued batch must always
         // have a batch sequence.
-        if (batch.batchSequence() == LogRecordBatch.NO_BATCH_SEQUENCE) {
+        if (batch.batchSequence() == NO_BATCH_SEQUENCE) {
             throw new IllegalStateException(
                     "Trying to re-enqueue a batch which doesn't have a 
sequence even "
                             + "though idempotence is enabled.");
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
index fe8950845..49766612c 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
@@ -21,7 +21,6 @@ import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.memory.MemorySegmentPool;
 import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.record.bytesview.BytesView;
 
 import org.slf4j.Logger;
@@ -35,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** The abstract write batch contains write callback object to wait write 
request feedback. */
@@ -113,7 +113,7 @@ public abstract class WriteBatch {
     public abstract void abortRecordAppends();
 
     public boolean hasBatchSequence() {
-        return batchSequence() != LogRecordBatch.NO_BATCH_SEQUENCE;
+        return batchSequence() != NO_BATCH_SEQUENCE;
     }
 
     public void resetWriterState(long writerId, int batchSequence) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
index b2b73778c..0c1427e6f 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
@@ -21,7 +21,6 @@ import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.record.DefaultKvRecord;
 import org.apache.fluss.record.DefaultKvRecordBatch;
-import org.apache.fluss.record.DefaultLogRecordBatch;
 import org.apache.fluss.record.IndexedLogRecord;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.InternalRow;
@@ -29,6 +28,8 @@ import org.apache.fluss.row.indexed.IndexedRow;
 
 import javax.annotation.Nullable;
 
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /**
@@ -85,7 +86,7 @@ public final class WriteRecord {
             PhysicalTablePath tablePath, IndexedRow row, @Nullable byte[] 
bucketKey) {
         checkNotNull(row);
         int estimatedSizeInBytes =
-                IndexedLogRecord.sizeOf(row) + 
DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+                IndexedLogRecord.sizeOf(row) + 
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
         return new WriteRecord(
                 tablePath,
                 null,
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
index 7a3d8845a..3502dcc3c 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
@@ -39,16 +39,22 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
 import static org.apache.fluss.record.TestData.DATA2;
 import static org.apache.fluss.record.TestData.DATA2_ROW_TYPE;
 import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
@@ -78,14 +84,15 @@ public class DefaultCompletedFetchTest {
         logScannerStatus.assignScanBuckets(scanBuckets);
     }
 
-    @Test
-    void testSimple() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testSimple(byte recordBatchMagic) throws Exception {
         long fetchOffset = 0L;
         int bucketId = 0; // records for 0-10.
         TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
         FetchLogResultForBucket resultForBucket0 =
                 new FetchLogResultForBucket(
-                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 
10L);
+                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, 
recordBatchMagic), 10L);
         DefaultCompletedFetch defaultCompletedFetch =
                 makeCompletedFetch(tb, resultForBucket0, fetchOffset);
         List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(8);
@@ -100,14 +107,15 @@ public class DefaultCompletedFetchTest {
         assertThat(scanRecords.size()).isEqualTo(0);
     }
 
-    @Test
-    void testNegativeFetchCount() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testNegativeFetchCount(byte recordBatchMagic) throws Exception {
         long fetchOffset = 0L;
         int bucketId = 0; // records for 0-10.
         TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
         FetchLogResultForBucket resultForBucket0 =
                 new FetchLogResultForBucket(
-                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 
10L);
+                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, 
recordBatchMagic), 10L);
         DefaultCompletedFetch defaultCompletedFetch =
                 makeCompletedFetch(tb, resultForBucket0, fetchOffset);
         List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(-10);
@@ -128,9 +136,8 @@ public class DefaultCompletedFetchTest {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"INDEXED", "ARROW"})
-    void testProjection(String format) throws Exception {
-        LogFormat logFormat = LogFormat.fromString(format);
+    @MethodSource("typeAndMagic")
+    void testProjection(LogFormat logFormat, byte magic) throws Exception {
         Schema schema =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT())
@@ -158,9 +165,9 @@ public class DefaultCompletedFetchTest {
         Projection projection = Projection.of(new int[] {0, 2});
         MemoryLogRecords memoryLogRecords;
         if (logFormat == LogFormat.ARROW) {
-            memoryLogRecords = genRecordsWithProjection(DATA2, projection);
+            memoryLogRecords = genRecordsWithProjection(DATA2, projection, 
magic);
         } else {
-            memoryLogRecords = createMemoryLogRecords(DATA2, 
LogFormat.INDEXED);
+            memoryLogRecords = createMemoryLogRecords(DATA2, 
LogFormat.INDEXED, magic);
         }
         FetchLogResultForBucket resultForBucket0 =
                 new FetchLogResultForBucket(tb, memoryLogRecords, 10L);
@@ -224,19 +231,28 @@ public class DefaultCompletedFetchTest {
                 offset);
     }
 
-    private MemoryLogRecords createMemoryLogRecords(List<Object[]> objects, 
LogFormat logFormat)
-            throws Exception {
+    private static Collection<Arguments> typeAndMagic() {
+        List<Arguments> params = new ArrayList<>();
+        params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V1));
+        params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V1));
+        params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V0));
+        params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V0));
+        return params;
+    }
+
+    private MemoryLogRecords createMemoryLogRecords(
+            List<Object[]> objects, LogFormat logFormat, byte magic) throws 
Exception {
         return createRecordsWithoutBaseLogOffset(
-                rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, logFormat);
+                rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, 
logFormat);
     }
 
-    private MemoryLogRecords genRecordsWithProjection(List<Object[]> objects, 
Projection projection)
-            throws Exception {
+    private MemoryLogRecords genRecordsWithProjection(
+            List<Object[]> objects, Projection projection, byte magic) throws 
Exception {
         File logFile = FlussPaths.logFile(tempDir, 0L);
         FileLogRecords fileLogRecords = FileLogRecords.open(logFile, false, 
1024 * 1024, false);
         fileLogRecords.append(
                 createRecordsWithoutBaseLogOffset(
-                        rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, 
LogFormat.ARROW));
+                        rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, 
LogFormat.ARROW));
         fileLogRecords.flush();
 
         FileLogProjection fileLogProjection = new FileLogProjection();
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
index 53bf730e1..07544e8da 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
@@ -21,7 +21,6 @@ import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.memory.PreAllocatedPagedOutputView;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.ChangeType;
-import org.apache.fluss.record.DefaultLogRecordBatch;
 import org.apache.fluss.record.IndexedLogRecord;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.record.LogRecordBatch;
@@ -40,6 +39,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
@@ -72,7 +73,7 @@ public class IndexedLogWriteBatchTest {
 
         for (int i = 0;
                 i
-                        < (writeLimit - 
DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE)
+                        < (writeLimit - 
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
                                 / estimatedSizeInBytes;
                 i++) {
             boolean appendResult =
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index 444b93a5b..e5d71a927 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -66,7 +66,8 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static 
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
@@ -614,7 +615,7 @@ class RecordAccumulatorTest {
 
     /** Return the offset delta. */
     private int expectedNumAppends(IndexedRow row, int batchSize) {
-        int size = RECORD_BATCH_HEADER_SIZE;
+        int size = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
         int offsetDelta = 0;
         while (true) {
             int recordSize = IndexedLogRecord.sizeOf(row);
@@ -652,7 +653,8 @@ class RecordAccumulatorTest {
     }
 
     private long getTestBatchSize(BinaryRow row) {
-        return RECORD_BATCH_HEADER_SIZE + DefaultKvRecord.sizeOf(new byte[4], 
row);
+        return recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE)
+                + DefaultKvRecord.sizeOf(new byte[4], row);
     }
 
     private int getBatchNumInAccum(RecordAccumulator accum) {
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java 
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index cc0b0b21d..7209a315d 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -26,7 +26,6 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.exception.TimeoutException;
 import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
@@ -51,6 +50,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
@@ -797,7 +797,7 @@ final class SenderTest {
 
     private static boolean hasIdempotentRecords(TableBucket tb, 
ProduceLogRequest request) {
         MemoryLogRecords memoryLogRecords = getProduceLogData(request).get(tb);
-        return memoryLogRecords.batchIterator().next().writerId() != 
LogRecordBatch.NO_WRITER_ID;
+        return memoryLogRecords.batchIterator().next().writerId() != 
NO_WRITER_ID;
     }
 
     private static void assertBatchSequenceEquals(
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java 
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
index c2c7b5fb1..eaacc48a6 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
@@ -19,7 +19,6 @@ package org.apache.fluss.record;
 
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.exception.CorruptMessageException;
-import org.apache.fluss.exception.OutOfOrderSequenceException;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.row.arrow.ArrowReader;
@@ -35,95 +34,54 @@ import org.apache.fluss.utils.crc.Crc32C;
 import java.nio.ByteBuffer;
 import java.util.NoSuchElementException;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_OFFSET;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.COMMIT_TIMESTAMP_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.attributeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.batchSequenceOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.leaderEpochOffset;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordsCountOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.writeClientIdOffset;
+
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
  * additional information regarding copyright ownership. */
 
 /**
- * LogRecordBatch implementation for magic 0 and above. The schema of {@link 
LogRecordBatch} is
- * given below:
+ * LogRecordBatch implementation for different magic version.
+ *
+ * <p>To learn more about the recordBatch format, see {@link 
LogRecordBatchFormat}. Supported
+ * recordBatch format:
  *
  * <ul>
- *   RecordBatch =>
- *   <li>BaseOffset => Int64
- *   <li>Length => Int32
- *   <li>Magic => Int8
- *   <li>CommitTimestamp => Int64
- *   <li>CRC => Uint32
- *   <li>SchemaId => Int16
- *   <li>Attributes => Int8
- *   <li>LastOffsetDelta => Int32
- *   <li>WriterID => Int64
- *   <li>SequenceID => Int32
- *   <li>RecordCount => Int32
- *   <li>Records => [Record]
+ *   <li>V0 => {@link LogRecordBatchFormat#LOG_MAGIC_VALUE_V0}
+ *   <li>V1 => {@link LogRecordBatchFormat#LOG_MAGIC_VALUE_V1}
  * </ul>
  *
- * <p>The CRC covers the data from the schemaId to the end of the batch (i.e. 
all the bytes that
- * follow the CRC). It is located after the magic byte, which means that 
clients must parse the
- * magic byte before deciding how to interpret the bytes between the batch 
length and the magic
- * byte. The CRC-32C (Castagnoli) polynomial is used for the computation. 
CommitTimestamp is also
- * located before the CRC, because it is determined in server side.
- *
- * <p>The field 'lastOffsetDelta is used to calculate the lastOffset of the 
current batch as:
- * [lastOffset = baseOffset + LastOffsetDelta] instead of [lastOffset = 
baseOffset + recordCount -
- * 1]. The reason for introducing this field is that there might be cases 
where the offset delta in
- * batch does not match the recordCount. For example, when generating CDC logs 
for a kv table and
- * sending a batch that only contains the deletion of non-existent kvs, no CDC 
logs would be
- * generated. However, we need to increment the batchSequence for the 
corresponding writerId to make
- * sure no {@link OutOfOrderSequenceException} will be thrown. In such a case, 
we would generate a
- * logRecordBatch with a LastOffsetDelta of 0 but a recordCount of 0.
- *
- * <p>The current attributes are given below:
- *
- * <pre>
- * ------------------------------------------
- * |  Unused (1-7)   |  AppendOnly Flag (0) |
- * ------------------------------------------
- * </pre>
- *
  * @since 0.1
  */
 // TODO rename to MemoryLogRecordBatch
 @PublicEvolving
 public class DefaultLogRecordBatch implements LogRecordBatch {
-    protected static final int BASE_OFFSET_LENGTH = 8;
-    public static final int LENGTH_LENGTH = 4;
-    static final int MAGIC_LENGTH = 1;
-    static final int COMMIT_TIMESTAMP_LENGTH = 8;
-    static final int CRC_LENGTH = 4;
-    static final int SCHEMA_ID_LENGTH = 2;
-    static final int ATTRIBUTE_LENGTH = 1;
-    static final int LAST_OFFSET_DELTA_LENGTH = 4;
-    static final int WRITE_CLIENT_ID_LENGTH = 8;
-    static final int BATCH_SEQUENCE_LENGTH = 4;
-    static final int RECORDS_COUNT_LENGTH = 4;
-
-    static final int BASE_OFFSET_OFFSET = 0;
-    public static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET + 
BASE_OFFSET_LENGTH;
-    static final int MAGIC_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH;
-    static final int COMMIT_TIMESTAMP_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
-    public static final int CRC_OFFSET = COMMIT_TIMESTAMP_OFFSET + 
COMMIT_TIMESTAMP_LENGTH;
-    protected static final int SCHEMA_ID_OFFSET = CRC_OFFSET + CRC_LENGTH;
-    public static final int ATTRIBUTES_OFFSET = SCHEMA_ID_OFFSET + 
SCHEMA_ID_LENGTH;
-    static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + 
ATTRIBUTE_LENGTH;
-    static final int WRITE_CLIENT_ID_OFFSET = LAST_OFFSET_DELTA_OFFSET + 
LAST_OFFSET_DELTA_LENGTH;
-    static final int BATCH_SEQUENCE_OFFSET = WRITE_CLIENT_ID_OFFSET + 
WRITE_CLIENT_ID_LENGTH;
-    public static final int RECORDS_COUNT_OFFSET = BATCH_SEQUENCE_OFFSET + 
BATCH_SEQUENCE_LENGTH;
-    static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + 
RECORDS_COUNT_LENGTH;
-
-    public static final int RECORD_BATCH_HEADER_SIZE = RECORDS_OFFSET;
-    public static final int ARROW_CHANGETYPE_OFFSET = RECORD_BATCH_HEADER_SIZE;
-    public static final int LOG_OVERHEAD = LENGTH_OFFSET + LENGTH_LENGTH;
-
     public static final byte APPEND_ONLY_FLAG_MASK = 0x01;
 
     private MemorySegment segment;
     private int position;
+    private byte magic;
 
     public void pointTo(MemorySegment segment, int position) {
         this.segment = segment;
         this.position = position;
+        this.magic = segment.get(position + MAGIC_OFFSET);
     }
 
     public void setBaseLogOffset(long baseLogOffset) {
@@ -132,7 +90,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch 
{
 
     @Override
     public byte magic() {
-        return segment.get(position + MAGIC_OFFSET);
+        return magic;
     }
 
     @Override
@@ -144,25 +102,43 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
         segment.putLong(position + COMMIT_TIMESTAMP_OFFSET, timestamp);
     }
 
+    public void setLeaderEpoch(int leaderEpoch) {
+        if (magic >= LOG_MAGIC_VALUE_V1) {
+            segment.putInt(position + leaderEpochOffset(magic), leaderEpoch);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Set leader epoch is not supported for magic v" + magic + 
" record batch");
+        }
+    }
+
     @Override
     public long writerId() {
-        return segment.getLong(position + WRITE_CLIENT_ID_OFFSET);
+        return segment.getLong(position + writeClientIdOffset(magic));
     }
 
     @Override
     public int batchSequence() {
-        return segment.getInt(position + BATCH_SEQUENCE_OFFSET);
+        return segment.getInt(position + batchSequenceOffset(magic));
+    }
+
+    @Override
+    public int leaderEpoch() {
+        if (magic >= LOG_MAGIC_VALUE_V1) {
+            return segment.getInt(position + leaderEpochOffset(magic));
+        } else {
+            return NO_LEADER_EPOCH;
+        }
     }
 
     @Override
     public void ensureValid() {
         int sizeInBytes = sizeInBytes();
-        if (sizeInBytes < RECORD_BATCH_HEADER_SIZE) {
+        if (sizeInBytes < recordBatchHeaderSize(magic)) {
             throw new CorruptMessageException(
                     "Record batch is corrupt (the size "
                             + sizeInBytes
                             + " is smaller than the minimum allowed overhead "
-                            + RECORD_BATCH_HEADER_SIZE
+                            + recordBatchHeaderSize(magic)
                             + ")");
         }
 
@@ -178,17 +154,18 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
 
     @Override
     public boolean isValid() {
-        return sizeInBytes() >= RECORD_BATCH_HEADER_SIZE && checksum() == 
computeChecksum();
+        return sizeInBytes() >= recordBatchHeaderSize(magic) && checksum() == 
computeChecksum();
     }
 
     private long computeChecksum() {
         ByteBuffer buffer = segment.wrap(position, sizeInBytes());
-        return Crc32C.compute(buffer, SCHEMA_ID_OFFSET, sizeInBytes() - 
SCHEMA_ID_OFFSET);
+        int schemaIdOffset = schemaIdOffset(magic);
+        return Crc32C.compute(buffer, schemaIdOffset, sizeInBytes() - 
schemaIdOffset);
     }
 
     private byte attributes() {
         // note we're not using the byte of attributes now.
-        return segment.get(ATTRIBUTES_OFFSET + position);
+        return segment.get(attributeOffset(magic) + position);
     }
 
     @Override
@@ -198,12 +175,12 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
 
     @Override
     public long checksum() {
-        return segment.getUnsignedInt(CRC_OFFSET + position);
+        return segment.getUnsignedInt(crcOffset(magic) + position);
     }
 
     @Override
     public short schemaId() {
-        return segment.getShort(SCHEMA_ID_OFFSET + position);
+        return segment.getShort(schemaIdOffset(magic) + position);
     }
 
     @Override
@@ -217,7 +194,7 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
     }
 
     private int lastOffsetDelta() {
-        return segment.getInt(LAST_OFFSET_DELTA_OFFSET + position);
+        return segment.getInt(lastOffsetDeltaOffset(magic) + position);
     }
 
     @Override
@@ -227,7 +204,7 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
 
     @Override
     public int getRecordCount() {
-        return segment.getInt(RECORDS_COUNT_OFFSET + position);
+        return segment.getInt(position + recordsCountOffset(magic));
     }
 
     @Override
@@ -278,7 +255,7 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
     private CloseableIterator<LogRecord> rowRecordIterator(RowType rowType, 
long timestamp) {
         DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
         return new LogRecordIterator() {
-            int position = DefaultLogRecordBatch.this.position + 
RECORD_BATCH_HEADER_SIZE;
+            int position = DefaultLogRecordBatch.this.position + 
recordBatchHeaderSize(magic);
             int rowId = 0;
 
             @Override
@@ -307,8 +284,9 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
         if (isAppendOnly) {
             // append only batch, no change type vector,
             // the start of the arrow data is the beginning of the batch 
records
-            int arrowOffset = position + RECORD_BATCH_HEADER_SIZE;
-            int arrowLength = sizeInBytes() - RECORD_BATCH_HEADER_SIZE;
+            int recordBatchHeaderSize = recordBatchHeaderSize(magic);
+            int arrowOffset = position + recordBatchHeaderSize;
+            int arrowLength = sizeInBytes() - recordBatchHeaderSize;
             ArrowReader reader =
                     ArrowUtils.createArrowReader(
                             segment, arrowOffset, arrowLength, root, 
allocator, rowType);
@@ -321,12 +299,12 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
         } else {
             // with change type, decode the change type vector first,
             // the arrow data starts after the change type vector
-            int changeTypeOffset = position + ARROW_CHANGETYPE_OFFSET;
+            int changeTypeOffset = position + arrowChangeTypeOffset(magic);
             ChangeTypeVector changeTypeVector =
                     new ChangeTypeVector(segment, changeTypeOffset, 
getRecordCount());
             int arrowOffset = changeTypeOffset + 
changeTypeVector.sizeInBytes();
             int arrowLength =
-                    sizeInBytes() - ARROW_CHANGETYPE_OFFSET - 
changeTypeVector.sizeInBytes();
+                    sizeInBytes() - arrowChangeTypeOffset(magic) - 
changeTypeVector.sizeInBytes();
             ArrowReader reader =
                     ArrowUtils.createArrowReader(
                             segment, arrowOffset, arrowLength, root, 
allocator, rowType);
@@ -394,7 +372,7 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
                         "Found invalid record count "
                                 + numRecords
                                 + " in magic v"
-                                + magic()
+                                + magic
                                 + " batch");
             }
             this.numRecords = numRecords;
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java 
b/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java
index 3f314ce46..9be7e459a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java
@@ -28,12 +28,12 @@ import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.util.Objects;
 
-import static org.apache.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
-import static org.apache.fluss.record.DefaultLogRecordBatch.MAGIC_LENGTH;
-import static org.apache.fluss.record.DefaultLogRecordBatch.MAGIC_OFFSET;
-import static 
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_OFFSET;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -42,9 +42,6 @@ import static 
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_
 /** A log input stream which is backed by a {@link FileChannel}. */
 public class FileLogInputStream
         implements 
LogInputStream<FileLogInputStream.FileChannelLogRecordBatch> {
-
-    private static final int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + 
MAGIC_LENGTH;
-
     private int position;
     private final int end;
     private final FileLogRecords fileRecords;
@@ -152,6 +149,11 @@ public class FileLogInputStream
             return loadBatchHeader().batchSequence();
         }
 
+        @Override
+        public int leaderEpoch() {
+            return loadBatchHeader().leaderEpoch();
+        }
+
         @Override
         public long lastLogOffset() {
             return loadBatchHeader().lastLogOffset();
@@ -189,7 +191,7 @@ public class FileLogInputStream
         }
 
         private int headerSize() {
-            return RECORD_BATCH_HEADER_SIZE;
+            return recordBatchHeaderSize(magic);
         }
 
         protected LogRecordBatch loadFullBatch() {
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java 
b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
index 736ae63e8..1f889fff1 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
@@ -41,6 +41,7 @@ import org.apache.fluss.utils.ArrowUtils;
 import org.apache.fluss.utils.types.Tuple2;
 
 import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -54,12 +55,18 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.fluss.record.DefaultLogRecordBatch.APPEND_ONLY_FLAG_MASK;
-import static 
org.apache.fluss.record.DefaultLogRecordBatch.ARROW_CHANGETYPE_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.ATTRIBUTES_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
-import static 
org.apache.fluss.record.DefaultLogRecordBatch.RECORDS_COUNT_OFFSET;
-import static 
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.V1_RECORD_BATCH_HEADER_SIZE;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.attributeOffset;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordsCountOffset;
+import static org.apache.fluss.utils.FileUtils.readFully;
 import static org.apache.fluss.utils.FileUtils.readFullyOrFail;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 import static org.apache.fluss.utils.Preconditions.checkState;
@@ -81,7 +88,13 @@ public class FileLogProjection {
     // shared resources for multiple projections
     private final ByteArrayOutputStream outputStream;
     private final WriteChannel writeChannel;
-    private final ByteBuffer logHeaderBuffer = 
ByteBuffer.allocate(RECORD_BATCH_HEADER_SIZE);
+
+    /**
+     * Buffer to read a log record batch header. The V1 header is larger than V0, so a
+     * V1-sized buffer can also hold a full V0 header even when the log file has fewer
+     * bytes remaining.
+     */
+    private final ByteBuffer logHeaderBuffer = 
ByteBuffer.allocate(V1_RECORD_BATCH_HEADER_SIZE);
+
     private final ByteBuffer arrowHeaderBuffer = 
ByteBuffer.allocate(ARROW_HEADER_SIZE);
     private ByteBuffer arrowMetadataBuffer;
 
@@ -163,17 +176,23 @@ public class FileLogProjection {
         checkNotNull(currentProjection, "There is no projection registered 
yet.");
         MultiBytesView.Builder builder = MultiBytesView.builder();
         int position = start;
-        while (maxBytes > RECORD_BATCH_HEADER_SIZE) {
-            if (position >= end - RECORD_BATCH_HEADER_SIZE) {
-                // the remaining bytes in the file are not enough to read a 
batch header
+
+        // This condition is an optimization to avoid reading the log header when there
+        // are not enough bytes, so we use the V0 header size here for a conservative
+        // judgment. In the end, the condition of (position >= end - recordBatchHeaderSize)
+        // will ensure the final correctness.
+        while (maxBytes > V0_RECORD_BATCH_HEADER_SIZE) {
+            if (position >= end - V0_RECORD_BATCH_HEADER_SIZE) {
+                // the remaining bytes in the file are not enough to read a 
batch header up to
+                // magic.
                 return new BytesViewLogRecords(builder.build());
             }
-
             // read log header
             logHeaderBuffer.rewind();
-            readFullyOrFail(channel, logHeaderBuffer, position, "log header");
+            readLogHeaderFullyOrFail(channel, logHeaderBuffer, position);
 
             logHeaderBuffer.rewind();
+            byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
+            int recordBatchHeaderSize = recordBatchHeaderSize(magic);
             int batchSizeInBytes = LOG_OVERHEAD + 
logHeaderBuffer.getInt(LENGTH_OFFSET);
             if (position > end - batchSizeInBytes) {
                 // the remaining bytes in the file are not enough to read a 
full batch
@@ -183,22 +202,22 @@ public class FileLogProjection {
             // Skip empty batch. The empty batch was generated when build cdc 
log batch when there
             // is no cdc log generated for this kv batch. See the comments 
about the field
             // 'lastOffsetDelta' in DefaultLogRecordBatch.
-            if (batchSizeInBytes == RECORD_BATCH_HEADER_SIZE) {
+            if (batchSizeInBytes == recordBatchHeaderSize) {
                 position += batchSizeInBytes;
                 continue;
             }
 
             boolean isAppendOnly =
-                    (logHeaderBuffer.get(ATTRIBUTES_OFFSET) & 
APPEND_ONLY_FLAG_MASK) > 0;
+                    (logHeaderBuffer.get(attributeOffset(magic)) & 
APPEND_ONLY_FLAG_MASK) > 0;
 
             final int changeTypeBytes;
             final long arrowHeaderOffset;
             if (isAppendOnly) {
                 changeTypeBytes = 0;
-                arrowHeaderOffset = position + RECORD_BATCH_HEADER_SIZE;
+                arrowHeaderOffset = position + recordBatchHeaderSize;
             } else {
-                changeTypeBytes = logHeaderBuffer.getInt(RECORDS_COUNT_OFFSET);
-                arrowHeaderOffset = position + RECORD_BATCH_HEADER_SIZE + 
changeTypeBytes;
+                changeTypeBytes = 
logHeaderBuffer.getInt(recordsCountOffset(magic));
+                arrowHeaderOffset = position + recordBatchHeaderSize + 
changeTypeBytes;
             }
 
             // read arrow header
@@ -226,7 +245,7 @@ public class FileLogProjection {
             long arrowBodyLength = projectedArrowBatch.bodyLength();
 
             int newBatchSizeInBytes =
-                    RECORD_BATCH_HEADER_SIZE
+                    recordBatchHeaderSize
                             + changeTypeBytes
                             + currentProjection.arrowMetadataLength
                             + (int) arrowBodyLength; // safe to cast to int
@@ -250,13 +269,13 @@ public class FileLogProjection {
             logHeaderBuffer.putInt(newBatchSizeInBytes - LOG_OVERHEAD);
             logHeaderBuffer.rewind();
             // the logHeader can't be reused, as it will be sent to network
-            byte[] logHeader = new byte[RECORD_BATCH_HEADER_SIZE];
+            byte[] logHeader = new byte[recordBatchHeaderSize];
             logHeaderBuffer.get(logHeader);
 
             // 5. build log records
             builder.addBytes(logHeader);
             if (!isAppendOnly) {
-                builder.addBytes(channel, position + ARROW_CHANGETYPE_OFFSET, 
changeTypeBytes);
+                builder.addBytes(channel, position + 
arrowChangeTypeOffset(magic), changeTypeBytes);
             }
             builder.addBytes(headerMetadata);
             final long bufferOffset = arrowHeaderOffset + ARROW_HEADER_SIZE + 
arrowMetadataSize;
@@ -380,6 +399,36 @@ public class FileLogProjection {
         return bitset;
     }
 
+    /**
+     * Read the log header fully, or fail with an EOFException if there are not enough
+     * bytes to read a full log header. This handles the different log header sizes of
+     * magic v0 and v1.
+     */
+    static void readLogHeaderFullyOrFail(FileChannel channel, ByteBuffer 
buffer, int position)
+            throws IOException {
+        if (position < 0) {
+            throw new IllegalArgumentException(
+                    "The file channel position cannot be negative, but it is " 
+ position);
+        }
+        readFully(channel, buffer, position);
+        if (buffer.hasRemaining()) {
+            int size = buffer.position();
+            byte magic = buffer.get(MAGIC_OFFSET);
+            if (magic == LOG_MAGIC_VALUE_V0 && size < 
V0_RECORD_BATCH_HEADER_SIZE) {
+                throw new EOFException(
+                        String.format(
+                                "Failed to read v0 log header from file 
channel `%s`. Expected to read %d bytes, "
+                                        + "but reached end of file after 
reading %d bytes. Started read from position %d.",
+                                channel, V0_RECORD_BATCH_HEADER_SIZE, size, 
position));
+            } else if (magic == LOG_MAGIC_VALUE_V1 && size < 
V1_RECORD_BATCH_HEADER_SIZE) {
+                throw new EOFException(
+                        String.format(
+                                "Failed to read v1 log header from file 
channel `%s`. Expected to read %d bytes, "
+                                        + "but reached end of file after 
reading %d bytes. Started read from position %d.",
+                                channel, V1_RECORD_BATCH_HEADER_SIZE, size, 
position));
+            }
+        }
+    }
+
     @VisibleForTesting
     ByteBuffer getLogHeaderBuffer() {
         return logHeaderBuffer;
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java 
b/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
index e14b6ce14..385e457b8 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
@@ -30,7 +30,7 @@ import org.apache.fluss.utils.MurmurHashUtils;
 
 import java.io.IOException;
 
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
 
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java 
b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java
index c66ecb383..12c321710 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java
@@ -37,6 +37,8 @@ import static 
org.apache.fluss.record.DefaultKvRecordBatch.LENGTH_LENGTH;
 import static 
org.apache.fluss.record.DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
 import static org.apache.fluss.record.DefaultKvRecordBatch.SCHEMA_ID_OFFSET;
 import static org.apache.fluss.record.KvRecordBatch.CURRENT_KV_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 
 /** Builder for {@link DefaultKvRecordBatch} memory bytes. */
@@ -72,8 +74,8 @@ public class KvRecordBatchBuilder implements AutoCloseable {
         this.writeLimit = writeLimit;
         this.pagedOutputView = pagedOutputView;
         this.firstSegment = pagedOutputView.getCurrentSegment();
-        this.writerId = LogRecordBatch.NO_WRITER_ID;
-        this.batchSequence = LogRecordBatch.NO_BATCH_SEQUENCE;
+        this.writerId = NO_WRITER_ID;
+        this.batchSequence = NO_BATCH_SEQUENCE;
         this.currentRecordNumber = 0;
         this.isClosed = false;
         // We don't need to write header information while the builder 
creating,
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
index 8dcab1be2..d28c28ad3 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
@@ -26,6 +26,9 @@ import org.apache.fluss.utils.CloseableIterator;
 
 import java.util.Iterator;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+
 /**
  * A record batch is a container for {@link LogRecord LogRecords}.
  *
@@ -33,17 +36,8 @@ import java.util.Iterator;
  */
 @PublicEvolving
 public interface LogRecordBatch {
-
-    /** The "magic" values. */
-    byte LOG_MAGIC_VALUE_V0 = 0;
-
     /** The current "magic" value. */
-    byte CURRENT_LOG_MAGIC_VALUE = LOG_MAGIC_VALUE_V0;
-
-    /** Value used if non-idempotent. */
-    long NO_WRITER_ID = -1L;
-
-    int NO_BATCH_SEQUENCE = -1;
+    byte CURRENT_LOG_MAGIC_VALUE = LOG_MAGIC_VALUE_V1;
 
     /**
      * Check whether the checksum of this batch is correct.
@@ -129,6 +123,13 @@ public interface LogRecordBatch {
      */
     int batchSequence();
 
+    /**
+     * Get leader epoch of this bucket for this log record batch.
+     *
+     * @return leader epoch
+     */
+    int leaderEpoch();
+
     /**
      * Get the size in bytes of this batch, including the size of the record 
and the batch overhead.
      *
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java
new file mode 100644
index 000000000..d0f42c052
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.exception.OutOfOrderSequenceException;
+
+/** Defines how Fluss organizes and stores a {@link LogRecordBatch}. */
+public class LogRecordBatchFormat {
+
+    // 
----------------------------------------------------------------------------------------
+    // Common Variables
+    // 
----------------------------------------------------------------------------------------
+
+    /** Value used if non-idempotent. */
+    public static final long NO_WRITER_ID = -1L;
+
+    public static final int NO_BATCH_SEQUENCE = -1;
+
+    /**
+     * Used to indicate an unknown leaderEpoch, which is the case when the record set is
+     * first created by the writer, or when the magic version is lower than V1.
+     */
+    public static final int NO_LEADER_EPOCH = -1;
+
+    public static final int BASE_OFFSET_LENGTH = 8;
+    public static final int LENGTH_LENGTH = 4;
+    public static final int MAGIC_LENGTH = 1;
+    private static final int COMMIT_TIMESTAMP_LENGTH = 8;
+    private static final int CRC_LENGTH = 4;
+    private static final int SCHEMA_ID_LENGTH = 2;
+    private static final int LEADER_EPOCH_LENGTH = 4;
+    private static final int ATTRIBUTE_LENGTH = 1;
+    private static final int LAST_OFFSET_DELTA_LENGTH = 4;
+    private static final int WRITE_CLIENT_ID_LENGTH = 8;
+    private static final int BATCH_SEQUENCE_LENGTH = 4;
+    private static final int RECORDS_COUNT_LENGTH = 4;
+
+    public static final int BASE_OFFSET_OFFSET = 0;
+    public static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET + 
BASE_OFFSET_LENGTH;
+    public static final int MAGIC_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH;
+    public static final int COMMIT_TIMESTAMP_OFFSET = MAGIC_OFFSET + 
MAGIC_LENGTH;
+    public static final int LOG_OVERHEAD = LENGTH_OFFSET + LENGTH_LENGTH;
+    public static final int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + 
MAGIC_LENGTH;
+
+    // 
----------------------------------------------------------------------------------------
+    // Format of Magic Version: V1
+    // 
----------------------------------------------------------------------------------------
+
+    /**
+     * LogRecordBatch implementation for magic 1 (V1). The schema of {@link 
LogRecordBatch} is given
+     * below:
+     *
+     * <ul>
+     *   RecordBatch =>
+     *   <li>BaseOffset => Int64
+     *   <li>Length => Int32
+     *   <li>Magic => Int8
+     *   <li>CommitTimestamp => Int64
+     *   <li>LeaderEpoch => Int32
+     *   <li>CRC => Uint32
+     *   <li>SchemaId => Int16
+     *   <li>Attributes => Int8
+     *   <li>LastOffsetDelta => Int32
+     *   <li>WriterID => Int64
+     *   <li>SequenceID => Int32
+     *   <li>RecordCount => Int32
+     *   <li>Records => [Record]
+     * </ul>
+     *
+     * <p>The newly added field in the LogRecordBatch header of magic V1 is LeaderEpoch,
+     * which is used to build a consistent leaderEpoch cache across different tabletServers.
+     *
+     * <p>The CRC covers the data from the schemaId to the end of the batch 
(i.e. all the bytes that
+     * follow the CRC). It is located after the magic byte, which means that 
clients must parse the
+     * magic byte before deciding how to interpret the bytes between the batch 
length and the magic
+     * byte. The CRC-32C (Castagnoli) polynomial is used for the computation. 
CommitTimestamp is
+     * also located before the CRC, because it is determined in server side.
+     *
+     * <p>The field 'lastOffsetDelta' is used to calculate the lastOffset of the current batch as:
+     * [lastOffset = baseOffset + LastOffsetDelta] instead of [lastOffset = 
baseOffset + recordCount
+     * - 1]. The reason for introducing this field is that there might be 
cases where the offset
+     * delta in batch does not match the recordCount. For example, when 
generating CDC logs for a kv
+     * table and sending a batch that only contains the deletion of 
non-existent kvs, no CDC logs
+     * would be generated. However, we need to increment the batchSequence for 
the corresponding
+     * writerId to make sure no {@link OutOfOrderSequenceException} will be 
thrown. In such a case,
+     * we would generate a logRecordBatch with a LastOffsetDelta of 0 but a 
recordCount of 0.
+     *
+     * <p>The current attributes are given below:
+     *
+     * <pre>
+     * ------------------------------------------
+     * |  Unused (1-7)   |  AppendOnly Flag (0) |
+     * ------------------------------------------
+     * </pre>
+     *
+     * @since 0.7
+     */
+    public static final byte LOG_MAGIC_VALUE_V1 = 1;
+
+    private static final int V1_LEADER_EPOCH_OFFSET =
+            COMMIT_TIMESTAMP_OFFSET + COMMIT_TIMESTAMP_LENGTH;
+    private static final int V1_CRC_OFFSET = V1_LEADER_EPOCH_OFFSET + 
LEADER_EPOCH_LENGTH;
+    private static final int V1_SCHEMA_ID_OFFSET = V1_CRC_OFFSET + CRC_LENGTH;
+    private static final int V1_ATTRIBUTES_OFFSET = V1_SCHEMA_ID_OFFSET + 
SCHEMA_ID_LENGTH;
+    private static final int V1_LAST_OFFSET_DELTA_OFFSET = 
V1_ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    private static final int V1_WRITE_CLIENT_ID_OFFSET =
+            V1_LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
+    private static final int V1_BATCH_SEQUENCE_OFFSET =
+            V1_WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH;
+    private static final int V1_RECORDS_COUNT_OFFSET =
+            V1_BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
+    private static final int V1_RECORDS_OFFSET = V1_RECORDS_COUNT_OFFSET + 
RECORDS_COUNT_LENGTH;
+
+    public static final int V1_RECORD_BATCH_HEADER_SIZE = V1_RECORDS_OFFSET;
+    private static final int V1_ARROW_CHANGETYPE_OFFSET = 
V1_RECORD_BATCH_HEADER_SIZE;
+
+    // 
----------------------------------------------------------------------------------------
+    // Format of Magic Version: V0
+    // 
----------------------------------------------------------------------------------------
+
+    /**
+     * LogRecordBatch implementation for magic 0 (V0). The schema of {@link 
LogRecordBatch} is given
+     * below:
+     *
+     * <ul>
+     *   RecordBatch =>
+     *   <li>BaseOffset => Int64
+     *   <li>Length => Int32
+     *   <li>Magic => Int8
+     *   <li>CommitTimestamp => Int64
+     *   <li>CRC => Uint32
+     *   <li>SchemaId => Int16
+     *   <li>Attributes => Int8
+     *   <li>LastOffsetDelta => Int32
+     *   <li>WriterID => Int64
+     *   <li>SequenceID => Int32
+     *   <li>RecordCount => Int32
+     *   <li>Records => [Record]
+     * </ul>
+     *
+     * <p>The current attributes are given below:
+     *
+     * <pre>
+     * ------------------------------------------
+     * |  Unused (1-7)   |  AppendOnly Flag (0) |
+     * ------------------------------------------
+     * </pre>
+     *
+     * @since 0.1
+     */
+    public static final byte LOG_MAGIC_VALUE_V0 = 0;
+
+    private static final int V0_CRC_OFFSET = COMMIT_TIMESTAMP_OFFSET + 
COMMIT_TIMESTAMP_LENGTH;
+    private static final int V0_SCHEMA_ID_OFFSET = V0_CRC_OFFSET + CRC_LENGTH;
+    private static final int V0_ATTRIBUTES_OFFSET = V0_SCHEMA_ID_OFFSET + 
SCHEMA_ID_LENGTH;
+    private static final int V0_LAST_OFFSET_DELTA_OFFSET = 
V0_ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    private static final int V0_WRITE_CLIENT_ID_OFFSET =
+            V0_LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
+    private static final int V0_BATCH_SEQUENCE_OFFSET =
+            V0_WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH;
+    private static final int V0_RECORDS_COUNT_OFFSET =
+            V0_BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
+    private static final int V0_RECORDS_OFFSET = V0_RECORDS_COUNT_OFFSET + 
RECORDS_COUNT_LENGTH;
+
+    public static final int V0_RECORD_BATCH_HEADER_SIZE = V0_RECORDS_OFFSET;
+    private static final int V0_ARROW_CHANGETYPE_OFFSET = 
V0_RECORD_BATCH_HEADER_SIZE;
+
+    // 
----------------------------------------------------------------------------------------
+    // Static Methods
+    // 
----------------------------------------------------------------------------------------
+
+    public static int leaderEpochOffset(byte magic) {
+        if (magic == LOG_MAGIC_VALUE_V1) {
+            return V1_LEADER_EPOCH_OFFSET;
+        }
+        throw new IllegalArgumentException("Unsupported magic value " + magic);
+    }
+
+    public static int crcOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_CRC_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_CRC_OFFSET;
+            default:
+                throw new IllegalArgumentException("Unsupported magic value " 
+ magic);
+        }
+    }
+
+    public static int schemaIdOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_SCHEMA_ID_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_SCHEMA_ID_OFFSET;
+            default:
+                throw new IllegalArgumentException("Unsupported magic value " 
+ magic);
+        }
+    }
+
+    public static int attributeOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_ATTRIBUTES_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_ATTRIBUTES_OFFSET;
+            default:
+                throw new IllegalArgumentException("Unsupported magic value " 
+ magic);
+        }
+    }
+
+    public static int lastOffsetDeltaOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_LAST_OFFSET_DELTA_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_LAST_OFFSET_DELTA_OFFSET;
+            default:
+                throw new IllegalArgumentException("Unsupported magic value " 
+ magic);
+        }
+    }
+
+    public static int writeClientIdOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_WRITE_CLIENT_ID_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_WRITE_CLIENT_ID_OFFSET;
+            default:
+                throw new IllegalArgumentException("Unsupported magic value " 
+ magic);
+        }
+    }
+
+    public static int batchSequenceOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_BATCH_SEQUENCE_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_BATCH_SEQUENCE_OFFSET;
+            default:
+                throw new IllegalArgumentException("Unsupported magic value " 
+ magic);
+        }
+    }
+
+    public static int recordsCountOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_RECORDS_COUNT_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_RECORDS_COUNT_OFFSET;
+            default:
+                throw new IllegalArgumentException("Unsupported magic value " 
+ magic);
+        }
+    }
+
+    public static int recordBatchHeaderSize(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_RECORD_BATCH_HEADER_SIZE;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_RECORD_BATCH_HEADER_SIZE;
+            default:
+                throw new IllegalArgumentException("Unsupported magic value " 
+ magic);
+        }
+    }
+
+    public static int arrowChangeTypeOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_ARROW_CHANGETYPE_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_ARROW_CHANGETYPE_OFFSET;
+            default:
+                throw new IllegalArgumentException("Unsupported magic value " 
+ magic);
+        }
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java
index ab8128f06..8ff8997e5 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 
+import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
  * additional information regarding copyright ownership. */
@@ -72,13 +74,14 @@ public class MemoryLogRecords implements LogRecords {
         sizeInBytes = 0;
     }
 
-    public void ensureValid() {
-        if (sizeInBytes < DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE) {
+    public void ensureValid(byte recordBatchMagic) {
+        int recordBatchHeaderSize = recordBatchHeaderSize(recordBatchMagic);
+        if (sizeInBytes < recordBatchHeaderSize) {
             throw new RuntimeException(
                     "Record batch is corrupt (the size "
                             + sizeInBytes
                             + " is smaller than the minimum allowed overhead "
-                            + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE
+                            + recordBatchHeaderSize
                             + ")");
         }
     }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
index 225e948db..94b2fe59f 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
@@ -29,15 +29,17 @@ import org.apache.fluss.utils.crc.Crc32C;
 
 import java.io.IOException;
 
-import static 
org.apache.fluss.record.DefaultLogRecordBatch.ARROW_CHANGETYPE_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
-import static org.apache.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
-import static 
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
-import static org.apache.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
 import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
@@ -89,24 +91,26 @@ public class MemoryLogRecordsArrowBuilder implements 
AutoCloseable {
 
         this.pagedOutputView = pagedOutputView;
         this.firstSegment = pagedOutputView.getCurrentSegment();
+        int arrowChangeTypeOffset = arrowChangeTypeOffset(magic);
         checkArgument(
-                firstSegment.size() >= ARROW_CHANGETYPE_OFFSET,
+                firstSegment.size() >= arrowChangeTypeOffset,
                 "The size of first segment of pagedOutputView is too small, 
need at least "
-                        + ARROW_CHANGETYPE_OFFSET
+                        + arrowChangeTypeOffset
                         + " bytes.");
-        this.changeTypeWriter = new ChangeTypeVectorWriter(firstSegment, 
ARROW_CHANGETYPE_OFFSET);
-        this.estimatedSizeInBytes = RECORD_BATCH_HEADER_SIZE;
+        this.changeTypeWriter = new ChangeTypeVectorWriter(firstSegment, 
arrowChangeTypeOffset);
+        this.estimatedSizeInBytes = recordBatchHeaderSize(magic);
         this.recordCount = 0;
     }
 
     @VisibleForTesting
     public static MemoryLogRecordsArrowBuilder builder(
             long baseLogOffset,
+            byte magic,
             int schemaId,
             ArrowWriter arrowWriter,
             AbstractPagedOutputView outputView) {
         return new MemoryLogRecordsArrowBuilder(
-                baseLogOffset, schemaId, CURRENT_LOG_MAGIC_VALUE, arrowWriter, 
outputView, false);
+                baseLogOffset, schemaId, magic, arrowWriter, outputView, 
false);
     }
 
     /** Builder with limited write size and the memory segment used to 
serialize records. */
@@ -139,7 +143,7 @@ public class MemoryLogRecordsArrowBuilder implements 
AutoCloseable {
 
         // serialize the arrow batch to dynamically allocated memory segments
         arrowWriter.serializeToOutputView(
-                pagedOutputView, ARROW_CHANGETYPE_OFFSET + 
changeTypeWriter.sizeInBytes());
+                pagedOutputView, arrowChangeTypeOffset(magic) + 
changeTypeWriter.sizeInBytes());
         recordCount = arrowWriter.getRecordsCount();
         bytesView =
                 MultiBytesView.builder()
@@ -237,7 +241,7 @@ public class MemoryLogRecordsArrowBuilder implements 
AutoCloseable {
         if (reCalculateSizeInBytes) {
             // make size in bytes up-to-date
             estimatedSizeInBytes =
-                    ARROW_CHANGETYPE_OFFSET
+                    arrowChangeTypeOffset(magic)
                             + changeTypeWriter.sizeInBytes()
                             + arrowWriter.estimatedSizeInBytes();
         }
@@ -259,6 +263,12 @@ public class MemoryLogRecordsArrowBuilder implements 
AutoCloseable {
 
         // write empty timestamp which will be overridden on server side
         outputView.writeLong(0);
+
+        // write empty leaderEpoch which will be overridden on server side
+        if (magic >= LOG_MAGIC_VALUE_V1) {
+            outputView.writeInt(NO_LEADER_EPOCH);
+        }
+
         // write empty crc first.
         outputView.writeUnsignedInt(0);
         // write schema id
@@ -278,8 +288,8 @@ public class MemoryLogRecordsArrowBuilder implements 
AutoCloseable {
         outputView.writeInt(recordCount);
 
         // Update crc.
-        long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), 
SCHEMA_ID_OFFSET);
-        outputView.setPosition(CRC_OFFSET);
+        long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), 
schemaIdOffset(magic));
+        outputView.setPosition(crcOffset(magic));
         outputView.writeUnsignedInt(crc);
     }
 
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
index b62c2cef7..8488c3351 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
@@ -29,15 +29,17 @@ import org.apache.fluss.utils.crc.Crc32C;
 
 import java.io.IOException;
 
-import static org.apache.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
-import static org.apache.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
-import static 
org.apache.fluss.record.DefaultLogRecordBatch.LAST_OFFSET_DELTA_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
-import static 
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
-import static org.apache.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
 import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 
 /**
@@ -87,8 +89,8 @@ public class MemoryLogRecordsIndexedBuilder implements 
AutoCloseable {
 
         // We don't need to write header information while the builder 
creating,
         // we'll skip it first.
-        this.pagedOutputView.setPosition(RECORD_BATCH_HEADER_SIZE);
-        this.sizeInBytes = RECORD_BATCH_HEADER_SIZE;
+        this.pagedOutputView.setPosition(recordBatchHeaderSize(magic));
+        this.sizeInBytes = recordBatchHeaderSize(magic);
     }
 
     public static MemoryLogRecordsIndexedBuilder builder(
@@ -219,6 +221,12 @@ public class MemoryLogRecordsIndexedBuilder implements 
AutoCloseable {
 
         // write empty timestamp which will be overridden on server side
         outputView.writeLong(0);
+
+        // write empty leaderEpoch which will be overridden on server side
+        if (magic >= LOG_MAGIC_VALUE_V1) {
+            outputView.writeInt(NO_LEADER_EPOCH);
+        }
+
         // write empty crc first.
         outputView.writeUnsignedInt(0);
 
@@ -226,7 +234,7 @@ public class MemoryLogRecordsIndexedBuilder implements 
AutoCloseable {
         // write attributes (currently only appendOnly flag)
         outputView.writeBoolean(appendOnly);
         // skip write attribute byte for now.
-        outputView.setPosition(LAST_OFFSET_DELTA_OFFSET);
+        outputView.setPosition(lastOffsetDeltaOffset(magic));
         if (currentRecordNumber > 0) {
             outputView.writeInt(currentRecordNumber - 1);
         } else {
@@ -239,8 +247,8 @@ public class MemoryLogRecordsIndexedBuilder implements 
AutoCloseable {
         outputView.writeInt(currentRecordNumber);
 
         // Update crc.
-        long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), 
SCHEMA_ID_OFFSET);
-        outputView.setPosition(CRC_OFFSET);
+        long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), 
schemaIdOffset(magic));
+        outputView.setPosition(crcOffset(magic));
         outputView.writeUnsignedInt(crc);
     }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java
index feaf430ff..5e2bdebe5 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java
@@ -19,8 +19,9 @@ package org.apache.fluss.record;
 
 import org.apache.fluss.memory.MemorySegment;
 
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
 
 /**
  * A byte buffer backed log input stream. This class avoids the need to copy 
records by returning
@@ -40,7 +41,8 @@ class MemorySegmentLogInputStream implements 
LogInputStream<LogRecordBatch> {
 
     public LogRecordBatch nextBatch() {
         Integer batchSize = nextBatchSize();
-        if (batchSize == null || remaining < batchSize) {
+        // should at-least larger than V0 header size, because V1 header is 
larger than V0.
+        if (batchSize == null || remaining < batchSize || remaining < 
V0_RECORD_BATCH_HEADER_SIZE) {
             return null;
         }
 
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java
index bfb23da94..70f881e06 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java
@@ -24,21 +24,26 @@ import org.apache.fluss.testutils.DataTestUtils;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.CloseableIterator;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link DefaultLogRecordBatch}. */
 public class DefaultLogRecordBatchTest extends LogTestBase {
 
-    @Test
-    void testRecordBatchSize() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testRecordBatchSize(byte magic) throws Exception {
         MemoryLogRecords memoryLogRecords =
-                DataTestUtils.genMemoryLogRecordsByObject(TestData.DATA1);
+                DataTestUtils.genMemoryLogRecordsByObject(magic, 
TestData.DATA1);
         int totalSize = 0;
         for (LogRecordBatch logRecordBatch : memoryLogRecords.batches()) {
             totalSize += logRecordBatch.sizeInBytes();
@@ -46,8 +51,9 @@ public class DefaultLogRecordBatchTest extends LogTestBase {
         assertThat(totalSize).isEqualTo(memoryLogRecords.sizeInBytes());
     }
 
-    @Test
-    void testIndexedRowWriteAndReadBatch() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testIndexedRowWriteAndReadBatch(byte magic) throws Exception {
         int recordNumber = 50;
         RowType allRowType = TestInternalRowGenerator.createAllRowType();
         MemoryLogRecordsIndexedBuilder builder =
@@ -98,8 +104,9 @@ public class DefaultLogRecordBatchTest extends LogTestBase {
         builder.close();
     }
 
-    @Test
-    void testNoRecordAppend() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testNoRecordAppend(byte magic) throws Exception {
         // 1. no record append with baseOffset as 0.
         MemoryLogRecordsIndexedBuilder builder =
                 MemoryLogRecordsIndexedBuilder.builder(
@@ -107,7 +114,7 @@ public class DefaultLogRecordBatchTest extends LogTestBase {
         MemoryLogRecords memoryLogRecords = 
MemoryLogRecords.pointToBytesView(builder.build());
         Iterator<LogRecordBatch> iterator = 
memoryLogRecords.batches().iterator();
         // only contains batch header.
-        assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
+        
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(recordBatchHeaderSize(magic));
 
         assertThat(iterator.hasNext()).isTrue();
         LogRecordBatch logRecordBatch = iterator.next();
@@ -135,7 +142,7 @@ public class DefaultLogRecordBatchTest extends LogTestBase {
         memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
         iterator = memoryLogRecords.batches().iterator();
         // only contains batch header.
-        assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
+        
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(recordBatchHeaderSize(magic));
 
         assertThat(iterator.hasNext()).isTrue();
         logRecordBatch = iterator.next();
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java
index 9c8e1dab2..7de046ec7 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java
@@ -17,28 +17,40 @@
 
 package org.apache.fluss.record;
 
+import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.utils.CloseableIterator;
 
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.util.Collections;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
-import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithBaseOffset;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static 
org.apache.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link FileLogInputStream}. */
 public class FileLogInputStreamTest extends LogTestBase {
     private @TempDir File tempDir;
 
-    @Test
-    void testWriteTo() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testWriteTo(byte recordBatchMagic) throws Exception {
         try (FileLogRecords fileLogRecords = FileLogRecords.open(new 
File(tempDir, "test.tmp"))) {
             fileLogRecords.append(
-                    genMemoryLogRecordsWithBaseOffset(
-                            0L, Collections.singletonList(new Object[] {0, 
"abc"})));
+                    createRecordsWithoutBaseLogOffset(
+                            DATA1_ROW_TYPE,
+                            DEFAULT_SCHEMA_ID,
+                            0L,
+                            -1L,
+                            recordBatchMagic,
+                            Collections.singletonList(new Object[] {0, "abc"}),
+                            LogFormat.ARROW));
             fileLogRecords.flush();
 
             FileLogInputStream logInputStream =
@@ -46,7 +58,7 @@ public class FileLogInputStreamTest extends LogTestBase {
 
             FileLogInputStream.FileChannelLogRecordBatch batch = 
logInputStream.nextBatch();
             assertThat(batch).isNotNull();
-            assertThat(batch.magic()).isEqualTo(magic);
+            assertThat(batch.magic()).isEqualTo(recordBatchMagic);
 
             LogRecordBatch recordBatch = batch.loadFullBatch();
 
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java 
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
index 515289cb4..c19ecc882 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
@@ -28,15 +28,23 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.EOFException;
 import java.io.File;
+import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Stream;
 
 import static 
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.V1_RECORD_BATCH_HEADER_SIZE;
 import static 
org.apache.fluss.record.LogRecordReadContext.createArrowReadContext;
 import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
 import static 
org.apache.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset;
@@ -126,17 +134,23 @@ class FileLogProjectionTest {
 
     static Stream<Arguments> projectedFieldsArgs() {
         return Stream.of(
-                Arguments.of((Object) new int[] {0}),
-                Arguments.arguments((Object) new int[] {1}),
-                Arguments.arguments((Object) new int[] {0, 1}));
+                Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V0),
+                Arguments.arguments((Object) new int[] {1}, 
LOG_MAGIC_VALUE_V0),
+                Arguments.arguments((Object) new int[] {0, 1}, 
LOG_MAGIC_VALUE_V0),
+                Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V1),
+                Arguments.arguments((Object) new int[] {1}, 
LOG_MAGIC_VALUE_V1),
+                Arguments.arguments((Object) new int[] {0, 1}, 
LOG_MAGIC_VALUE_V1));
     }
 
     @ParameterizedTest
     @MethodSource("projectedFieldsArgs")
-    void testProject(int[] projectedFields) throws Exception {
+    void testProject(int[] projectedFields, byte recordBatchMagic) throws 
Exception {
         FileLogRecords fileLogRecords =
                 createFileLogRecords(
-                        TestData.DATA1_ROW_TYPE, TestData.DATA1, 
TestData.ANOTHER_DATA1);
+                        recordBatchMagic,
+                        TestData.DATA1_ROW_TYPE,
+                        TestData.DATA1,
+                        TestData.ANOTHER_DATA1);
         List<Object[]> results =
                 doProjection(
                         new FileLogProjection(),
@@ -159,11 +173,15 @@ class FileLogProjectionTest {
         assertEquals(results, expected);
     }
 
-    @Test
-    void testIllegalByteOrder() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testIllegalByteOrder(byte recordBatchMagic) throws Exception {
         FileLogRecords fileLogRecords =
                 createFileLogRecords(
-                        TestData.DATA1_ROW_TYPE, TestData.DATA1, 
TestData.ANOTHER_DATA1);
+                        recordBatchMagic,
+                        TestData.DATA1_ROW_TYPE,
+                        TestData.DATA1,
+                        TestData.ANOTHER_DATA1);
         FileLogProjection projection = new FileLogProjection();
         // overwrite the wrong decoding byte order endian
         projection.getLogHeaderBuffer().order(ByteOrder.BIG_ENDIAN);
@@ -180,14 +198,18 @@ class FileLogProjectionTest {
                 .hasMessageContaining("Failed to read `arrow header` from file 
channel");
     }
 
-    @Test
-    void testProjectSizeLimited() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testProjectSizeLimited(byte recordBatchMagic) throws Exception {
         List<Object[]> allData = new ArrayList<>();
         allData.addAll(TestData.DATA1);
         allData.addAll(TestData.ANOTHER_DATA1);
         FileLogRecords fileLogRecords =
                 createFileLogRecords(
-                        TestData.DATA1_ROW_TYPE, TestData.DATA1, 
TestData.ANOTHER_DATA1);
+                        recordBatchMagic,
+                        TestData.DATA1_ROW_TYPE,
+                        TestData.DATA1,
+                        TestData.ANOTHER_DATA1);
         int totalSize = fileLogRecords.sizeInBytes();
         boolean hasEmpty = false;
         boolean hasHalf = false;
@@ -218,9 +240,89 @@ class FileLogProjectionTest {
         assertThat(hasFull).isTrue();
     }
 
+    @Test
+    void testReadLogHeaderFullyOrFail() throws Exception {
+        ByteBuffer logHeaderBuffer = 
ByteBuffer.allocate(V1_RECORD_BATCH_HEADER_SIZE);
+
+        // only V1 log header, should read fully
+        try (FileLogRecords fileLogRecords =
+                createFileWithLogHeader(LOG_MAGIC_VALUE_V1, 
V1_RECORD_BATCH_HEADER_SIZE)) {
+            FileLogProjection.readLogHeaderFullyOrFail(
+                    fileLogRecords.channel(), logHeaderBuffer, 0);
+            assertThat(logHeaderBuffer.hasRemaining()).isFalse();
+        }
+
+        // V1 log header with data, should read fully
+        try (FileLogRecords fileLogRecords = 
createFileWithLogHeader(LOG_MAGIC_VALUE_V1, 100)) {
+            logHeaderBuffer.rewind();
+            FileLogProjection.readLogHeaderFullyOrFail(
+                    fileLogRecords.channel(), logHeaderBuffer, 0);
+            assertThat(logHeaderBuffer.hasRemaining()).isFalse();
+        }
+
+        // only v0 log header, should only read 48 bytes
+        try (FileLogRecords fileLogRecords =
+                createFileWithLogHeader(LOG_MAGIC_VALUE_V0, 
V0_RECORD_BATCH_HEADER_SIZE)) {
+            logHeaderBuffer.rewind();
+            FileLogProjection.readLogHeaderFullyOrFail(
+                    fileLogRecords.channel(), logHeaderBuffer, 0);
+            assertThat(logHeaderBuffer.hasRemaining()).isTrue();
+            
assertThat(logHeaderBuffer.position()).isEqualTo(V0_RECORD_BATCH_HEADER_SIZE);
+        }
+
+        // v0 log header with data, should read fully
+        try (FileLogRecords fileLogRecords = 
createFileWithLogHeader(LOG_MAGIC_VALUE_V0, 100)) {
+            logHeaderBuffer.rewind();
+            FileLogProjection.readLogHeaderFullyOrFail(
+                    fileLogRecords.channel(), logHeaderBuffer, 0);
+            assertThat(logHeaderBuffer.hasRemaining()).isFalse();
+        }
+
+        // v1 log header incomplete, should throw exception
+        try (FileLogRecords fileLogRecords =
+                createFileWithLogHeader(LOG_MAGIC_VALUE_V1, 
V0_RECORD_BATCH_HEADER_SIZE)) {
+            logHeaderBuffer.rewind();
+            assertThatThrownBy(
+                            () ->
+                                    FileLogProjection.readLogHeaderFullyOrFail(
+                                            fileLogRecords.channel(), 
logHeaderBuffer, 0),
+                            "Should throw exception if the log header is 
incomplete")
+                    .isInstanceOf(EOFException.class)
+                    .hasMessageContaining(
+                            "Expected to read 52 bytes, but reached end of 
file after reading 48 bytes.");
+        }
+
+        // v0 log header incomplete, should throw exception
+        try (FileLogRecords fileLogRecords =
+                createFileWithLogHeader(LOG_MAGIC_VALUE_V0, 
V0_RECORD_BATCH_HEADER_SIZE - 1)) {
+            logHeaderBuffer.rewind();
+            assertThatThrownBy(
+                            () ->
+                                    FileLogProjection.readLogHeaderFullyOrFail(
+                                            fileLogRecords.channel(), 
logHeaderBuffer, 0),
+                            "Should throw exception if the log header is 
incomplete")
+                    .isInstanceOf(EOFException.class)
+                    .hasMessageContaining(
+                            "Expected to read 48 bytes, but reached end of 
file after reading 47 bytes.");
+        }
+    }
+
+    private FileLogRecords createFileWithLogHeader(byte magic, int length) 
throws Exception {
+        ByteBuffer buffer = 
ByteBuffer.allocate(length).order(ByteOrder.LITTLE_ENDIAN);
+        buffer.position(MAGIC_OFFSET);
+        buffer.put(magic);
+        buffer.position(length);
+        buffer.flip();
+        File file = new File(tempDir, UUID.randomUUID() + ".log");
+        FileLogRecords fileLogRecords = FileLogRecords.open(file);
+        fileLogRecords.channel().write(buffer);
+        fileLogRecords.flush();
+        return fileLogRecords;
+    }
+
     @SafeVarargs
-    private final FileLogRecords createFileLogRecords(RowType rowType, 
List<Object[]>... inputs)
-            throws Exception {
+    final FileLogRecords createFileLogRecords(
+            byte recordBatchMagic, RowType rowType, List<Object[]>... inputs) 
throws Exception {
         FileLogRecords fileLogRecords = FileLogRecords.open(new File(tempDir, 
"test.tmp"));
         long offsetBase = 0L;
         for (List<Object[]> input : inputs) {
@@ -230,6 +332,7 @@ class FileLogProjectionTest {
                             DEFAULT_SCHEMA_ID,
                             offsetBase,
                             System.currentTimeMillis(),
+                            recordBatchMagic,
                             input,
                             LogFormat.ARROW));
             offsetBase += input.size();
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/KvRecordTestUtils.java 
b/fluss-common/src/test/java/org/apache/fluss/record/KvRecordTestUtils.java
index 5a0115fd1..679920a21 100644
--- a/fluss-common/src/test/java/org/apache/fluss/record/KvRecordTestUtils.java
+++ b/fluss-common/src/test/java/org/apache/fluss/record/KvRecordTestUtils.java
@@ -33,8 +33,8 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
 
 /** Test utils for kv record. */
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java
new file mode 100644
index 000000000..3f83aa7a5
--- /dev/null
+++ 
b/fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.fluss.record.LogRecordBatchFormat.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.attributeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.batchSequenceOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.leaderEpochOffset;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordsCountOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.writeClientIdOffset;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link LogRecordBatchFormat}. */
+public class LogRecordBatchFormatTest {
+
+    @Test
+    void testCommonParam() {
+        assertThat(LENGTH_OFFSET).isEqualTo(8);
+        assertThat(MAGIC_OFFSET).isEqualTo(12);
+        assertThat(LOG_OVERHEAD).isEqualTo(12);
+        assertThat(HEADER_SIZE_UP_TO_MAGIC).isEqualTo(13);
+    }
+
+    @Test
+    void testLogRecordBatchFormatForMagicV0() {
+        byte magic = (byte) 0;
+        assertThatThrownBy(() -> leaderEpochOffset(magic))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unsupported magic value 0");
+        assertThat(crcOffset(magic)).isEqualTo(21);
+        assertThat(schemaIdOffset(magic)).isEqualTo(25);
+        assertThat(attributeOffset(magic)).isEqualTo(27);
+        assertThat(lastOffsetDeltaOffset(magic)).isEqualTo(28);
+        assertThat(writeClientIdOffset(magic)).isEqualTo(32);
+        assertThat(batchSequenceOffset(magic)).isEqualTo(40);
+        assertThat(recordsCountOffset(magic)).isEqualTo(44);
+        assertThat(recordBatchHeaderSize(magic)).isEqualTo(48);
+        assertThat(arrowChangeTypeOffset(magic)).isEqualTo(48);
+    }
+
+    @Test
+    void testLogRecordBatchFormatForMagicV1() {
+        byte magic = (byte) 1;
+        assertThat(leaderEpochOffset(magic)).isEqualTo(21);
+        assertThat(crcOffset(magic)).isEqualTo(25);
+        assertThat(schemaIdOffset(magic)).isEqualTo(29);
+        assertThat(attributeOffset(magic)).isEqualTo(31);
+        assertThat(lastOffsetDeltaOffset(magic)).isEqualTo(32);
+        assertThat(writeClientIdOffset(magic)).isEqualTo(36);
+        assertThat(batchSequenceOffset(magic)).isEqualTo(44);
+        assertThat(recordsCountOffset(magic)).isEqualTo(48);
+        assertThat(recordBatchHeaderSize(magic)).isEqualTo(52);
+        assertThat(arrowChangeTypeOffset(magic)).isEqualTo(52);
+    }
+}
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java
index 7759e711e..5c0df6e36 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java
@@ -38,17 +38,23 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import static 
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
 import static org.apache.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
 import static org.apache.fluss.record.TestData.DATA1;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
 import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
@@ -77,14 +83,15 @@ public class MemoryLogRecordsArrowBuilderTest {
         allocator.close();
     }
 
-    @Test
-    void testAppendWithEmptyRecord() throws Exception {
+    @ParameterizedTest
+    @MethodSource("magicAndExpectedBatchSize")
+    void testAppendWithEmptyRecord(byte recordBatchMagic, int 
expectedBatchSize) throws Exception {
         int maxSizeInBytes = 1024;
         ArrowWriter writer =
                 provider.getOrCreateWriter(
                         1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, 
DEFAULT_COMPRESSION);
         MemoryLogRecordsArrowBuilder builder =
-                createMemoryLogRecordsArrowBuilder(0, writer, 10, 100);
+                createMemoryLogRecordsArrowBuilder(0, writer, 10, 100, 
recordBatchMagic);
         assertThat(builder.isFull()).isFalse();
         assertThat(builder.getWriteLimitInBytes())
                 .isEqualTo((int) (maxSizeInBytes * BUFFER_USAGE_RATIO));
@@ -95,18 +102,19 @@ public class MemoryLogRecordsArrowBuilderTest {
         assertThat(iterator.hasNext()).isTrue();
         LogRecordBatch batch = iterator.next();
         assertThat(batch.getRecordCount()).isEqualTo(0);
-        assertThat(batch.sizeInBytes()).isEqualTo(48);
+        assertThat(batch.sizeInBytes()).isEqualTo(expectedBatchSize);
         assertThat(iterator.hasNext()).isFalse();
     }
 
-    @Test
-    void testAppend() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testAppend(byte recordBatchMagic) throws Exception {
         int maxSizeInBytes = 1024;
         ArrowWriter writer =
                 provider.getOrCreateWriter(
                         1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, 
DEFAULT_COMPRESSION);
         MemoryLogRecordsArrowBuilder builder =
-                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
+                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024, 
recordBatchMagic);
         List<ChangeType> changeTypes =
                 DATA1.stream().map(row -> 
ChangeType.APPEND_ONLY).collect(Collectors.toList());
         List<InternalRow> rows =
@@ -157,7 +165,7 @@ public class MemoryLogRecordsArrowBuilderTest {
                 provider.getOrCreateWriter(
                         1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, 
NO_COMPRESSION);
         MemoryLogRecordsArrowBuilder builder =
-                createMemoryLogRecordsArrowBuilder(0, writer1, 10, 1024);
+                createMemoryLogRecordsArrowBuilder(0, writer1, 10, 1024, 
CURRENT_LOG_MAGIC_VALUE);
         for (Object[] data : dataSet) {
             builder.append(ChangeType.APPEND_ONLY, row(data));
         }
@@ -171,7 +179,7 @@ public class MemoryLogRecordsArrowBuilderTest {
                 provider.getOrCreateWriter(
                         1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, 
compressionInfo);
         MemoryLogRecordsArrowBuilder builder2 =
-                createMemoryLogRecordsArrowBuilder(0, writer2, 10, 1024);
+                createMemoryLogRecordsArrowBuilder(0, writer2, 10, 1024, 
CURRENT_LOG_MAGIC_VALUE);
         for (Object[] data : dataSet) {
             builder2.append(ChangeType.APPEND_ONLY, row(data));
         }
@@ -184,8 +192,9 @@ public class MemoryLogRecordsArrowBuilderTest {
         assertThat(sizeInBytes1).isGreaterThan(sizeInBytes2);
     }
 
-    @Test
-    void testIllegalArgument() {
+    @ParameterizedTest
+    @MethodSource("magicAndExpectedBatchSize")
+    void testIllegalArgument(byte recordBatchMagic, int expectedBatchSize) {
         int maxSizeInBytes = 1024;
         assertThatThrownBy(
                         () -> {
@@ -196,12 +205,15 @@ public class MemoryLogRecordsArrowBuilderTest {
                                             maxSizeInBytes,
                                             DATA1_ROW_TYPE,
                                             DEFAULT_COMPRESSION)) {
-                                createMemoryLogRecordsArrowBuilder(0, writer, 
10, 30);
+                                createMemoryLogRecordsArrowBuilder(
+                                        0, writer, 10, 30, recordBatchMagic);
                             }
                         })
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(
-                        "The size of first segment of pagedOutputView is too 
small, need at least 48 bytes.");
+                        "The size of first segment of pagedOutputView is too 
small, need at least "
+                                + expectedBatchSize
+                                + " bytes.");
     }
 
     @Test
@@ -211,7 +223,7 @@ public class MemoryLogRecordsArrowBuilderTest {
                 provider.getOrCreateWriter(
                         1L, DEFAULT_SCHEMA_ID, 1024, DATA1_ROW_TYPE, 
NO_COMPRESSION);
         MemoryLogRecordsArrowBuilder builder =
-                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
+                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024, 
CURRENT_LOG_MAGIC_VALUE);
         List<ChangeType> changeTypes =
                 DATA1.stream().map(row -> 
ChangeType.APPEND_ONLY).collect(Collectors.toList());
         List<InternalRow> rows =
@@ -244,17 +256,18 @@ public class MemoryLogRecordsArrowBuilderTest {
         writer1.close();
     }
 
-    @Test
-    void testNoRecordAppend() throws Exception {
+    @ParameterizedTest
+    @MethodSource("magicAndExpectedBatchSize")
+    void testNoRecordAppend(byte recordBatchMagic, int expectedBatchSize) 
throws Exception {
         // 1. no record append with base offset as 0.
         ArrowWriter writer =
                 provider.getOrCreateWriter(
                         1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE, 
DEFAULT_COMPRESSION);
         MemoryLogRecordsArrowBuilder builder =
-                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10);
+                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10, 
recordBatchMagic);
         MemoryLogRecords memoryLogRecords = 
MemoryLogRecords.pointToBytesView(builder.build());
         // only contains batch header.
-        assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
+        
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(expectedBatchSize);
         Iterator<LogRecordBatch> iterator = 
memoryLogRecords.batches().iterator();
         assertThat(iterator.hasNext()).isTrue();
         LogRecordBatch logRecordBatch = iterator.next();
@@ -276,10 +289,10 @@ public class MemoryLogRecordsArrowBuilderTest {
         ArrowWriter writer2 =
                 provider.getOrCreateWriter(
                         1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE, 
DEFAULT_COMPRESSION);
-        builder = createMemoryLogRecordsArrowBuilder(100, writer2, 10, 1024 * 
10);
+        builder = createMemoryLogRecordsArrowBuilder(100, writer2, 10, 1024 * 
10, recordBatchMagic);
         memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
         // only contains batch header.
-        assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
+        
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(expectedBatchSize);
         iterator = memoryLogRecords.batches().iterator();
         assertThat(iterator.hasNext()).isTrue();
         logRecordBatch = iterator.next();
@@ -305,7 +318,7 @@ public class MemoryLogRecordsArrowBuilderTest {
                 provider.getOrCreateWriter(
                         1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, 
DEFAULT_COMPRESSION);
         MemoryLogRecordsArrowBuilder builder =
-                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
+                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024, 
CURRENT_LOG_MAGIC_VALUE);
         List<ChangeType> changeTypes =
                 DATA1.stream().map(row -> 
ChangeType.APPEND_ONLY).collect(Collectors.toList());
         List<InternalRow> rows =
@@ -343,8 +356,19 @@ public class MemoryLogRecordsArrowBuilderTest {
                 new ArrowCompressionInfo(ArrowCompressionType.ZSTD, 9));
     }
 
+    private static Collection<Arguments> magicAndExpectedBatchSize() {
+        List<Arguments> params = new ArrayList<>();
+        params.add(Arguments.arguments(LOG_MAGIC_VALUE_V0, 48));
+        params.add(Arguments.arguments(LOG_MAGIC_VALUE_V1, 52));
+        return params;
+    }
+
     private MemoryLogRecordsArrowBuilder createMemoryLogRecordsArrowBuilder(
-            int baseOffset, ArrowWriter writer, int maxPages, int 
pageSizeInBytes)
+            int baseOffset,
+            ArrowWriter writer,
+            int maxPages,
+            int pageSizeInBytes,
+            byte recordBatchMagic)
             throws IOException {
         conf.set(
                 ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE,
@@ -353,6 +377,7 @@ public class MemoryLogRecordsArrowBuilderTest {
         conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new 
MemorySize(pageSizeInBytes));
         return MemoryLogRecordsArrowBuilder.builder(
                 baseOffset,
+                recordBatchMagic,
                 DEFAULT_SCHEMA_ID,
                 writer,
                 new ManagedPagedOutputView(new 
TestingMemorySegmentPool(pageSizeInBytes)));
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java
index 3fe720853..576e90131 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java
@@ -17,13 +17,16 @@
 
 package org.apache.fluss.record;
 
+import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.testutils.DataTestUtils;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.Iterator;
 
-import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
 import static org.apache.fluss.record.TestData.DATA1;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -52,13 +55,15 @@ public class MemorySegmentLogInputStreamTest {
         iterator = getIterator(memoryLogRecords);
         assertThat(iterator.hasNext()).isFalse();
 
-        // gen batch with enough size.
+        // gen batch with not enough size.
         memoryLogRecords = MemoryLogRecords.pointToBytes(new 
byte[LOG_OVERHEAD]);
         iterator = getIterator(memoryLogRecords);
-        assertThat(iterator.hasNext()).isTrue();
+        assertThat(iterator.hasNext()).isFalse();
 
         // gen batch with enough size.
-        memoryLogRecords = MemoryLogRecords.pointToBytes(new byte[12]);
+        MemorySegment memory = MemorySegment.allocateHeapMemory(100);
+        memory.put(MAGIC_OFFSET, CURRENT_LOG_MAGIC_VALUE);
+        memoryLogRecords = 
MemoryLogRecords.pointToBytes(memory.getHeapMemory());
         iterator = getIterator(memoryLogRecords);
         assertThat(iterator.hasNext()).isTrue();
     }
diff --git a/fluss-common/src/test/java/org/apache/fluss/record/TestData.java 
b/fluss-common/src/test/java/org/apache/fluss/record/TestData.java
index a920efa94..7752b15b0 100644
--- a/fluss-common/src/test/java/org/apache/fluss/record/TestData.java
+++ b/fluss-common/src/test/java/org/apache/fluss/record/TestData.java
@@ -32,11 +32,13 @@ import org.apache.fluss.utils.types.Tuple2;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+
 /** utils to create test data. */
 public final class TestData {
     public static final short DEFAULT_SCHEMA_ID = 1;
     public static final long BASE_OFFSET = 0L;
-    public static final byte DEFAULT_MAGIC = (byte) 0;
+    public static final byte DEFAULT_MAGIC = CURRENT_LOG_MAGIC_VALUE;
     // ---------------------------- data1 and related table info begin 
---------------------------
     public static final List<Object[]> DATA1 =
             Arrays.asList(
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
index cedaadcbd..ce9336c5e 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
@@ -44,7 +44,8 @@ import java.util.List;
 
 import static 
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
 import static org.apache.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
-import static 
org.apache.fluss.record.DefaultLogRecordBatch.ARROW_CHANGETYPE_OFFSET;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset;
 import static org.apache.fluss.record.TestData.DATA1;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
 import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -147,12 +148,14 @@ class ArrowReaderWriterTest {
                     new ManagedPagedOutputView(new TestingMemorySegmentPool(10 
* 1024));
 
             // skip arrow batch header.
-            int size = writer.serializeToOutputView(pagedOutputView, 
ARROW_CHANGETYPE_OFFSET);
+            int size =
+                    writer.serializeToOutputView(
+                            pagedOutputView, 
arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE));
             MemorySegment segment = 
MemorySegment.allocateHeapMemory(writer.estimatedSizeInBytes());
 
             
assertThat(pagedOutputView.getWrittenSegments().size()).isEqualTo(1);
             MemorySegment firstSegment = pagedOutputView.getCurrentSegment();
-            firstSegment.copyTo(ARROW_CHANGETYPE_OFFSET, segment, 0, size);
+            
firstSegment.copyTo(arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), segment, 0, 
size);
 
             ArrowReader reader =
                     ArrowUtils.createArrowReader(segment, 0, size, root, 
allocator, rowType);
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java 
b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
index 4f00fc14b..2236ebe21 100644
--- a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
+++ b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
@@ -70,8 +70,9 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static 
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static 
org.apache.fluss.record.LogRecordReadContext.createArrowReadContext;
 import static org.apache.fluss.record.TestData.BASE_OFFSET;
 import static org.apache.fluss.record.TestData.DATA1;
@@ -150,17 +151,23 @@ public class DataTestUtils {
         return (CompactedRow) rowEncoder.finishRow();
     }
 
-    public static MemoryLogRecords genMemoryLogRecordsByObject(List<Object[]> 
objects)
+    public static MemoryLogRecords genMemoryLogRecordsByObject(byte magic, 
List<Object[]> objects)
             throws Exception {
         return createRecordsWithoutBaseLogOffset(
                 DATA1_ROW_TYPE,
                 DEFAULT_SCHEMA_ID,
                 0,
                 System.currentTimeMillis(),
+                magic,
                 objects,
                 LogFormat.ARROW);
     }
 
+    public static MemoryLogRecords genMemoryLogRecordsByObject(List<Object[]> 
objects)
+            throws Exception {
+        return genMemoryLogRecordsByObject(CURRENT_LOG_MAGIC_VALUE, objects);
+    }
+
     public static MemoryLogRecords genMemoryLogRecordsWithWriterId(
             List<Object[]> objects, long writerId, int batchSequence, long 
baseOffset)
             throws Exception {
@@ -171,6 +178,7 @@ public class DataTestUtils {
                 DEFAULT_SCHEMA_ID,
                 baseOffset,
                 System.currentTimeMillis(),
+                CURRENT_LOG_MAGIC_VALUE,
                 writerId,
                 batchSequence,
                 changeTypes,
@@ -196,7 +204,13 @@ public class DataTestUtils {
     public static MemoryLogRecords genMemoryLogRecordsWithBaseOffset(
             long offsetBase, List<Object[]> objects) throws Exception {
         return createRecordsWithoutBaseLogOffset(
-                DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, offsetBase, -1L, objects, 
LogFormat.ARROW);
+                DATA1_ROW_TYPE,
+                DEFAULT_SCHEMA_ID,
+                offsetBase,
+                -1L,
+                CURRENT_LOG_MAGIC_VALUE,
+                objects,
+                LogFormat.ARROW);
     }
 
     public static MemoryLogRecords genLogRecordsWithBaseOffsetAndTimestamp(
@@ -206,6 +220,7 @@ public class DataTestUtils {
                 DEFAULT_SCHEMA_ID,
                 offsetBase,
                 maxTimestamp,
+                CURRENT_LOG_MAGIC_VALUE,
                 objects,
                 LogFormat.ARROW);
     }
@@ -316,6 +331,7 @@ public class DataTestUtils {
                         DEFAULT_SCHEMA_ID,
                         baseOffset,
                         System.currentTimeMillis(),
+                        CURRENT_LOG_MAGIC_VALUE,
                         objects,
                         logFormat));
         fileLogRecords.flush();
@@ -358,6 +374,7 @@ public class DataTestUtils {
             int schemaId,
             long offsetBase,
             long maxTimestamp,
+            byte magic,
             List<Object[]> objects,
             LogFormat logFormat)
             throws Exception {
@@ -368,6 +385,7 @@ public class DataTestUtils {
                 schemaId,
                 offsetBase,
                 maxTimestamp,
+                magic,
                 NO_WRITER_ID,
                 NO_BATCH_SEQUENCE,
                 changeTypes,
@@ -381,6 +399,7 @@ public class DataTestUtils {
             int schemaId,
             long offsetBase,
             long maxTimestamp,
+            byte magic,
             long writerId,
             int batchSequence,
             List<ChangeType> changeTypes,
@@ -393,6 +412,7 @@ public class DataTestUtils {
                 schemaId,
                 offsetBase,
                 maxTimestamp,
+                magic,
                 writerId,
                 batchSequence,
                 changeTypes,
@@ -406,6 +426,7 @@ public class DataTestUtils {
             int schemaId,
             long offsetBase,
             long maxTimestamp,
+            byte magic,
             long writerId,
             int batchSequence,
             List<ChangeType> changeTypes,
@@ -420,6 +441,7 @@ public class DataTestUtils {
                     rowType,
                     offsetBase,
                     maxTimestamp,
+                    magic,
                     schemaId,
                     writerId,
                     batchSequence,
@@ -458,7 +480,7 @@ public class DataTestUtils {
         }
         builder.setWriterState(writerId, batchSequence);
         MemoryLogRecords memoryLogRecords = 
MemoryLogRecords.pointToBytesView(builder.build());
-        memoryLogRecords.ensureValid();
+        memoryLogRecords.ensureValid(DEFAULT_MAGIC);
 
         ((DefaultLogRecordBatch) memoryLogRecords.batches().iterator().next())
                 .setCommitTimestamp(maxTimestamp);
@@ -470,6 +492,7 @@ public class DataTestUtils {
             RowType rowType,
             long baseLogOffset,
             long maxTimestamp,
+            byte magic,
             int schemaId,
             long writerId,
             int batchSequence,
@@ -485,6 +508,7 @@ public class DataTestUtils {
             MemoryLogRecordsArrowBuilder builder =
                     MemoryLogRecordsArrowBuilder.builder(
                             baseLogOffset,
+                            magic,
                             schemaId,
                             writer,
                             new ManagedPagedOutputView(new 
TestingMemorySegmentPool(10 * 1024)));
@@ -497,7 +521,7 @@ public class DataTestUtils {
 
             ((DefaultLogRecordBatch) 
memoryLogRecords.batches().iterator().next())
                     .setCommitTimestamp(maxTimestamp);
-            memoryLogRecords.ensureValid();
+            memoryLogRecords.ensureValid(magic);
             return memoryLogRecords;
         }
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
index 105fc47e7..0a30d5f5f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
@@ -46,7 +46,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Optional;
 
-import static 
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+import static 
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
 import static org.apache.fluss.utils.IOUtils.closeQuietly;
 
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
@@ -495,7 +495,8 @@ public final class LogSegment {
                 new LogOffsetMetadata(startOffset, this.baseOffset, 
startPosition);
         int adjustedMaxSize =
                 minOneMessage ? Math.max(maxSize, 
startOffsetAndSize.getSize()) : maxSize;
-        if (adjustedMaxSize <= RECORD_BATCH_HEADER_SIZE) {
+        // use V0 size as the lower bound, since the V1 header size is larger 
than V0
+        if (adjustedMaxSize <= V0_RECORD_BATCH_HEADER_SIZE) {
             return new FetchDataInfo(offsetMetadata, MemoryLogRecords.EMPTY);
         }
         if (projection == null) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
index 9ed95e35b..cd625b155 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
@@ -21,7 +21,7 @@ import org.apache.fluss.exception.OutOfOrderSequenceException;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.LogRecordBatch;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
 
 /**
  * This class is used to validate the records appended by a given writer 
before they are written to
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java
index c0b434991..cf237a73e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java
@@ -25,7 +25,7 @@ import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.Optional;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
 
 /**
  * This class represents the state of a specific writer id. The batch sequence 
number is ordered
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
index e8712964c..4fa0baced 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.utils.FlussPaths.WRITER_SNAPSHOT_FILE_SUFFIX;
 import static org.apache.fluss.utils.FlussPaths.writerSnapshotFile;
 
@@ -232,7 +233,7 @@ public class WriterStateManager {
     /** Update the mapping with the given append information. */
     public void update(WriterAppendInfo appendInfo) {
         long writerId = appendInfo.writerId();
-        if (writerId == LogRecordBatch.NO_WRITER_ID) {
+        if (writerId == NO_WRITER_ID) {
             throw new IllegalArgumentException(
                     "Invalid writer id "
                             + writerId
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 1f3271562..6734a32d5 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -77,8 +77,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import static 
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
 import static org.apache.fluss.record.TestData.DATA2_SCHEMA;
 import static org.apache.fluss.record.TestData.DATA3_SCHEMA_PK;
@@ -893,6 +894,7 @@ class KvTabletTest {
                 DEFAULT_SCHEMA_ID,
                 baseOffset,
                 -1L,
+                CURRENT_LOG_MAGIC_VALUE,
                 NO_WRITER_ID,
                 NO_BATCH_SEQUENCE,
                 changeTypes,
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index 526b1b350..045bd3525 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -57,8 +57,9 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.record.TestData.DATA1;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH_PK;
@@ -671,6 +672,7 @@ final class ReplicaTest extends ReplicaTestBase {
                 DEFAULT_SCHEMA_ID,
                 baseOffset,
                 -1L,
+                CURRENT_LOG_MAGIC_VALUE,
                 NO_WRITER_ID,
                 NO_BATCH_SEQUENCE,
                 changeTypes,
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
index 04f8f3ce3..afd10c943 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
@@ -62,6 +62,8 @@ import org.apache.fluss.utils.types.Tuple2;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
@@ -77,6 +79,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
 import static org.apache.fluss.record.TestData.ANOTHER_DATA1;
 import static org.apache.fluss.record.TestData.DATA1;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
@@ -118,8 +122,9 @@ public class TabletServiceITCase {
     public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
             FlussClusterExtension.builder().setNumOfTabletServers(3).build();
 
-    @Test
-    void testProduceLog() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testProduceLog(byte recordBatchMagic) throws Exception {
         long tableId =
                 createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR);
         TableBucket tb = new TableBucket(tableId, 0);
@@ -134,7 +139,10 @@ public class TabletServiceITCase {
                 leaderGateWay
                         .produceLog(
                                 newProduceLogRequest(
-                                        tableId, 0, 1, genMemoryLogRecordsByObject(DATA1)))
+                                        tableId,
+                                        0,
+                                        1,
+                                        genMemoryLogRecordsByObject(recordBatchMagic, DATA1)))
                         .get(),
                 0,
                 0L);
@@ -205,8 +213,9 @@ public class TabletServiceITCase {
         }
     }
 
-    @Test
-    void testFetchLog() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testFetchLog(byte recordBatchMagic) throws Exception {
         long tableId =
                 createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR);
         TableBucket tb = new TableBucket(tableId, 0);
@@ -222,7 +231,10 @@ public class TabletServiceITCase {
                 leaderGateWay
                         .produceLog(
                                 newProduceLogRequest(
-                                        tableId, 0, 1, genMemoryLogRecordsByObject(DATA1)))
+                                        tableId,
+                                        0,
+                                        1,
+                                        genMemoryLogRecordsByObject(recordBatchMagic, DATA1)))
                         .get(),
                 0,
                 0L);
@@ -248,7 +260,11 @@ public class TabletServiceITCase {
                 leaderGateWay
                         .produceLog(
                                 newProduceLogRequest(
-                                        tableId, 0, 1, genMemoryLogRecordsByObject(ANOTHER_DATA1)))
+                                        tableId,
+                                        0,
+                                        1,
+                                        genMemoryLogRecordsByObject(
+                                                recordBatchMagic, ANOTHER_DATA1)))
                         .get(),
                 0,
                 10L);

Reply via email to