This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new a25c64bdd [log] Bump LogRecordBatch's CURRENT_LOG_MAGIC_VALUE to V1 to
support leaderEpoch (#778)
a25c64bdd is described below
commit a25c64bddf3e331977661be1ed7fb286cfd31bbc
Author: yunhong <[email protected]>
AuthorDate: Wed Sep 10 13:50:34 2025 +0800
[log] Bump LogRecordBatch's CURRENT_LOG_MAGIC_VALUE to V1 to support
leaderEpoch (#778)
Co-authored-by: Jark Wu <[email protected]>
---
.../fluss/client/write/IdempotenceBucketEntry.java | 2 +-
.../fluss/client/write/IdempotenceManager.java | 8 +-
.../fluss/client/write/RecordAccumulator.java | 8 +-
.../org/apache/fluss/client/write/WriteBatch.java | 4 +-
.../org/apache/fluss/client/write/WriteRecord.java | 5 +-
.../scanner/log/DefaultCompletedFetchTest.java | 50 ++--
.../client/write/IndexedLogWriteBatchTest.java | 5 +-
.../fluss/client/write/RecordAccumulatorTest.java | 8 +-
.../org/apache/fluss/client/write/SenderTest.java | 4 +-
.../apache/fluss/record/DefaultLogRecordBatch.java | 150 +++++------
.../apache/fluss/record/FileLogInputStream.java | 22 +-
.../org/apache/fluss/record/FileLogProjection.java | 89 +++++--
.../org/apache/fluss/record/IndexedLogRecord.java | 2 +-
.../apache/fluss/record/KvRecordBatchBuilder.java | 6 +-
.../org/apache/fluss/record/LogRecordBatch.java | 21 +-
.../apache/fluss/record/LogRecordBatchFormat.java | 292 +++++++++++++++++++++
.../org/apache/fluss/record/MemoryLogRecords.java | 9 +-
.../fluss/record/MemoryLogRecordsArrowBuilder.java | 44 ++--
.../record/MemoryLogRecordsIndexedBuilder.java | 34 ++-
.../fluss/record/MemorySegmentLogInputStream.java | 8 +-
.../fluss/record/DefaultLogRecordBatchTest.java | 27 +-
.../fluss/record/FileLogInputStreamTest.java | 26 +-
.../apache/fluss/record/FileLogProjectionTest.java | 129 ++++++++-
.../org/apache/fluss/record/KvRecordTestUtils.java | 4 +-
.../fluss/record/LogRecordBatchFormatTest.java | 81 ++++++
.../record/MemoryLogRecordsArrowBuilderTest.java | 69 +++--
.../record/MemorySegmentLogInputStreamTest.java | 13 +-
.../java/org/apache/fluss/record/TestData.java | 4 +-
.../fluss/row/arrow/ArrowReaderWriterTest.java | 9 +-
.../org/apache/fluss/testutils/DataTestUtils.java | 36 ++-
.../org/apache/fluss/server/log/LogSegment.java | 5 +-
.../apache/fluss/server/log/WriterAppendInfo.java | 2 +-
.../apache/fluss/server/log/WriterStateEntry.java | 2 +-
.../fluss/server/log/WriterStateManager.java | 3 +-
.../org/apache/fluss/server/kv/KvTabletTest.java | 6 +-
.../apache/fluss/server/replica/ReplicaTest.java | 6 +-
.../fluss/server/tablet/TabletServiceITCase.java | 30 ++-
37 files changed, 937 insertions(+), 286 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java
index f48377aa5..c2426b20e 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java
@@ -29,7 +29,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
/** Entry to store the idempotence information of each table-bucket. */
@Internal
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
index 7b5605263..5593c1999 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
@@ -25,7 +25,6 @@ import org.apache.fluss.exception.UnknownWriterIdException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.record.LogRecordBatch;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.InitWriterRequest;
import org.apache.fluss.rpc.protocol.Errors;
@@ -40,7 +39,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -182,10 +182,10 @@ public class IdempotenceManager {
*/
synchronized int firstInFlightBatchSequence(TableBucket tableBucket) {
if (!hasInflightBatches(tableBucket)) {
- return LogRecordBatch.NO_BATCH_SEQUENCE;
+ return NO_BATCH_SEQUENCE;
}
WriteBatch batch = nextBatchBySequence(tableBucket);
- return batch == null ? LogRecordBatch.NO_BATCH_SEQUENCE :
batch.batchSequence();
+ return batch == null ? NO_BATCH_SEQUENCE : batch.batchSequence();
}
synchronized void handleCompletedBatch(ReadyWriteBatch readyWriteBatch) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 6164d6ecf..2cfd74da3 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -32,7 +32,6 @@ import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metrics.MetricNames;
-import org.apache.fluss.record.LogRecordBatch;
import org.apache.fluss.row.arrow.ArrowWriter;
import org.apache.fluss.row.arrow.ArrowWriterPool;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
@@ -61,7 +60,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
@@ -757,7 +757,7 @@ public final class RecordAccumulator {
// flight request count to 1.
int firstInFlightSequence =
idempotenceManager.firstInFlightBatchSequence(tableBucket);
boolean isFirstInFlightBatch =
- firstInFlightSequence == LogRecordBatch.NO_BATCH_SEQUENCE
+ firstInFlightSequence == NO_BATCH_SEQUENCE
|| (first.hasBatchSequence()
&& first.batchSequence() ==
firstInFlightSequence);
@@ -824,7 +824,7 @@ public final class RecordAccumulator {
Deque<WriteBatch> deque, WriteBatch batch, TableBucket
tableBucket) {
// When we are re-enqueue and have enabled idempotence, the
re-enqueued batch must always
// have a batch sequence.
- if (batch.batchSequence() == LogRecordBatch.NO_BATCH_SEQUENCE) {
+ if (batch.batchSequence() == NO_BATCH_SEQUENCE) {
throw new IllegalStateException(
"Trying to re-enqueue a batch which doesn't have a
sequence even "
+ "though idempotence is enabled.");
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
index fe8950845..49766612c 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
@@ -21,7 +21,6 @@ import org.apache.fluss.annotation.Internal;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.memory.MemorySegmentPool;
import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.record.LogRecordBatch;
import org.apache.fluss.record.bytesview.BytesView;
import org.slf4j.Logger;
@@ -35,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
/** The abstract write batch contains write callback object to wait write
request feedback. */
@@ -113,7 +113,7 @@ public abstract class WriteBatch {
public abstract void abortRecordAppends();
public boolean hasBatchSequence() {
- return batchSequence() != LogRecordBatch.NO_BATCH_SEQUENCE;
+ return batchSequence() != NO_BATCH_SEQUENCE;
}
public void resetWriterState(long writerId, int batchSequence) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
index b2b73778c..0c1427e6f 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
@@ -21,7 +21,6 @@ import org.apache.fluss.annotation.Internal;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.record.DefaultKvRecord;
import org.apache.fluss.record.DefaultKvRecordBatch;
-import org.apache.fluss.record.DefaultLogRecordBatch;
import org.apache.fluss.record.IndexedLogRecord;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.InternalRow;
@@ -29,6 +28,8 @@ import org.apache.fluss.row.indexed.IndexedRow;
import javax.annotation.Nullable;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
/**
@@ -85,7 +86,7 @@ public final class WriteRecord {
PhysicalTablePath tablePath, IndexedRow row, @Nullable byte[]
bucketKey) {
checkNotNull(row);
int estimatedSizeInBytes =
- IndexedLogRecord.sizeOf(row) +
DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+ IndexedLogRecord.sizeOf(row) +
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
return new WriteRecord(
tablePath,
null,
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
index 7a3d8845a..3502dcc3c 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
@@ -39,16 +39,22 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
import static org.apache.fluss.record.TestData.DATA2;
import static org.apache.fluss.record.TestData.DATA2_ROW_TYPE;
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
@@ -78,14 +84,15 @@ public class DefaultCompletedFetchTest {
logScannerStatus.assignScanBuckets(scanBuckets);
}
- @Test
- void testSimple() throws Exception {
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testSimple(byte recordBatchMagic) throws Exception {
long fetchOffset = 0L;
int bucketId = 0; // records for 0-10.
TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
FetchLogResultForBucket resultForBucket0 =
new FetchLogResultForBucket(
- tb, createMemoryLogRecords(DATA2, LogFormat.ARROW),
10L);
+ tb, createMemoryLogRecords(DATA2, LogFormat.ARROW,
recordBatchMagic), 10L);
DefaultCompletedFetch defaultCompletedFetch =
makeCompletedFetch(tb, resultForBucket0, fetchOffset);
List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(8);
@@ -100,14 +107,15 @@ public class DefaultCompletedFetchTest {
assertThat(scanRecords.size()).isEqualTo(0);
}
- @Test
- void testNegativeFetchCount() throws Exception {
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testNegativeFetchCount(byte recordBatchMagic) throws Exception {
long fetchOffset = 0L;
int bucketId = 0; // records for 0-10.
TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
FetchLogResultForBucket resultForBucket0 =
new FetchLogResultForBucket(
- tb, createMemoryLogRecords(DATA2, LogFormat.ARROW),
10L);
+ tb, createMemoryLogRecords(DATA2, LogFormat.ARROW,
recordBatchMagic), 10L);
DefaultCompletedFetch defaultCompletedFetch =
makeCompletedFetch(tb, resultForBucket0, fetchOffset);
List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(-10);
@@ -128,9 +136,8 @@ public class DefaultCompletedFetchTest {
}
@ParameterizedTest
- @ValueSource(strings = {"INDEXED", "ARROW"})
- void testProjection(String format) throws Exception {
- LogFormat logFormat = LogFormat.fromString(format);
+ @MethodSource("typeAndMagic")
+ void testProjection(LogFormat logFormat, byte magic) throws Exception {
Schema schema =
Schema.newBuilder()
.column("a", DataTypes.INT())
@@ -158,9 +165,9 @@ public class DefaultCompletedFetchTest {
Projection projection = Projection.of(new int[] {0, 2});
MemoryLogRecords memoryLogRecords;
if (logFormat == LogFormat.ARROW) {
- memoryLogRecords = genRecordsWithProjection(DATA2, projection);
+ memoryLogRecords = genRecordsWithProjection(DATA2, projection,
magic);
} else {
- memoryLogRecords = createMemoryLogRecords(DATA2,
LogFormat.INDEXED);
+ memoryLogRecords = createMemoryLogRecords(DATA2,
LogFormat.INDEXED, magic);
}
FetchLogResultForBucket resultForBucket0 =
new FetchLogResultForBucket(tb, memoryLogRecords, 10L);
@@ -224,19 +231,28 @@ public class DefaultCompletedFetchTest {
offset);
}
- private MemoryLogRecords createMemoryLogRecords(List<Object[]> objects,
LogFormat logFormat)
- throws Exception {
+ private static Collection<Arguments> typeAndMagic() {
+ List<Arguments> params = new ArrayList<>();
+ params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V1));
+ params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V1));
+ params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V0));
+ params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V0));
+ return params;
+ }
+
+ private MemoryLogRecords createMemoryLogRecords(
+ List<Object[]> objects, LogFormat logFormat, byte magic) throws
Exception {
return createRecordsWithoutBaseLogOffset(
- rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, logFormat);
+ rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects,
logFormat);
}
- private MemoryLogRecords genRecordsWithProjection(List<Object[]> objects,
Projection projection)
- throws Exception {
+ private MemoryLogRecords genRecordsWithProjection(
+ List<Object[]> objects, Projection projection, byte magic) throws
Exception {
File logFile = FlussPaths.logFile(tempDir, 0L);
FileLogRecords fileLogRecords = FileLogRecords.open(logFile, false,
1024 * 1024, false);
fileLogRecords.append(
createRecordsWithoutBaseLogOffset(
- rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects,
LogFormat.ARROW));
+ rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects,
LogFormat.ARROW));
fileLogRecords.flush();
FileLogProjection fileLogProjection = new FileLogProjection();
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
index 53bf730e1..07544e8da 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
@@ -21,7 +21,6 @@ import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.memory.PreAllocatedPagedOutputView;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.ChangeType;
-import org.apache.fluss.record.DefaultLogRecordBatch;
import org.apache.fluss.record.IndexedLogRecord;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.record.LogRecordBatch;
@@ -40,6 +39,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
@@ -72,7 +73,7 @@ public class IndexedLogWriteBatchTest {
for (int i = 0;
i
- < (writeLimit -
DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE)
+ < (writeLimit -
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
/ estimatedSizeInBytes;
i++) {
boolean appendResult =
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index 444b93a5b..e5d71a927 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -66,7 +66,8 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
@@ -614,7 +615,7 @@ class RecordAccumulatorTest {
/** Return the offset delta. */
private int expectedNumAppends(IndexedRow row, int batchSize) {
- int size = RECORD_BATCH_HEADER_SIZE;
+ int size = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
int offsetDelta = 0;
while (true) {
int recordSize = IndexedLogRecord.sizeOf(row);
@@ -652,7 +653,8 @@ class RecordAccumulatorTest {
}
private long getTestBatchSize(BinaryRow row) {
- return RECORD_BATCH_HEADER_SIZE + DefaultKvRecord.sizeOf(new byte[4],
row);
+ return recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE)
+ + DefaultKvRecord.sizeOf(new byte[4], row);
}
private int getBatchNumInAccum(RecordAccumulator accum) {
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index cc0b0b21d..7209a315d 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -26,7 +26,6 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;
import org.apache.fluss.exception.TimeoutException;
import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.record.LogRecordBatch;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
@@ -51,6 +50,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
@@ -797,7 +797,7 @@ final class SenderTest {
private static boolean hasIdempotentRecords(TableBucket tb,
ProduceLogRequest request) {
MemoryLogRecords memoryLogRecords = getProduceLogData(request).get(tb);
- return memoryLogRecords.batchIterator().next().writerId() !=
LogRecordBatch.NO_WRITER_ID;
+ return memoryLogRecords.batchIterator().next().writerId() !=
NO_WRITER_ID;
}
private static void assertBatchSequenceEquals(
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
index c2c7b5fb1..eaacc48a6 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
@@ -19,7 +19,6 @@ package org.apache.fluss.record;
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.exception.CorruptMessageException;
-import org.apache.fluss.exception.OutOfOrderSequenceException;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.row.arrow.ArrowReader;
@@ -35,95 +34,54 @@ import org.apache.fluss.utils.crc.Crc32C;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
+import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_OFFSET;
+import static
org.apache.fluss.record.LogRecordBatchFormat.COMMIT_TIMESTAMP_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH;
+import static
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.attributeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.batchSequenceOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset;
+import static
org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.leaderEpochOffset;
+import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordsCountOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.writeClientIdOffset;
+
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
* additional information regarding copyright ownership. */
/**
- * LogRecordBatch implementation for magic 0 and above. The schema of {@link
LogRecordBatch} is
- * given below:
+ * LogRecordBatch implementation for different magic versions.
+ *
+ * <p>To learn more about the recordBatch format, see {@link
LogRecordBatchFormat}. Supported
+ * recordBatch formats:
*
* <ul>
- * RecordBatch =>
- * <li>BaseOffset => Int64
- * <li>Length => Int32
- * <li>Magic => Int8
- * <li>CommitTimestamp => Int64
- * <li>CRC => Uint32
- * <li>SchemaId => Int16
- * <li>Attributes => Int8
- * <li>LastOffsetDelta => Int32
- * <li>WriterID => Int64
- * <li>SequenceID => Int32
- * <li>RecordCount => Int32
- * <li>Records => [Record]
+ * <li>V0 => {@link LogRecordBatchFormat#LOG_MAGIC_VALUE_V0}
+ * <li>V1 => {@link LogRecordBatchFormat#LOG_MAGIC_VALUE_V1}
* </ul>
*
- * <p>The CRC covers the data from the schemaId to the end of the batch (i.e.
all the bytes that
- * follow the CRC). It is located after the magic byte, which means that
clients must parse the
- * magic byte before deciding how to interpret the bytes between the batch
length and the magic
- * byte. The CRC-32C (Castagnoli) polynomial is used for the computation.
CommitTimestamp is also
- * located before the CRC, because it is determined in server side.
- *
- * <p>The field 'lastOffsetDelta is used to calculate the lastOffset of the
current batch as:
- * [lastOffset = baseOffset + LastOffsetDelta] instead of [lastOffset =
baseOffset + recordCount -
- * 1]. The reason for introducing this field is that there might be cases
where the offset delta in
- * batch does not match the recordCount. For example, when generating CDC logs
for a kv table and
- * sending a batch that only contains the deletion of non-existent kvs, no CDC
logs would be
- * generated. However, we need to increment the batchSequence for the
corresponding writerId to make
- * sure no {@link OutOfOrderSequenceException} will be thrown. In such a case,
we would generate a
- * logRecordBatch with a LastOffsetDelta of 0 but a recordCount of 0.
- *
- * <p>The current attributes are given below:
- *
- * <pre>
- * ------------------------------------------
- * | Unused (1-7) | AppendOnly Flag (0) |
- * ------------------------------------------
- * </pre>
- *
* @since 0.1
*/
// TODO rename to MemoryLogRecordBatch
@PublicEvolving
public class DefaultLogRecordBatch implements LogRecordBatch {
- protected static final int BASE_OFFSET_LENGTH = 8;
- public static final int LENGTH_LENGTH = 4;
- static final int MAGIC_LENGTH = 1;
- static final int COMMIT_TIMESTAMP_LENGTH = 8;
- static final int CRC_LENGTH = 4;
- static final int SCHEMA_ID_LENGTH = 2;
- static final int ATTRIBUTE_LENGTH = 1;
- static final int LAST_OFFSET_DELTA_LENGTH = 4;
- static final int WRITE_CLIENT_ID_LENGTH = 8;
- static final int BATCH_SEQUENCE_LENGTH = 4;
- static final int RECORDS_COUNT_LENGTH = 4;
-
- static final int BASE_OFFSET_OFFSET = 0;
- public static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET +
BASE_OFFSET_LENGTH;
- static final int MAGIC_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH;
- static final int COMMIT_TIMESTAMP_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
- public static final int CRC_OFFSET = COMMIT_TIMESTAMP_OFFSET +
COMMIT_TIMESTAMP_LENGTH;
- protected static final int SCHEMA_ID_OFFSET = CRC_OFFSET + CRC_LENGTH;
- public static final int ATTRIBUTES_OFFSET = SCHEMA_ID_OFFSET +
SCHEMA_ID_LENGTH;
- static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET +
ATTRIBUTE_LENGTH;
- static final int WRITE_CLIENT_ID_OFFSET = LAST_OFFSET_DELTA_OFFSET +
LAST_OFFSET_DELTA_LENGTH;
- static final int BATCH_SEQUENCE_OFFSET = WRITE_CLIENT_ID_OFFSET +
WRITE_CLIENT_ID_LENGTH;
- public static final int RECORDS_COUNT_OFFSET = BATCH_SEQUENCE_OFFSET +
BATCH_SEQUENCE_LENGTH;
- static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET +
RECORDS_COUNT_LENGTH;
-
- public static final int RECORD_BATCH_HEADER_SIZE = RECORDS_OFFSET;
- public static final int ARROW_CHANGETYPE_OFFSET = RECORD_BATCH_HEADER_SIZE;
- public static final int LOG_OVERHEAD = LENGTH_OFFSET + LENGTH_LENGTH;
-
public static final byte APPEND_ONLY_FLAG_MASK = 0x01;
private MemorySegment segment;
private int position;
+ private byte magic;
public void pointTo(MemorySegment segment, int position) {
this.segment = segment;
this.position = position;
+ this.magic = segment.get(position + MAGIC_OFFSET);
}
public void setBaseLogOffset(long baseLogOffset) {
@@ -132,7 +90,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch
{
@Override
public byte magic() {
- return segment.get(position + MAGIC_OFFSET);
+ return magic;
}
@Override
@@ -144,25 +102,43 @@ public class DefaultLogRecordBatch implements
LogRecordBatch {
segment.putLong(position + COMMIT_TIMESTAMP_OFFSET, timestamp);
}
+ public void setLeaderEpoch(int leaderEpoch) {
+ if (magic >= LOG_MAGIC_VALUE_V1) {
+ segment.putInt(position + leaderEpochOffset(magic), leaderEpoch);
+ } else {
+ throw new UnsupportedOperationException(
+ "Set leader epoch is not supported for magic v" + magic +
" record batch");
+ }
+ }
+
@Override
public long writerId() {
- return segment.getLong(position + WRITE_CLIENT_ID_OFFSET);
+ return segment.getLong(position + writeClientIdOffset(magic));
}
@Override
public int batchSequence() {
- return segment.getInt(position + BATCH_SEQUENCE_OFFSET);
+ return segment.getInt(position + batchSequenceOffset(magic));
+ }
+
+ @Override
+ public int leaderEpoch() {
+ if (magic >= LOG_MAGIC_VALUE_V1) {
+ return segment.getInt(position + leaderEpochOffset(magic));
+ } else {
+ return NO_LEADER_EPOCH;
+ }
}
@Override
public void ensureValid() {
int sizeInBytes = sizeInBytes();
- if (sizeInBytes < RECORD_BATCH_HEADER_SIZE) {
+ if (sizeInBytes < recordBatchHeaderSize(magic)) {
throw new CorruptMessageException(
"Record batch is corrupt (the size "
+ sizeInBytes
+ " is smaller than the minimum allowed overhead "
- + RECORD_BATCH_HEADER_SIZE
+ + recordBatchHeaderSize(magic)
+ ")");
}
@@ -178,17 +154,18 @@ public class DefaultLogRecordBatch implements
LogRecordBatch {
@Override
public boolean isValid() {
- return sizeInBytes() >= RECORD_BATCH_HEADER_SIZE && checksum() ==
computeChecksum();
+ return sizeInBytes() >= recordBatchHeaderSize(magic) && checksum() ==
computeChecksum();
}
private long computeChecksum() {
ByteBuffer buffer = segment.wrap(position, sizeInBytes());
- return Crc32C.compute(buffer, SCHEMA_ID_OFFSET, sizeInBytes() -
SCHEMA_ID_OFFSET);
+ int schemaIdOffset = schemaIdOffset(magic);
+ return Crc32C.compute(buffer, schemaIdOffset, sizeInBytes() -
schemaIdOffset);
}
private byte attributes() {
// note we're not using the byte of attributes now.
- return segment.get(ATTRIBUTES_OFFSET + position);
+ return segment.get(attributeOffset(magic) + position);
}
@Override
@@ -198,12 +175,12 @@ public class DefaultLogRecordBatch implements
LogRecordBatch {
@Override
public long checksum() {
- return segment.getUnsignedInt(CRC_OFFSET + position);
+ return segment.getUnsignedInt(crcOffset(magic) + position);
}
@Override
public short schemaId() {
- return segment.getShort(SCHEMA_ID_OFFSET + position);
+ return segment.getShort(schemaIdOffset(magic) + position);
}
@Override
@@ -217,7 +194,7 @@ public class DefaultLogRecordBatch implements
LogRecordBatch {
}
private int lastOffsetDelta() {
- return segment.getInt(LAST_OFFSET_DELTA_OFFSET + position);
+ return segment.getInt(lastOffsetDeltaOffset(magic) + position);
}
@Override
@@ -227,7 +204,7 @@ public class DefaultLogRecordBatch implements
LogRecordBatch {
@Override
public int getRecordCount() {
- return segment.getInt(RECORDS_COUNT_OFFSET + position);
+ return segment.getInt(position + recordsCountOffset(magic));
}
@Override
@@ -278,7 +255,7 @@ public class DefaultLogRecordBatch implements
LogRecordBatch {
private CloseableIterator<LogRecord> rowRecordIterator(RowType rowType,
long timestamp) {
DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
return new LogRecordIterator() {
- int position = DefaultLogRecordBatch.this.position +
RECORD_BATCH_HEADER_SIZE;
+ int position = DefaultLogRecordBatch.this.position +
recordBatchHeaderSize(magic);
int rowId = 0;
@Override
@@ -307,8 +284,9 @@ public class DefaultLogRecordBatch implements
LogRecordBatch {
if (isAppendOnly) {
// append only batch, no change type vector,
// the start of the arrow data is the beginning of the batch
records
- int arrowOffset = position + RECORD_BATCH_HEADER_SIZE;
- int arrowLength = sizeInBytes() - RECORD_BATCH_HEADER_SIZE;
+ int recordBatchHeaderSize = recordBatchHeaderSize(magic);
+ int arrowOffset = position + recordBatchHeaderSize;
+ int arrowLength = sizeInBytes() - recordBatchHeaderSize;
ArrowReader reader =
ArrowUtils.createArrowReader(
segment, arrowOffset, arrowLength, root,
allocator, rowType);
@@ -321,12 +299,12 @@ public class DefaultLogRecordBatch implements
LogRecordBatch {
} else {
// with change type, decode the change type vector first,
// the arrow data starts after the change type vector
- int changeTypeOffset = position + ARROW_CHANGETYPE_OFFSET;
+ int changeTypeOffset = position + arrowChangeTypeOffset(magic);
ChangeTypeVector changeTypeVector =
new ChangeTypeVector(segment, changeTypeOffset,
getRecordCount());
int arrowOffset = changeTypeOffset +
changeTypeVector.sizeInBytes();
int arrowLength =
- sizeInBytes() - ARROW_CHANGETYPE_OFFSET -
changeTypeVector.sizeInBytes();
+ sizeInBytes() - arrowChangeTypeOffset(magic) -
changeTypeVector.sizeInBytes();
ArrowReader reader =
ArrowUtils.createArrowReader(
segment, arrowOffset, arrowLength, root,
allocator, rowType);
@@ -394,7 +372,7 @@ public class DefaultLogRecordBatch implements
LogRecordBatch {
"Found invalid record count "
+ numRecords
+ " in magic v"
- + magic()
+ + magic
+ " batch");
}
this.numRecords = numRecords;
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java
b/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java
index 3f314ce46..9be7e459a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java
@@ -28,12 +28,12 @@ import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.Objects;
-import static org.apache.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
-import static org.apache.fluss.record.DefaultLogRecordBatch.MAGIC_LENGTH;
-import static org.apache.fluss.record.DefaultLogRecordBatch.MAGIC_OFFSET;
-import static
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_OFFSET;
+import static
org.apache.fluss.record.LogRecordBatchFormat.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
+import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -42,9 +42,6 @@ import static
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_
/** A log input stream which is backed by a {@link FileChannel}. */
public class FileLogInputStream
implements
LogInputStream<FileLogInputStream.FileChannelLogRecordBatch> {
-
- private static final int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET +
MAGIC_LENGTH;
-
private int position;
private final int end;
private final FileLogRecords fileRecords;
@@ -152,6 +149,11 @@ public class FileLogInputStream
return loadBatchHeader().batchSequence();
}
+ // Returns the leader epoch reported by the batch header obtained from loadBatchHeader(),
+ // following the same delegation pattern as the neighbouring header accessors.
+ @Override
+ public int leaderEpoch() {
+ return loadBatchHeader().leaderEpoch();
+ }
+
@Override
public long lastLogOffset() {
return loadBatchHeader().lastLogOffset();
@@ -189,7 +191,7 @@ public class FileLogInputStream
}
private int headerSize() {
- return RECORD_BATCH_HEADER_SIZE;
+ return recordBatchHeaderSize(magic);
}
protected LogRecordBatch loadFullBatch() {
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
index 736ae63e8..1f889fff1 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
@@ -41,6 +41,7 @@ import org.apache.fluss.utils.ArrowUtils;
import org.apache.fluss.utils.types.Tuple2;
import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -54,12 +55,18 @@ import java.util.List;
import java.util.Map;
import static
org.apache.fluss.record.DefaultLogRecordBatch.APPEND_ONLY_FLAG_MASK;
-import static
org.apache.fluss.record.DefaultLogRecordBatch.ARROW_CHANGETYPE_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.ATTRIBUTES_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
-import static
org.apache.fluss.record.DefaultLogRecordBatch.RECORDS_COUNT_OFFSET;
-import static
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
+import static
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
+import static
org.apache.fluss.record.LogRecordBatchFormat.V1_RECORD_BATCH_HEADER_SIZE;
+import static
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.attributeOffset;
+import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordsCountOffset;
+import static org.apache.fluss.utils.FileUtils.readFully;
import static org.apache.fluss.utils.FileUtils.readFullyOrFail;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
import static org.apache.fluss.utils.Preconditions.checkState;
@@ -81,7 +88,13 @@ public class FileLogProjection {
// shared resources for multiple projections
private final ByteArrayOutputStream outputStream;
private final WriteChannel writeChannel;
- private final ByteBuffer logHeaderBuffer =
ByteBuffer.allocate(RECORD_BATCH_HEADER_SIZE);
+
+ /**
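+ * Buffer used to read the log record batch header. The V1 header is larger than the V0 header,
+ * so a buffer of V1 header size can hold a header of either version; a V0 header can still be
+ * read even when fewer bytes than a full V1 header remain in the log file.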
+ */
+ private final ByteBuffer logHeaderBuffer =
ByteBuffer.allocate(V1_RECORD_BATCH_HEADER_SIZE);
+
private final ByteBuffer arrowHeaderBuffer =
ByteBuffer.allocate(ARROW_HEADER_SIZE);
private ByteBuffer arrowMetadataBuffer;
@@ -163,17 +176,23 @@ public class FileLogProjection {
checkNotNull(currentProjection, "There is no projection registered
yet.");
MultiBytesView.Builder builder = MultiBytesView.builder();
int position = start;
- while (maxBytes > RECORD_BATCH_HEADER_SIZE) {
- if (position >= end - RECORD_BATCH_HEADER_SIZE) {
- // the remaining bytes in the file are not enough to read a
batch header
+
+ // This condition is an optimization that avoids reading the log header when there are
+ // not enough bytes left. The V0 header size is used as a conservative lower bound because
+ // the magic value is not known yet; the version-aware header read and the full batch
+ // size check below ensure the final correctness.
+ while (maxBytes > V0_RECORD_BATCH_HEADER_SIZE) {
+ if (position >= end - V0_RECORD_BATCH_HEADER_SIZE) {
+ // the remaining bytes in the file are not enough to read a non-empty batch of
+ // any version (V0 has the smallest header).
return new BytesViewLogRecords(builder.build());
}
-
// read log header
logHeaderBuffer.rewind();
- readFullyOrFail(channel, logHeaderBuffer, position, "log header");
+ readLogHeaderFullyOrFail(channel, logHeaderBuffer, position);
logHeaderBuffer.rewind();
+ byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
+ int recordBatchHeaderSize = recordBatchHeaderSize(magic);
int batchSizeInBytes = LOG_OVERHEAD +
logHeaderBuffer.getInt(LENGTH_OFFSET);
if (position > end - batchSizeInBytes) {
// the remaining bytes in the file are not enough to read a
full batch
@@ -183,22 +202,22 @@ public class FileLogProjection {
// Skip empty batch. The empty batch was generated when build cdc
log batch when there
// is no cdc log generated for this kv batch. See the comments
about the field
// 'lastOffsetDelta' in DefaultLogRecordBatch.
- if (batchSizeInBytes == RECORD_BATCH_HEADER_SIZE) {
+ if (batchSizeInBytes == recordBatchHeaderSize) {
position += batchSizeInBytes;
continue;
}
boolean isAppendOnly =
- (logHeaderBuffer.get(ATTRIBUTES_OFFSET) &
APPEND_ONLY_FLAG_MASK) > 0;
+ (logHeaderBuffer.get(attributeOffset(magic)) &
APPEND_ONLY_FLAG_MASK) > 0;
final int changeTypeBytes;
final long arrowHeaderOffset;
if (isAppendOnly) {
changeTypeBytes = 0;
- arrowHeaderOffset = position + RECORD_BATCH_HEADER_SIZE;
+ arrowHeaderOffset = position + recordBatchHeaderSize;
} else {
- changeTypeBytes = logHeaderBuffer.getInt(RECORDS_COUNT_OFFSET);
- arrowHeaderOffset = position + RECORD_BATCH_HEADER_SIZE +
changeTypeBytes;
+ changeTypeBytes =
logHeaderBuffer.getInt(recordsCountOffset(magic));
+ arrowHeaderOffset = position + recordBatchHeaderSize +
changeTypeBytes;
}
// read arrow header
@@ -226,7 +245,7 @@ public class FileLogProjection {
long arrowBodyLength = projectedArrowBatch.bodyLength();
int newBatchSizeInBytes =
- RECORD_BATCH_HEADER_SIZE
+ recordBatchHeaderSize
+ changeTypeBytes
+ currentProjection.arrowMetadataLength
+ (int) arrowBodyLength; // safe to cast to int
@@ -250,13 +269,13 @@ public class FileLogProjection {
logHeaderBuffer.putInt(newBatchSizeInBytes - LOG_OVERHEAD);
logHeaderBuffer.rewind();
// the logHeader can't be reused, as it will be sent to network
- byte[] logHeader = new byte[RECORD_BATCH_HEADER_SIZE];
+ byte[] logHeader = new byte[recordBatchHeaderSize];
logHeaderBuffer.get(logHeader);
// 5. build log records
builder.addBytes(logHeader);
if (!isAppendOnly) {
- builder.addBytes(channel, position + ARROW_CHANGETYPE_OFFSET,
changeTypeBytes);
+ builder.addBytes(channel, position +
arrowChangeTypeOffset(magic), changeTypeBytes);
}
builder.addBytes(headerMetadata);
final long bufferOffset = arrowHeaderOffset + ARROW_HEADER_SIZE +
arrowMetadataSize;
@@ -380,6 +399,36 @@ public class FileLogProjection {
return bitset;
}
+    /**
+     * Reads a log record batch header from {@code channel} into {@code buffer}, failing with an
+     * {@link EOFException} when the file ends before a complete header has been read.
+     *
+     * <p>The header size differs between magic v0 and v1, and callers may pass a buffer that is
+     * larger than a v0 header. A short read is therefore not necessarily an error: the magic byte
+     * that was read decides how many bytes are required.
+     *
+     * @param channel the file channel to read from
+     * @param buffer the destination buffer; the number of bytes read is taken from its position
+     *     after the read, so it is expected to be rewound by the caller
+     * @param position the absolute position in the channel to start reading from
+     * @throws IllegalArgumentException if {@code position} is negative
+     * @throws EOFException if the end of file is reached before the magic byte, or before the
+     *     complete header of the magic version found in the file has been read
+     * @throws IOException if reading from the channel fails
+     */
+    static void readLogHeaderFullyOrFail(FileChannel channel, ByteBuffer buffer, int position)
+            throws IOException {
+        if (position < 0) {
+            throw new IllegalArgumentException(
+                    "The file channel position cannot be negative, but it is " + position);
+        }
+        readFully(channel, buffer, position);
+        if (!buffer.hasRemaining()) {
+            // the buffer was filled completely, there is nothing left to validate
+            return;
+        }
+
+        int size = buffer.position();
+        if (size <= MAGIC_OFFSET) {
+            // The magic byte itself was not read. buffer.get(MAGIC_OFFSET) would return stale
+            // content of the reused buffer instead of data from this file position, so the
+            // version checks below would be evaluated against a magic that was never read.
+            throw new EOFException(
+                    String.format(
+                            "Failed to read log header from file channel `%s`. Expected to read at least %d bytes "
+                                    + "to reach the magic byte, but reached end of file after reading %d bytes. "
+                                    + "Started read from position %d.",
+                            channel, MAGIC_OFFSET + 1, size, position));
+        }
+
+        byte magic = buffer.get(MAGIC_OFFSET);
+        if (magic == LOG_MAGIC_VALUE_V0 && size < V0_RECORD_BATCH_HEADER_SIZE) {
+            throw new EOFException(
+                    String.format(
+                            "Failed to read v0 log header from file channel `%s`. Expected to read %d bytes, "
+                                    + "but reached end of file after reading %d bytes. Started read from position %d.",
+                            channel, V0_RECORD_BATCH_HEADER_SIZE, size, position));
+        } else if (magic == LOG_MAGIC_VALUE_V1 && size < V1_RECORD_BATCH_HEADER_SIZE) {
+            throw new EOFException(
+                    String.format(
+                            "Failed to read v1 log header from file channel `%s`. Expected to read %d bytes, "
+                                    + "but reached end of file after reading %d bytes. Started read from position %d.",
+                            channel, V1_RECORD_BATCH_HEADER_SIZE, size, position));
+        }
+        // Any other magic value is not validated here; it is rejected when the caller resolves
+        // the header size through recordBatchHeaderSize(magic).
+    }
+
@VisibleForTesting
ByteBuffer getLogHeaderBuffer() {
return logHeaderBuffer;
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
b/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
index e14b6ce14..385e457b8 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
@@ -30,7 +30,7 @@ import org.apache.fluss.utils.MurmurHashUtils;
import java.io.IOException;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java
b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java
index c66ecb383..12c321710 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java
@@ -37,6 +37,8 @@ import static
org.apache.fluss.record.DefaultKvRecordBatch.LENGTH_LENGTH;
import static
org.apache.fluss.record.DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static org.apache.fluss.record.DefaultKvRecordBatch.SCHEMA_ID_OFFSET;
import static org.apache.fluss.record.KvRecordBatch.CURRENT_KV_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static org.apache.fluss.utils.Preconditions.checkArgument;
/** Builder for {@link DefaultKvRecordBatch} memory bytes. */
@@ -72,8 +74,8 @@ public class KvRecordBatchBuilder implements AutoCloseable {
this.writeLimit = writeLimit;
this.pagedOutputView = pagedOutputView;
this.firstSegment = pagedOutputView.getCurrentSegment();
- this.writerId = LogRecordBatch.NO_WRITER_ID;
- this.batchSequence = LogRecordBatch.NO_BATCH_SEQUENCE;
+ this.writerId = NO_WRITER_ID;
+ this.batchSequence = NO_BATCH_SEQUENCE;
this.currentRecordNumber = 0;
this.isClosed = false;
// We don't need to write header information while the builder
creating,
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
index 8dcab1be2..d28c28ad3 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
@@ -26,6 +26,9 @@ import org.apache.fluss.utils.CloseableIterator;
import java.util.Iterator;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+
/**
* A record batch is a container for {@link LogRecord LogRecords}.
*
@@ -33,17 +36,8 @@ import java.util.Iterator;
*/
@PublicEvolving
public interface LogRecordBatch {
-
- /** The "magic" values. */
- byte LOG_MAGIC_VALUE_V0 = 0;
-
/** The current "magic" value. */
- byte CURRENT_LOG_MAGIC_VALUE = LOG_MAGIC_VALUE_V0;
-
- /** Value used if non-idempotent. */
- long NO_WRITER_ID = -1L;
-
- int NO_BATCH_SEQUENCE = -1;
+ byte CURRENT_LOG_MAGIC_VALUE = LOG_MAGIC_VALUE_V1;
/**
* Check whether the checksum of this batch is correct.
@@ -129,6 +123,13 @@ public interface LogRecordBatch {
*/
int batchSequence();
+ /**
+ * Get leader epoch of this bucket for this log record batch.
+ *
+ * @return leader epoch
+ */
+ int leaderEpoch();
+
/**
* Get the size in bytes of this batch, including the size of the record
and the batch overhead.
*
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java
new file mode 100644
index 000000000..d0f42c052
--- /dev/null
+++
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.exception.OutOfOrderSequenceException;
+
+/**
+ * Describes the binary layout of a {@link LogRecordBatch}, i.e. how Fluss organizes and stores the
+ * header fields of a batch for every supported magic version.
+ *
+ * <p>The fields up to and including the magic byte are located at the same offsets in all
+ * versions. The offsets of all later fields depend on the magic value and have to be resolved
+ * through the static {@code xxxOffset(byte magic)} methods of this class.
+ */
+public final class LogRecordBatchFormat {
+
+    /** Utility class with constants and static helpers only; never instantiated. */
+    private LogRecordBatchFormat() {}
+
+    // ----------------------------------------------------------------------------------------
+    // Common Variables
+    // ----------------------------------------------------------------------------------------
+
+    /** Value used if non-idempotent. */
+    public static final long NO_WRITER_ID = -1L;
+
+    public static final int NO_BATCH_SEQUENCE = -1;
+
+    /**
+     * Used to indicate an unknown leaderEpoch, which is the case when the record set is first
+     * created by the writer or when the magic is lower than V1.
+     */
+    public static final int NO_LEADER_EPOCH = -1;
+
+    // Sizes in bytes of the individual header fields.
+    public static final int BASE_OFFSET_LENGTH = 8;
+    public static final int LENGTH_LENGTH = 4;
+    public static final int MAGIC_LENGTH = 1;
+    private static final int COMMIT_TIMESTAMP_LENGTH = 8;
+    private static final int CRC_LENGTH = 4;
+    private static final int SCHEMA_ID_LENGTH = 2;
+    private static final int LEADER_EPOCH_LENGTH = 4;
+    private static final int ATTRIBUTE_LENGTH = 1;
+    private static final int LAST_OFFSET_DELTA_LENGTH = 4;
+    private static final int WRITE_CLIENT_ID_LENGTH = 8;
+    private static final int BATCH_SEQUENCE_LENGTH = 4;
+    private static final int RECORDS_COUNT_LENGTH = 4;
+
+    // Offsets of the leading fields, which are identical for all magic versions.
+    public static final int BASE_OFFSET_OFFSET = 0;
+    public static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET + BASE_OFFSET_LENGTH;
+    public static final int MAGIC_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH;
+    public static final int COMMIT_TIMESTAMP_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
+    public static final int LOG_OVERHEAD = LENGTH_OFFSET + LENGTH_LENGTH;
+    public static final int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + MAGIC_LENGTH;
+
+    // ----------------------------------------------------------------------------------------
+    // Format of Magic Version: V1
+    // ----------------------------------------------------------------------------------------
+
+    /**
+     * LogRecordBatch implementation for magic 1 (V1). The schema of {@link LogRecordBatch} is given
+     * below:
+     *
+     * <ul>
+     *   RecordBatch =>
+     *   <li>BaseOffset => Int64
+     *   <li>Length => Int32
+     *   <li>Magic => Int8
+     *   <li>CommitTimestamp => Int64
+     *   <li>LeaderEpoch => Int32
+     *   <li>CRC => Uint32
+     *   <li>SchemaId => Int16
+     *   <li>Attributes => Int8
+     *   <li>LastOffsetDelta => Int32
+     *   <li>WriterID => Int64
+     *   <li>SequenceID => Int32
+     *   <li>RecordCount => Int32
+     *   <li>Records => [Record]
+     * </ul>
+     *
+     * <p>The field newly added to the LogRecordBatch header in magic V1 is LeaderEpoch, which is
+     * used to build a consistent leaderEpoch cache across different tabletServers.
+     *
+     * <p>The CRC covers the data from the schemaId to the end of the batch (i.e. all the bytes that
+     * follow the CRC). It is located after the magic byte, and its offset differs between magic
+     * versions, which means that clients must parse the magic byte before deciding how to
+     * interpret the bytes between the magic byte and the CRC. The CRC-32C (Castagnoli) polynomial
+     * is used for the computation. CommitTimestamp and LeaderEpoch are located before the CRC,
+     * because they are determined on the server side.
+     *
+     * <p>The field 'lastOffsetDelta' is used to calculate the lastOffset of the current batch as:
+     * [lastOffset = baseOffset + LastOffsetDelta] instead of [lastOffset = baseOffset + recordCount
+     * - 1]. The reason for introducing this field is that there might be cases where the offset
+     * delta in a batch does not match the recordCount. For example, when generating CDC logs for a
+     * kv table and sending a batch that only contains the deletion of non-existent kvs, no CDC logs
+     * would be generated. However, we need to increment the batchSequence for the corresponding
+     * writerId to make sure no {@link OutOfOrderSequenceException} will be thrown. In such a case,
+     * we would generate a logRecordBatch with a LastOffsetDelta of 0 but a recordCount of 0.
+     *
+     * <p>The current attributes are given below:
+     *
+     * <pre>
+     * ------------------------------------------
+     * |  Unused (1-7)   |  AppendOnly Flag (0) |
+     * ------------------------------------------
+     * </pre>
+     *
+     * @since 0.7
+     */
+    public static final byte LOG_MAGIC_VALUE_V1 = 1;
+
+    private static final int V1_LEADER_EPOCH_OFFSET =
+            COMMIT_TIMESTAMP_OFFSET + COMMIT_TIMESTAMP_LENGTH;
+    private static final int V1_CRC_OFFSET = V1_LEADER_EPOCH_OFFSET + LEADER_EPOCH_LENGTH;
+    private static final int V1_SCHEMA_ID_OFFSET = V1_CRC_OFFSET + CRC_LENGTH;
+    private static final int V1_ATTRIBUTES_OFFSET = V1_SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH;
+    private static final int V1_LAST_OFFSET_DELTA_OFFSET = V1_ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    private static final int V1_WRITE_CLIENT_ID_OFFSET =
+            V1_LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
+    private static final int V1_BATCH_SEQUENCE_OFFSET =
+            V1_WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH;
+    private static final int V1_RECORDS_COUNT_OFFSET =
+            V1_BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
+    private static final int V1_RECORDS_OFFSET = V1_RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
+
+    public static final int V1_RECORD_BATCH_HEADER_SIZE = V1_RECORDS_OFFSET;
+    private static final int V1_ARROW_CHANGETYPE_OFFSET = V1_RECORD_BATCH_HEADER_SIZE;
+
+    // ----------------------------------------------------------------------------------------
+    // Format of Magic Version: V0
+    // ----------------------------------------------------------------------------------------
+
+    /**
+     * LogRecordBatch implementation for magic 0 (V0). The schema of {@link LogRecordBatch} is given
+     * below:
+     *
+     * <ul>
+     *   RecordBatch =>
+     *   <li>BaseOffset => Int64
+     *   <li>Length => Int32
+     *   <li>Magic => Int8
+     *   <li>CommitTimestamp => Int64
+     *   <li>CRC => Uint32
+     *   <li>SchemaId => Int16
+     *   <li>Attributes => Int8
+     *   <li>LastOffsetDelta => Int32
+     *   <li>WriterID => Int64
+     *   <li>SequenceID => Int32
+     *   <li>RecordCount => Int32
+     *   <li>Records => [Record]
+     * </ul>
+     *
+     * <p>The current attributes are given below:
+     *
+     * <pre>
+     * ------------------------------------------
+     * |  Unused (1-7)   |  AppendOnly Flag (0) |
+     * ------------------------------------------
+     * </pre>
+     *
+     * @since 0.1
+     */
+    public static final byte LOG_MAGIC_VALUE_V0 = 0;
+
+    private static final int V0_CRC_OFFSET = COMMIT_TIMESTAMP_OFFSET + COMMIT_TIMESTAMP_LENGTH;
+    private static final int V0_SCHEMA_ID_OFFSET = V0_CRC_OFFSET + CRC_LENGTH;
+    private static final int V0_ATTRIBUTES_OFFSET = V0_SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH;
+    private static final int V0_LAST_OFFSET_DELTA_OFFSET = V0_ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    private static final int V0_WRITE_CLIENT_ID_OFFSET =
+            V0_LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
+    private static final int V0_BATCH_SEQUENCE_OFFSET =
+            V0_WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH;
+    private static final int V0_RECORDS_COUNT_OFFSET =
+            V0_BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
+    private static final int V0_RECORDS_OFFSET = V0_RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
+
+    public static final int V0_RECORD_BATCH_HEADER_SIZE = V0_RECORDS_OFFSET;
+    private static final int V0_ARROW_CHANGETYPE_OFFSET = V0_RECORD_BATCH_HEADER_SIZE;
+
+    // ----------------------------------------------------------------------------------------
+    // Static Methods
+    // ----------------------------------------------------------------------------------------
+
+    /**
+     * Returns the offset of the LeaderEpoch field, which only exists in magic V1.
+     *
+     * @param magic the magic value of the batch
+     * @throws IllegalArgumentException if {@code magic} is not {@link #LOG_MAGIC_VALUE_V1}
+     */
+    public static int leaderEpochOffset(byte magic) {
+        if (magic == LOG_MAGIC_VALUE_V1) {
+            return V1_LEADER_EPOCH_OFFSET;
+        }
+        throw unsupportedMagic(magic);
+    }
+
+    /**
+     * Returns the offset of the CRC field for the given magic value.
+     *
+     * @throws IllegalArgumentException if {@code magic} is neither V0 nor V1
+     */
+    public static int crcOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_CRC_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_CRC_OFFSET;
+            default:
+                throw unsupportedMagic(magic);
+        }
+    }
+
+    /**
+     * Returns the offset of the SchemaId field for the given magic value.
+     *
+     * @throws IllegalArgumentException if {@code magic} is neither V0 nor V1
+     */
+    public static int schemaIdOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_SCHEMA_ID_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_SCHEMA_ID_OFFSET;
+            default:
+                throw unsupportedMagic(magic);
+        }
+    }
+
+    /**
+     * Returns the offset of the Attributes field for the given magic value.
+     *
+     * @throws IllegalArgumentException if {@code magic} is neither V0 nor V1
+     */
+    public static int attributeOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_ATTRIBUTES_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_ATTRIBUTES_OFFSET;
+            default:
+                throw unsupportedMagic(magic);
+        }
+    }
+
+    /**
+     * Returns the offset of the LastOffsetDelta field for the given magic value.
+     *
+     * @throws IllegalArgumentException if {@code magic} is neither V0 nor V1
+     */
+    public static int lastOffsetDeltaOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_LAST_OFFSET_DELTA_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_LAST_OFFSET_DELTA_OFFSET;
+            default:
+                throw unsupportedMagic(magic);
+        }
+    }
+
+    /**
+     * Returns the offset of the WriterID field for the given magic value.
+     *
+     * @throws IllegalArgumentException if {@code magic} is neither V0 nor V1
+     */
+    public static int writeClientIdOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_WRITE_CLIENT_ID_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_WRITE_CLIENT_ID_OFFSET;
+            default:
+                throw unsupportedMagic(magic);
+        }
+    }
+
+    /**
+     * Returns the offset of the SequenceID (batch sequence) field for the given magic value.
+     *
+     * @throws IllegalArgumentException if {@code magic} is neither V0 nor V1
+     */
+    public static int batchSequenceOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_BATCH_SEQUENCE_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_BATCH_SEQUENCE_OFFSET;
+            default:
+                throw unsupportedMagic(magic);
+        }
+    }
+
+    /**
+     * Returns the offset of the RecordCount field for the given magic value.
+     *
+     * @throws IllegalArgumentException if {@code magic} is neither V0 nor V1
+     */
+    public static int recordsCountOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_RECORDS_COUNT_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_RECORDS_COUNT_OFFSET;
+            default:
+                throw unsupportedMagic(magic);
+        }
+    }
+
+    /**
+     * Returns the total size in bytes of the batch header for the given magic value, which is
+     * also the offset at which the records start.
+     *
+     * @throws IllegalArgumentException if {@code magic} is neither V0 nor V1
+     */
+    public static int recordBatchHeaderSize(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_RECORD_BATCH_HEADER_SIZE;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_RECORD_BATCH_HEADER_SIZE;
+            default:
+                throw unsupportedMagic(magic);
+        }
+    }
+
+    /**
+     * Returns the offset of the arrow change type data for the given magic value; it equals the
+     * batch header size of that version.
+     *
+     * @throws IllegalArgumentException if {@code magic} is neither V0 nor V1
+     */
+    public static int arrowChangeTypeOffset(byte magic) {
+        switch (magic) {
+            case LOG_MAGIC_VALUE_V1:
+                return V1_ARROW_CHANGETYPE_OFFSET;
+            case LOG_MAGIC_VALUE_V0:
+                return V0_ARROW_CHANGETYPE_OFFSET;
+            default:
+                throw unsupportedMagic(magic);
+        }
+    }
+
+    /** Creates the exception raised by every lookup for a magic value it does not support. */
+    private static IllegalArgumentException unsupportedMagic(byte magic) {
+        return new IllegalArgumentException("Unsupported magic value " + magic);
+    }
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java
index ab8128f06..8ff8997e5 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java
@@ -27,6 +27,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
+import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
* additional information regarding copyright ownership. */
@@ -72,13 +74,14 @@ public class MemoryLogRecords implements LogRecords {
sizeInBytes = 0;
}
- public void ensureValid() {
- if (sizeInBytes < DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE) {
+ public void ensureValid(byte recordBatchMagic) {
+ int recordBatchHeaderSize = recordBatchHeaderSize(recordBatchMagic);
+ if (sizeInBytes < recordBatchHeaderSize) {
throw new RuntimeException(
"Record batch is corrupt (the size "
+ sizeInBytes
+ " is smaller than the minimum allowed overhead "
- + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE
+ + recordBatchHeaderSize
+ ")");
}
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
index 225e948db..94b2fe59f 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
@@ -29,15 +29,17 @@ import org.apache.fluss.utils.crc.Crc32C;
import java.io.IOException;
-import static
org.apache.fluss.record.DefaultLogRecordBatch.ARROW_CHANGETYPE_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
-import static org.apache.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
-import static
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
-import static org.apache.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+import static
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset;
+import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -89,24 +91,26 @@ public class MemoryLogRecordsArrowBuilder implements
AutoCloseable {
this.pagedOutputView = pagedOutputView;
this.firstSegment = pagedOutputView.getCurrentSegment();
+ int arrowChangeTypeOffset = arrowChangeTypeOffset(magic);
checkArgument(
- firstSegment.size() >= ARROW_CHANGETYPE_OFFSET,
+ firstSegment.size() >= arrowChangeTypeOffset,
"The size of first segment of pagedOutputView is too small,
need at least "
- + ARROW_CHANGETYPE_OFFSET
+ + arrowChangeTypeOffset
+ " bytes.");
- this.changeTypeWriter = new ChangeTypeVectorWriter(firstSegment,
ARROW_CHANGETYPE_OFFSET);
- this.estimatedSizeInBytes = RECORD_BATCH_HEADER_SIZE;
+ this.changeTypeWriter = new ChangeTypeVectorWriter(firstSegment,
arrowChangeTypeOffset);
+ this.estimatedSizeInBytes = recordBatchHeaderSize(magic);
this.recordCount = 0;
}
@VisibleForTesting
public static MemoryLogRecordsArrowBuilder builder(
long baseLogOffset,
+ byte magic,
int schemaId,
ArrowWriter arrowWriter,
AbstractPagedOutputView outputView) {
return new MemoryLogRecordsArrowBuilder(
- baseLogOffset, schemaId, CURRENT_LOG_MAGIC_VALUE, arrowWriter,
outputView, false);
+ baseLogOffset, schemaId, magic, arrowWriter, outputView,
false);
}
/** Builder with limited write size and the memory segment used to
serialize records. */
@@ -139,7 +143,7 @@ public class MemoryLogRecordsArrowBuilder implements
AutoCloseable {
// serialize the arrow batch to dynamically allocated memory segments
arrowWriter.serializeToOutputView(
- pagedOutputView, ARROW_CHANGETYPE_OFFSET +
changeTypeWriter.sizeInBytes());
+ pagedOutputView, arrowChangeTypeOffset(magic) +
changeTypeWriter.sizeInBytes());
recordCount = arrowWriter.getRecordsCount();
bytesView =
MultiBytesView.builder()
@@ -237,7 +241,7 @@ public class MemoryLogRecordsArrowBuilder implements
AutoCloseable {
if (reCalculateSizeInBytes) {
// make size in bytes up-to-date
estimatedSizeInBytes =
- ARROW_CHANGETYPE_OFFSET
+ arrowChangeTypeOffset(magic)
+ changeTypeWriter.sizeInBytes()
+ arrowWriter.estimatedSizeInBytes();
}
@@ -259,6 +263,12 @@ public class MemoryLogRecordsArrowBuilder implements
AutoCloseable {
// write empty timestamp which will be overridden on server side
outputView.writeLong(0);
+
+ // write empty leaderEpoch which will be overridden on server side
+ if (magic >= LOG_MAGIC_VALUE_V1) {
+ outputView.writeInt(NO_LEADER_EPOCH);
+ }
+
// write empty crc first.
outputView.writeUnsignedInt(0);
// write schema id
@@ -278,8 +288,8 @@ public class MemoryLogRecordsArrowBuilder implements
AutoCloseable {
outputView.writeInt(recordCount);
// Update crc.
- long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(),
SCHEMA_ID_OFFSET);
- outputView.setPosition(CRC_OFFSET);
+ long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(),
schemaIdOffset(magic));
+ outputView.setPosition(crcOffset(magic));
outputView.writeUnsignedInt(crc);
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
index b62c2cef7..8488c3351 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
@@ -29,15 +29,17 @@ import org.apache.fluss.utils.crc.Crc32C;
import java.io.IOException;
-import static org.apache.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
-import static org.apache.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
-import static
org.apache.fluss.record.DefaultLogRecordBatch.LAST_OFFSET_DELTA_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
-import static
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
-import static org.apache.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset;
+import static
org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset;
+import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
import static org.apache.fluss.utils.Preconditions.checkArgument;
/**
@@ -87,8 +89,8 @@ public class MemoryLogRecordsIndexedBuilder implements
AutoCloseable {
// We don't need to write header information while the builder
creating,
// we'll skip it first.
- this.pagedOutputView.setPosition(RECORD_BATCH_HEADER_SIZE);
- this.sizeInBytes = RECORD_BATCH_HEADER_SIZE;
+ this.pagedOutputView.setPosition(recordBatchHeaderSize(magic));
+ this.sizeInBytes = recordBatchHeaderSize(magic);
}
public static MemoryLogRecordsIndexedBuilder builder(
@@ -219,6 +221,12 @@ public class MemoryLogRecordsIndexedBuilder implements
AutoCloseable {
// write empty timestamp which will be overridden on server side
outputView.writeLong(0);
+
+ // write empty leaderEpoch which will be overridden on server side
+ if (magic >= LOG_MAGIC_VALUE_V1) {
+ outputView.writeInt(NO_LEADER_EPOCH);
+ }
+
// write empty crc first.
outputView.writeUnsignedInt(0);
@@ -226,7 +234,7 @@ public class MemoryLogRecordsIndexedBuilder implements
AutoCloseable {
// write attributes (currently only appendOnly flag)
outputView.writeBoolean(appendOnly);
// skip write attribute byte for now.
- outputView.setPosition(LAST_OFFSET_DELTA_OFFSET);
+ outputView.setPosition(lastOffsetDeltaOffset(magic));
if (currentRecordNumber > 0) {
outputView.writeInt(currentRecordNumber - 1);
} else {
@@ -239,8 +247,8 @@ public class MemoryLogRecordsIndexedBuilder implements
AutoCloseable {
outputView.writeInt(currentRecordNumber);
// Update crc.
- long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(),
SCHEMA_ID_OFFSET);
- outputView.setPosition(CRC_OFFSET);
+ long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(),
schemaIdOffset(magic));
+ outputView.setPosition(crcOffset(magic));
outputView.writeUnsignedInt(crc);
}
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java
b/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java
index feaf430ff..5e2bdebe5 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java
@@ -19,8 +19,9 @@ package org.apache.fluss.record;
import org.apache.fluss.memory.MemorySegment;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
/**
* A byte buffer backed log input stream. This class avoids the need to copy
records by returning
@@ -40,7 +41,8 @@ class MemorySegmentLogInputStream implements
LogInputStream<LogRecordBatch> {
public LogRecordBatch nextBatch() {
Integer batchSize = nextBatchSize();
- if (batchSize == null || remaining < batchSize) {
+ // remaining must be at least the V0 header size, the minimum since the V1 header is
larger than V0.
+ if (batchSize == null || remaining < batchSize || remaining <
V0_RECORD_BATCH_HEADER_SIZE) {
return null;
}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java
b/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java
index bfb23da94..70f881e06 100644
---
a/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java
@@ -24,21 +24,26 @@ import org.apache.fluss.testutils.DataTestUtils;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.CloseableIterator;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link DefaultLogRecordBatch}. */
public class DefaultLogRecordBatchTest extends LogTestBase {
- @Test
- void testRecordBatchSize() throws Exception {
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testRecordBatchSize(byte magic) throws Exception {
MemoryLogRecords memoryLogRecords =
- DataTestUtils.genMemoryLogRecordsByObject(TestData.DATA1);
+ DataTestUtils.genMemoryLogRecordsByObject(magic,
TestData.DATA1);
int totalSize = 0;
for (LogRecordBatch logRecordBatch : memoryLogRecords.batches()) {
totalSize += logRecordBatch.sizeInBytes();
@@ -46,8 +51,9 @@ public class DefaultLogRecordBatchTest extends LogTestBase {
assertThat(totalSize).isEqualTo(memoryLogRecords.sizeInBytes());
}
- @Test
- void testIndexedRowWriteAndReadBatch() throws Exception {
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testIndexedRowWriteAndReadBatch(byte magic) throws Exception {
int recordNumber = 50;
RowType allRowType = TestInternalRowGenerator.createAllRowType();
MemoryLogRecordsIndexedBuilder builder =
@@ -98,8 +104,9 @@ public class DefaultLogRecordBatchTest extends LogTestBase {
builder.close();
}
- @Test
- void testNoRecordAppend() throws Exception {
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testNoRecordAppend(byte magic) throws Exception {
// 1. no record append with baseOffset as 0.
MemoryLogRecordsIndexedBuilder builder =
MemoryLogRecordsIndexedBuilder.builder(
@@ -107,7 +114,7 @@ public class DefaultLogRecordBatchTest extends LogTestBase {
MemoryLogRecords memoryLogRecords =
MemoryLogRecords.pointToBytesView(builder.build());
Iterator<LogRecordBatch> iterator =
memoryLogRecords.batches().iterator();
// only contains batch header.
- assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
+
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(recordBatchHeaderSize(magic));
assertThat(iterator.hasNext()).isTrue();
LogRecordBatch logRecordBatch = iterator.next();
@@ -135,7 +142,7 @@ public class DefaultLogRecordBatchTest extends LogTestBase {
memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
iterator = memoryLogRecords.batches().iterator();
// only contains batch header.
- assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
+
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(recordBatchHeaderSize(magic));
assertThat(iterator.hasNext()).isTrue();
logRecordBatch = iterator.next();
diff --git
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java
index 9c8e1dab2..7de046ec7 100644
---
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java
@@ -17,28 +17,40 @@
package org.apache.fluss.record;
+import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.utils.CloseableIterator;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.util.Collections;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
-import static
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithBaseOffset;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static
org.apache.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link FileLogInputStream}. */
public class FileLogInputStreamTest extends LogTestBase {
private @TempDir File tempDir;
- @Test
- void testWriteTo() throws Exception {
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testWriteTo(byte recordBatchMagic) throws Exception {
try (FileLogRecords fileLogRecords = FileLogRecords.open(new
File(tempDir, "test.tmp"))) {
fileLogRecords.append(
- genMemoryLogRecordsWithBaseOffset(
- 0L, Collections.singletonList(new Object[] {0,
"abc"})));
+ createRecordsWithoutBaseLogOffset(
+ DATA1_ROW_TYPE,
+ DEFAULT_SCHEMA_ID,
+ 0L,
+ -1L,
+ recordBatchMagic,
+ Collections.singletonList(new Object[] {0, "abc"}),
+ LogFormat.ARROW));
fileLogRecords.flush();
FileLogInputStream logInputStream =
@@ -46,7 +58,7 @@ public class FileLogInputStreamTest extends LogTestBase {
FileLogInputStream.FileChannelLogRecordBatch batch =
logInputStream.nextBatch();
assertThat(batch).isNotNull();
- assertThat(batch.magic()).isEqualTo(magic);
+ assertThat(batch.magic()).isEqualTo(recordBatchMagic);
LogRecordBatch recordBatch = batch.loadFullBatch();
diff --git
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
index 515289cb4..c19ecc882 100644
---
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
@@ -28,15 +28,23 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.EOFException;
import java.io.File;
+import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.stream.Stream;
import static
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
+import static
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
+import static
org.apache.fluss.record.LogRecordBatchFormat.V1_RECORD_BATCH_HEADER_SIZE;
import static
org.apache.fluss.record.LogRecordReadContext.createArrowReadContext;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static
org.apache.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset;
@@ -126,17 +134,23 @@ class FileLogProjectionTest {
static Stream<Arguments> projectedFieldsArgs() {
return Stream.of(
- Arguments.of((Object) new int[] {0}),
- Arguments.arguments((Object) new int[] {1}),
- Arguments.arguments((Object) new int[] {0, 1}));
+ Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V0),
+ Arguments.arguments((Object) new int[] {1},
LOG_MAGIC_VALUE_V0),
+ Arguments.arguments((Object) new int[] {0, 1},
LOG_MAGIC_VALUE_V0),
+ Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V1),
+ Arguments.arguments((Object) new int[] {1},
LOG_MAGIC_VALUE_V1),
+ Arguments.arguments((Object) new int[] {0, 1},
LOG_MAGIC_VALUE_V1));
}
@ParameterizedTest
@MethodSource("projectedFieldsArgs")
- void testProject(int[] projectedFields) throws Exception {
+ void testProject(int[] projectedFields, byte recordBatchMagic) throws
Exception {
FileLogRecords fileLogRecords =
createFileLogRecords(
- TestData.DATA1_ROW_TYPE, TestData.DATA1,
TestData.ANOTHER_DATA1);
+ recordBatchMagic,
+ TestData.DATA1_ROW_TYPE,
+ TestData.DATA1,
+ TestData.ANOTHER_DATA1);
List<Object[]> results =
doProjection(
new FileLogProjection(),
@@ -159,11 +173,15 @@ class FileLogProjectionTest {
assertEquals(results, expected);
}
- @Test
- void testIllegalByteOrder() throws Exception {
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testIllegalByteOrder(byte recordBatchMagic) throws Exception {
FileLogRecords fileLogRecords =
createFileLogRecords(
- TestData.DATA1_ROW_TYPE, TestData.DATA1,
TestData.ANOTHER_DATA1);
+ recordBatchMagic,
+ TestData.DATA1_ROW_TYPE,
+ TestData.DATA1,
+ TestData.ANOTHER_DATA1);
FileLogProjection projection = new FileLogProjection();
// overwrite the wrong decoding byte order endian
projection.getLogHeaderBuffer().order(ByteOrder.BIG_ENDIAN);
@@ -180,14 +198,18 @@ class FileLogProjectionTest {
.hasMessageContaining("Failed to read `arrow header` from file
channel");
}
- @Test
- void testProjectSizeLimited() throws Exception {
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testProjectSizeLimited(byte recordBatchMagic) throws Exception {
List<Object[]> allData = new ArrayList<>();
allData.addAll(TestData.DATA1);
allData.addAll(TestData.ANOTHER_DATA1);
FileLogRecords fileLogRecords =
createFileLogRecords(
- TestData.DATA1_ROW_TYPE, TestData.DATA1,
TestData.ANOTHER_DATA1);
+ recordBatchMagic,
+ TestData.DATA1_ROW_TYPE,
+ TestData.DATA1,
+ TestData.ANOTHER_DATA1);
int totalSize = fileLogRecords.sizeInBytes();
boolean hasEmpty = false;
boolean hasHalf = false;
@@ -218,9 +240,89 @@ class FileLogProjectionTest {
assertThat(hasFull).isTrue();
}
+ @Test
+ void testReadLogHeaderFullyOrFail() throws Exception {
+ ByteBuffer logHeaderBuffer =
ByteBuffer.allocate(V1_RECORD_BATCH_HEADER_SIZE);
+
+ // only V1 log header, should read fully
+ try (FileLogRecords fileLogRecords =
+ createFileWithLogHeader(LOG_MAGIC_VALUE_V1,
V1_RECORD_BATCH_HEADER_SIZE)) {
+ FileLogProjection.readLogHeaderFullyOrFail(
+ fileLogRecords.channel(), logHeaderBuffer, 0);
+ assertThat(logHeaderBuffer.hasRemaining()).isFalse();
+ }
+
+ // V1 log header with data, should read fully
+ try (FileLogRecords fileLogRecords =
createFileWithLogHeader(LOG_MAGIC_VALUE_V1, 100)) {
+ logHeaderBuffer.rewind();
+ FileLogProjection.readLogHeaderFullyOrFail(
+ fileLogRecords.channel(), logHeaderBuffer, 0);
+ assertThat(logHeaderBuffer.hasRemaining()).isFalse();
+ }
+
+ // only v0 log header, should only read 48 bytes
+ try (FileLogRecords fileLogRecords =
+ createFileWithLogHeader(LOG_MAGIC_VALUE_V0,
V0_RECORD_BATCH_HEADER_SIZE)) {
+ logHeaderBuffer.rewind();
+ FileLogProjection.readLogHeaderFullyOrFail(
+ fileLogRecords.channel(), logHeaderBuffer, 0);
+ assertThat(logHeaderBuffer.hasRemaining()).isTrue();
+
assertThat(logHeaderBuffer.position()).isEqualTo(V0_RECORD_BATCH_HEADER_SIZE);
+ }
+
+ // v0 log header with data, should read fully
+ try (FileLogRecords fileLogRecords =
createFileWithLogHeader(LOG_MAGIC_VALUE_V0, 100)) {
+ logHeaderBuffer.rewind();
+ FileLogProjection.readLogHeaderFullyOrFail(
+ fileLogRecords.channel(), logHeaderBuffer, 0);
+ assertThat(logHeaderBuffer.hasRemaining()).isFalse();
+ }
+
+ // v1 log header incomplete, should throw exception
+ try (FileLogRecords fileLogRecords =
+ createFileWithLogHeader(LOG_MAGIC_VALUE_V1,
V0_RECORD_BATCH_HEADER_SIZE)) {
+ logHeaderBuffer.rewind();
+ assertThatThrownBy(
+ () ->
+ FileLogProjection.readLogHeaderFullyOrFail(
+ fileLogRecords.channel(),
logHeaderBuffer, 0),
+ "Should throw exception if the log header is
incomplete")
+ .isInstanceOf(EOFException.class)
+ .hasMessageContaining(
+ "Expected to read 52 bytes, but reached end of
file after reading 48 bytes.");
+ }
+
+ // v0 log header incomplete, should throw exception
+ try (FileLogRecords fileLogRecords =
+ createFileWithLogHeader(LOG_MAGIC_VALUE_V0,
V0_RECORD_BATCH_HEADER_SIZE - 1)) {
+ logHeaderBuffer.rewind();
+ assertThatThrownBy(
+ () ->
+ FileLogProjection.readLogHeaderFullyOrFail(
+ fileLogRecords.channel(),
logHeaderBuffer, 0),
+ "Should throw exception if the log header is
incomplete")
+ .isInstanceOf(EOFException.class)
+ .hasMessageContaining(
+ "Expected to read 48 bytes, but reached end of
file after reading 47 bytes.");
+ }
+ }
+
+ private FileLogRecords createFileWithLogHeader(byte magic, int length)
throws Exception {
+ ByteBuffer buffer =
ByteBuffer.allocate(length).order(ByteOrder.LITTLE_ENDIAN);
+ buffer.position(MAGIC_OFFSET);
+ buffer.put(magic);
+ buffer.position(length);
+ buffer.flip();
+ File file = new File(tempDir, UUID.randomUUID() + ".log");
+ FileLogRecords fileLogRecords = FileLogRecords.open(file);
+ fileLogRecords.channel().write(buffer);
+ fileLogRecords.flush();
+ return fileLogRecords;
+ }
+
@SafeVarargs
- private final FileLogRecords createFileLogRecords(RowType rowType,
List<Object[]>... inputs)
- throws Exception {
+ final FileLogRecords createFileLogRecords(
+ byte recordBatchMagic, RowType rowType, List<Object[]>... inputs)
throws Exception {
FileLogRecords fileLogRecords = FileLogRecords.open(new File(tempDir,
"test.tmp"));
long offsetBase = 0L;
for (List<Object[]> input : inputs) {
@@ -230,6 +332,7 @@ class FileLogProjectionTest {
DEFAULT_SCHEMA_ID,
offsetBase,
System.currentTimeMillis(),
+ recordBatchMagic,
input,
LogFormat.ARROW));
offsetBase += input.size();
diff --git
a/fluss-common/src/test/java/org/apache/fluss/record/KvRecordTestUtils.java
b/fluss-common/src/test/java/org/apache/fluss/record/KvRecordTestUtils.java
index 5a0115fd1..679920a21 100644
--- a/fluss-common/src/test/java/org/apache/fluss/record/KvRecordTestUtils.java
+++ b/fluss-common/src/test/java/org/apache/fluss/record/KvRecordTestUtils.java
@@ -33,8 +33,8 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
/** Test utils for kv record. */
diff --git
a/fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java
b/fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java
new file mode 100644
index 000000000..3f83aa7a5
--- /dev/null
+++
b/fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.fluss.record.LogRecordBatchFormat.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
+import static
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.attributeOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.batchSequenceOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset;
+import static
org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.leaderEpochOffset;
+import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordsCountOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
+import static org.apache.fluss.record.LogRecordBatchFormat.writeClientIdOffset;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link LogRecordBatchFormat}. */
+public class LogRecordBatchFormatTest {
+
+ @Test
+ void testCommonParam() {
+ assertThat(LENGTH_OFFSET).isEqualTo(8);
+ assertThat(MAGIC_OFFSET).isEqualTo(12);
+ assertThat(LOG_OVERHEAD).isEqualTo(12);
+ assertThat(HEADER_SIZE_UP_TO_MAGIC).isEqualTo(13);
+ }
+
+ @Test
+ void testLogRecordBatchFormatForMagicV0() {
+ byte magic = (byte) 0;
+ assertThatThrownBy(() -> leaderEpochOffset(magic))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unsupported magic value 0");
+ assertThat(crcOffset(magic)).isEqualTo(21);
+ assertThat(schemaIdOffset(magic)).isEqualTo(25);
+ assertThat(attributeOffset(magic)).isEqualTo(27);
+ assertThat(lastOffsetDeltaOffset(magic)).isEqualTo(28);
+ assertThat(writeClientIdOffset(magic)).isEqualTo(32);
+ assertThat(batchSequenceOffset(magic)).isEqualTo(40);
+ assertThat(recordsCountOffset(magic)).isEqualTo(44);
+ assertThat(recordBatchHeaderSize(magic)).isEqualTo(48);
+ assertThat(arrowChangeTypeOffset(magic)).isEqualTo(48);
+ }
+
+ @Test
+ void testLogRecordBatchFormatForMagicV1() {
+ byte magic = (byte) 1;
+ assertThat(leaderEpochOffset(magic)).isEqualTo(21);
+ assertThat(crcOffset(magic)).isEqualTo(25);
+ assertThat(schemaIdOffset(magic)).isEqualTo(29);
+ assertThat(attributeOffset(magic)).isEqualTo(31);
+ assertThat(lastOffsetDeltaOffset(magic)).isEqualTo(32);
+ assertThat(writeClientIdOffset(magic)).isEqualTo(36);
+ assertThat(batchSequenceOffset(magic)).isEqualTo(44);
+ assertThat(recordsCountOffset(magic)).isEqualTo(48);
+ assertThat(recordBatchHeaderSize(magic)).isEqualTo(52);
+ assertThat(arrowChangeTypeOffset(magic)).isEqualTo(52);
+ }
+}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java
b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java
index 7759e711e..5c0df6e36 100644
---
a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java
@@ -38,17 +38,23 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import static
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
import static org.apache.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
import static org.apache.fluss.record.TestData.DATA1;
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
@@ -77,14 +83,15 @@ public class MemoryLogRecordsArrowBuilderTest {
allocator.close();
}
- @Test
- void testAppendWithEmptyRecord() throws Exception {
+ @ParameterizedTest
+ @MethodSource("magicAndExpectedBatchSize")
+ void testAppendWithEmptyRecord(byte recordBatchMagic, int
expectedBatchSize) throws Exception {
int maxSizeInBytes = 1024;
ArrowWriter writer =
provider.getOrCreateWriter(
1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE,
DEFAULT_COMPRESSION);
MemoryLogRecordsArrowBuilder builder =
- createMemoryLogRecordsArrowBuilder(0, writer, 10, 100);
+ createMemoryLogRecordsArrowBuilder(0, writer, 10, 100,
recordBatchMagic);
assertThat(builder.isFull()).isFalse();
assertThat(builder.getWriteLimitInBytes())
.isEqualTo((int) (maxSizeInBytes * BUFFER_USAGE_RATIO));
@@ -95,18 +102,19 @@ public class MemoryLogRecordsArrowBuilderTest {
assertThat(iterator.hasNext()).isTrue();
LogRecordBatch batch = iterator.next();
assertThat(batch.getRecordCount()).isEqualTo(0);
- assertThat(batch.sizeInBytes()).isEqualTo(48);
+ assertThat(batch.sizeInBytes()).isEqualTo(expectedBatchSize);
assertThat(iterator.hasNext()).isFalse();
}
- @Test
- void testAppend() throws Exception {
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testAppend(byte recordBatchMagic) throws Exception {
int maxSizeInBytes = 1024;
ArrowWriter writer =
provider.getOrCreateWriter(
1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE,
DEFAULT_COMPRESSION);
MemoryLogRecordsArrowBuilder builder =
- createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
+ createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024,
recordBatchMagic);
List<ChangeType> changeTypes =
DATA1.stream().map(row ->
ChangeType.APPEND_ONLY).collect(Collectors.toList());
List<InternalRow> rows =
@@ -157,7 +165,7 @@ public class MemoryLogRecordsArrowBuilderTest {
provider.getOrCreateWriter(
1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE,
NO_COMPRESSION);
MemoryLogRecordsArrowBuilder builder =
- createMemoryLogRecordsArrowBuilder(0, writer1, 10, 1024);
+ createMemoryLogRecordsArrowBuilder(0, writer1, 10, 1024,
CURRENT_LOG_MAGIC_VALUE);
for (Object[] data : dataSet) {
builder.append(ChangeType.APPEND_ONLY, row(data));
}
@@ -171,7 +179,7 @@ public class MemoryLogRecordsArrowBuilderTest {
provider.getOrCreateWriter(
1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE,
compressionInfo);
MemoryLogRecordsArrowBuilder builder2 =
- createMemoryLogRecordsArrowBuilder(0, writer2, 10, 1024);
+ createMemoryLogRecordsArrowBuilder(0, writer2, 10, 1024,
CURRENT_LOG_MAGIC_VALUE);
for (Object[] data : dataSet) {
builder2.append(ChangeType.APPEND_ONLY, row(data));
}
@@ -184,8 +192,9 @@ public class MemoryLogRecordsArrowBuilderTest {
assertThat(sizeInBytes1).isGreaterThan(sizeInBytes2);
}
- @Test
- void testIllegalArgument() {
+ @ParameterizedTest
+ @MethodSource("magicAndExpectedBatchSize")
+ void testIllegalArgument(byte recordBatchMagic, int expectedBatchSize) {
int maxSizeInBytes = 1024;
assertThatThrownBy(
() -> {
@@ -196,12 +205,15 @@ public class MemoryLogRecordsArrowBuilderTest {
maxSizeInBytes,
DATA1_ROW_TYPE,
DEFAULT_COMPRESSION)) {
- createMemoryLogRecordsArrowBuilder(0, writer,
10, 30);
+ createMemoryLogRecordsArrowBuilder(
+ 0, writer, 10, 30, recordBatchMagic);
}
})
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "The size of first segment of pagedOutputView is too
small, need at least 48 bytes.");
+ "The size of first segment of pagedOutputView is too
small, need at least "
+ + expectedBatchSize
+ + " bytes.");
}
@Test
@@ -211,7 +223,7 @@ public class MemoryLogRecordsArrowBuilderTest {
provider.getOrCreateWriter(
1L, DEFAULT_SCHEMA_ID, 1024, DATA1_ROW_TYPE,
NO_COMPRESSION);
MemoryLogRecordsArrowBuilder builder =
- createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
+ createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024,
CURRENT_LOG_MAGIC_VALUE);
List<ChangeType> changeTypes =
DATA1.stream().map(row ->
ChangeType.APPEND_ONLY).collect(Collectors.toList());
List<InternalRow> rows =
@@ -244,17 +256,18 @@ public class MemoryLogRecordsArrowBuilderTest {
writer1.close();
}
- @Test
- void testNoRecordAppend() throws Exception {
+ @ParameterizedTest
+ @MethodSource("magicAndExpectedBatchSize")
+ void testNoRecordAppend(byte recordBatchMagic, int expectedBatchSize)
throws Exception {
// 1. no record append with base offset as 0.
ArrowWriter writer =
provider.getOrCreateWriter(
1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE,
DEFAULT_COMPRESSION);
MemoryLogRecordsArrowBuilder builder =
- createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10);
+ createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10,
recordBatchMagic);
MemoryLogRecords memoryLogRecords =
MemoryLogRecords.pointToBytesView(builder.build());
// only contains batch header.
- assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
+
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(expectedBatchSize);
Iterator<LogRecordBatch> iterator =
memoryLogRecords.batches().iterator();
assertThat(iterator.hasNext()).isTrue();
LogRecordBatch logRecordBatch = iterator.next();
@@ -276,10 +289,10 @@ public class MemoryLogRecordsArrowBuilderTest {
ArrowWriter writer2 =
provider.getOrCreateWriter(
1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE,
DEFAULT_COMPRESSION);
- builder = createMemoryLogRecordsArrowBuilder(100, writer2, 10, 1024 *
10);
+ builder = createMemoryLogRecordsArrowBuilder(100, writer2, 10, 1024 *
10, recordBatchMagic);
memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
// only contains batch header.
- assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48);
+
assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(expectedBatchSize);
iterator = memoryLogRecords.batches().iterator();
assertThat(iterator.hasNext()).isTrue();
logRecordBatch = iterator.next();
@@ -305,7 +318,7 @@ public class MemoryLogRecordsArrowBuilderTest {
provider.getOrCreateWriter(
1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE,
DEFAULT_COMPRESSION);
MemoryLogRecordsArrowBuilder builder =
- createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
+ createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024,
CURRENT_LOG_MAGIC_VALUE);
List<ChangeType> changeTypes =
DATA1.stream().map(row ->
ChangeType.APPEND_ONLY).collect(Collectors.toList());
List<InternalRow> rows =
@@ -343,8 +356,19 @@ public class MemoryLogRecordsArrowBuilderTest {
new ArrowCompressionInfo(ArrowCompressionType.ZSTD, 9));
}
+ private static Collection<Arguments> magicAndExpectedBatchSize() {
+ List<Arguments> params = new ArrayList<>();
+ params.add(Arguments.arguments(LOG_MAGIC_VALUE_V0, 48));
+ params.add(Arguments.arguments(LOG_MAGIC_VALUE_V1, 52));
+ return params;
+ }
+
private MemoryLogRecordsArrowBuilder createMemoryLogRecordsArrowBuilder(
- int baseOffset, ArrowWriter writer, int maxPages, int
pageSizeInBytes)
+ int baseOffset,
+ ArrowWriter writer,
+ int maxPages,
+ int pageSizeInBytes,
+ byte recordBatchMagic)
throws IOException {
conf.set(
ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE,
@@ -353,6 +377,7 @@ public class MemoryLogRecordsArrowBuilderTest {
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new
MemorySize(pageSizeInBytes));
return MemoryLogRecordsArrowBuilder.builder(
baseOffset,
+ recordBatchMagic,
DEFAULT_SCHEMA_ID,
writer,
new ManagedPagedOutputView(new
TestingMemorySegmentPool(pageSizeInBytes)));
diff --git
a/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java
b/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java
index 3fe720853..576e90131 100644
---
a/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java
@@ -17,13 +17,16 @@
package org.apache.fluss.record;
+import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.testutils.DataTestUtils;
import org.junit.jupiter.api.Test;
import java.util.Iterator;
-import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD;
+import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET;
import static org.apache.fluss.record.TestData.DATA1;
import static org.assertj.core.api.Assertions.assertThat;
@@ -52,13 +55,15 @@ public class MemorySegmentLogInputStreamTest {
iterator = getIterator(memoryLogRecords);
assertThat(iterator.hasNext()).isFalse();
- // gen batch with enough size.
+ // gen batch with not enough size.
memoryLogRecords = MemoryLogRecords.pointToBytes(new
byte[LOG_OVERHEAD]);
iterator = getIterator(memoryLogRecords);
- assertThat(iterator.hasNext()).isTrue();
+ assertThat(iterator.hasNext()).isFalse();
// gen batch with enough size.
- memoryLogRecords = MemoryLogRecords.pointToBytes(new byte[12]);
+ MemorySegment memory = MemorySegment.allocateHeapMemory(100);
+ memory.put(MAGIC_OFFSET, CURRENT_LOG_MAGIC_VALUE);
+ memoryLogRecords =
MemoryLogRecords.pointToBytes(memory.getHeapMemory());
iterator = getIterator(memoryLogRecords);
assertThat(iterator.hasNext()).isTrue();
}
diff --git a/fluss-common/src/test/java/org/apache/fluss/record/TestData.java
b/fluss-common/src/test/java/org/apache/fluss/record/TestData.java
index a920efa94..7752b15b0 100644
--- a/fluss-common/src/test/java/org/apache/fluss/record/TestData.java
+++ b/fluss-common/src/test/java/org/apache/fluss/record/TestData.java
@@ -32,11 +32,13 @@ import org.apache.fluss.utils.types.Tuple2;
import java.util.Arrays;
import java.util.List;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+
/** utils to create test data. */
public final class TestData {
public static final short DEFAULT_SCHEMA_ID = 1;
public static final long BASE_OFFSET = 0L;
- public static final byte DEFAULT_MAGIC = (byte) 0;
+ public static final byte DEFAULT_MAGIC = CURRENT_LOG_MAGIC_VALUE;
// ---------------------------- data1 and related table info begin
---------------------------
public static final List<Object[]> DATA1 =
Arrays.asList(
diff --git
a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
index cedaadcbd..ce9336c5e 100644
---
a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
@@ -44,7 +44,8 @@ import java.util.List;
import static
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
import static org.apache.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
-import static
org.apache.fluss.record.DefaultLogRecordBatch.ARROW_CHANGETYPE_OFFSET;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset;
import static org.apache.fluss.record.TestData.DATA1;
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -147,12 +148,14 @@ class ArrowReaderWriterTest {
new ManagedPagedOutputView(new TestingMemorySegmentPool(10
* 1024));
// skip arrow batch header.
- int size = writer.serializeToOutputView(pagedOutputView,
ARROW_CHANGETYPE_OFFSET);
+ int size =
+ writer.serializeToOutputView(
+ pagedOutputView,
arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE));
MemorySegment segment =
MemorySegment.allocateHeapMemory(writer.estimatedSizeInBytes());
assertThat(pagedOutputView.getWrittenSegments().size()).isEqualTo(1);
MemorySegment firstSegment = pagedOutputView.getCurrentSegment();
- firstSegment.copyTo(ARROW_CHANGETYPE_OFFSET, segment, 0, size);
+
firstSegment.copyTo(arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), segment, 0,
size);
ArrowReader reader =
ArrowUtils.createArrowReader(segment, 0, size, root,
allocator, rowType);
diff --git
a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
index 4f00fc14b..2236ebe21 100644
--- a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
+++ b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
@@ -70,8 +70,9 @@ import java.util.List;
import java.util.stream.Collectors;
import static
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static
org.apache.fluss.record.LogRecordReadContext.createArrowReadContext;
import static org.apache.fluss.record.TestData.BASE_OFFSET;
import static org.apache.fluss.record.TestData.DATA1;
@@ -150,17 +151,23 @@ public class DataTestUtils {
return (CompactedRow) rowEncoder.finishRow();
}
- public static MemoryLogRecords genMemoryLogRecordsByObject(List<Object[]>
objects)
+ public static MemoryLogRecords genMemoryLogRecordsByObject(byte magic,
List<Object[]> objects)
throws Exception {
return createRecordsWithoutBaseLogOffset(
DATA1_ROW_TYPE,
DEFAULT_SCHEMA_ID,
0,
System.currentTimeMillis(),
+ magic,
objects,
LogFormat.ARROW);
}
+ public static MemoryLogRecords genMemoryLogRecordsByObject(List<Object[]>
objects)
+ throws Exception {
+ return genMemoryLogRecordsByObject(CURRENT_LOG_MAGIC_VALUE, objects);
+ }
+
public static MemoryLogRecords genMemoryLogRecordsWithWriterId(
List<Object[]> objects, long writerId, int batchSequence, long
baseOffset)
throws Exception {
@@ -171,6 +178,7 @@ public class DataTestUtils {
DEFAULT_SCHEMA_ID,
baseOffset,
System.currentTimeMillis(),
+ CURRENT_LOG_MAGIC_VALUE,
writerId,
batchSequence,
changeTypes,
@@ -196,7 +204,13 @@ public class DataTestUtils {
public static MemoryLogRecords genMemoryLogRecordsWithBaseOffset(
long offsetBase, List<Object[]> objects) throws Exception {
return createRecordsWithoutBaseLogOffset(
- DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, offsetBase, -1L, objects,
LogFormat.ARROW);
+ DATA1_ROW_TYPE,
+ DEFAULT_SCHEMA_ID,
+ offsetBase,
+ -1L,
+ CURRENT_LOG_MAGIC_VALUE,
+ objects,
+ LogFormat.ARROW);
}
public static MemoryLogRecords genLogRecordsWithBaseOffsetAndTimestamp(
@@ -206,6 +220,7 @@ public class DataTestUtils {
DEFAULT_SCHEMA_ID,
offsetBase,
maxTimestamp,
+ CURRENT_LOG_MAGIC_VALUE,
objects,
LogFormat.ARROW);
}
@@ -316,6 +331,7 @@ public class DataTestUtils {
DEFAULT_SCHEMA_ID,
baseOffset,
System.currentTimeMillis(),
+ CURRENT_LOG_MAGIC_VALUE,
objects,
logFormat));
fileLogRecords.flush();
@@ -358,6 +374,7 @@ public class DataTestUtils {
int schemaId,
long offsetBase,
long maxTimestamp,
+ byte magic,
List<Object[]> objects,
LogFormat logFormat)
throws Exception {
@@ -368,6 +385,7 @@ public class DataTestUtils {
schemaId,
offsetBase,
maxTimestamp,
+ magic,
NO_WRITER_ID,
NO_BATCH_SEQUENCE,
changeTypes,
@@ -381,6 +399,7 @@ public class DataTestUtils {
int schemaId,
long offsetBase,
long maxTimestamp,
+ byte magic,
long writerId,
int batchSequence,
List<ChangeType> changeTypes,
@@ -393,6 +412,7 @@ public class DataTestUtils {
schemaId,
offsetBase,
maxTimestamp,
+ magic,
writerId,
batchSequence,
changeTypes,
@@ -406,6 +426,7 @@ public class DataTestUtils {
int schemaId,
long offsetBase,
long maxTimestamp,
+ byte magic,
long writerId,
int batchSequence,
List<ChangeType> changeTypes,
@@ -420,6 +441,7 @@ public class DataTestUtils {
rowType,
offsetBase,
maxTimestamp,
+ magic,
schemaId,
writerId,
batchSequence,
@@ -458,7 +480,7 @@ public class DataTestUtils {
}
builder.setWriterState(writerId, batchSequence);
MemoryLogRecords memoryLogRecords =
MemoryLogRecords.pointToBytesView(builder.build());
- memoryLogRecords.ensureValid();
+ memoryLogRecords.ensureValid(DEFAULT_MAGIC);
((DefaultLogRecordBatch) memoryLogRecords.batches().iterator().next())
.setCommitTimestamp(maxTimestamp);
@@ -470,6 +492,7 @@ public class DataTestUtils {
RowType rowType,
long baseLogOffset,
long maxTimestamp,
+ byte magic,
int schemaId,
long writerId,
int batchSequence,
@@ -485,6 +508,7 @@ public class DataTestUtils {
MemoryLogRecordsArrowBuilder builder =
MemoryLogRecordsArrowBuilder.builder(
baseLogOffset,
+ magic,
schemaId,
writer,
new ManagedPagedOutputView(new
TestingMemorySegmentPool(10 * 1024)));
@@ -497,7 +521,7 @@ public class DataTestUtils {
((DefaultLogRecordBatch)
memoryLogRecords.batches().iterator().next())
.setCommitTimestamp(maxTimestamp);
- memoryLogRecords.ensureValid();
+ memoryLogRecords.ensureValid(magic);
return memoryLogRecords;
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
index 105fc47e7..0a30d5f5f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
@@ -46,7 +46,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Optional;
-import static
org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+import static
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE;
import static org.apache.fluss.utils.IOUtils.closeQuietly;
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
@@ -495,7 +495,8 @@ public final class LogSegment {
new LogOffsetMetadata(startOffset, this.baseOffset,
startPosition);
int adjustedMaxSize =
minOneMessage ? Math.max(maxSize,
startOffsetAndSize.getSize()) : maxSize;
- if (adjustedMaxSize <= RECORD_BATCH_HEADER_SIZE) {
+ // use V0 size as the lower bound, since V1 header size is larger than
V0
+ if (adjustedMaxSize <= V0_RECORD_BATCH_HEADER_SIZE) {
return new FetchDataInfo(offsetMetadata, MemoryLogRecords.EMPTY);
}
if (projection == null) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
index 9ed95e35b..cd625b155 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java
@@ -21,7 +21,7 @@ import org.apache.fluss.exception.OutOfOrderSequenceException;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.LogRecordBatch;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
/**
* This class is used to validate the records appended by a given writer
before they are written to
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java
index c0b434991..cf237a73e 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java
@@ -25,7 +25,7 @@ import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
/**
* This class represents the state of a specific writer id. The batch sequence
number is ordered
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
index e8712964c..4fa0baced 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static org.apache.fluss.utils.FlussPaths.WRITER_SNAPSHOT_FILE_SUFFIX;
import static org.apache.fluss.utils.FlussPaths.writerSnapshotFile;
@@ -232,7 +233,7 @@ public class WriterStateManager {
/** Update the mapping with the given append information. */
public void update(WriterAppendInfo appendInfo) {
long writerId = appendInfo.writerId();
- if (writerId == LogRecordBatch.NO_WRITER_ID) {
+ if (writerId == NO_WRITER_ID) {
throw new IllegalArgumentException(
"Invalid writer id "
+ writerId
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 1f3271562..6734a32d5 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -77,8 +77,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
import static org.apache.fluss.record.TestData.DATA2_SCHEMA;
import static org.apache.fluss.record.TestData.DATA3_SCHEMA_PK;
@@ -893,6 +894,7 @@ class KvTabletTest {
DEFAULT_SCHEMA_ID,
baseOffset,
-1L,
+ CURRENT_LOG_MAGIC_VALUE,
NO_WRITER_ID,
NO_BATCH_SEQUENCE,
changeTypes,
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index 526b1b350..045bd3525 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -57,8 +57,9 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
-import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static org.apache.fluss.record.TestData.DATA1;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH_PK;
@@ -671,6 +672,7 @@ final class ReplicaTest extends ReplicaTestBase {
DEFAULT_SCHEMA_ID,
baseOffset,
-1L,
+ CURRENT_LOG_MAGIC_VALUE,
NO_WRITER_ID,
NO_BATCH_SEQUENCE,
changeTypes,
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
index 04f8f3ce3..afd10c943 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
@@ -62,6 +62,8 @@ import org.apache.fluss.utils.types.Tuple2;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
@@ -77,6 +79,8 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
import static org.apache.fluss.record.TestData.ANOTHER_DATA1;
import static org.apache.fluss.record.TestData.DATA1;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
@@ -118,8 +122,9 @@ public class TabletServiceITCase {
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
FlussClusterExtension.builder().setNumOfTabletServers(3).build();
- @Test
- void testProduceLog() throws Exception {
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testProduceLog(byte recordBatchMagic) throws Exception {
long tableId =
createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
TableBucket tb = new TableBucket(tableId, 0);
@@ -134,7 +139,10 @@ public class TabletServiceITCase {
leaderGateWay
.produceLog(
newProduceLogRequest(
- tableId, 0, 1,
genMemoryLogRecordsByObject(DATA1)))
+ tableId,
+ 0,
+ 1,
+
genMemoryLogRecordsByObject(recordBatchMagic, DATA1)))
.get(),
0,
0L);
@@ -205,8 +213,9 @@ public class TabletServiceITCase {
}
}
- @Test
- void testFetchLog() throws Exception {
+ @ParameterizedTest
+ @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+ void testFetchLog(byte recordBatchMagic) throws Exception {
long tableId =
createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
TableBucket tb = new TableBucket(tableId, 0);
@@ -222,7 +231,10 @@ public class TabletServiceITCase {
leaderGateWay
.produceLog(
newProduceLogRequest(
- tableId, 0, 1,
genMemoryLogRecordsByObject(DATA1)))
+ tableId,
+ 0,
+ 1,
+
genMemoryLogRecordsByObject(recordBatchMagic, DATA1)))
.get(),
0,
0L);
@@ -248,7 +260,11 @@ public class TabletServiceITCase {
leaderGateWay
.produceLog(
newProduceLogRequest(
- tableId, 0, 1,
genMemoryLogRecordsByObject(ANOTHER_DATA1)))
+ tableId,
+ 0,
+ 1,
+ genMemoryLogRecordsByObject(
+ recordBatchMagic,
ANOTHER_DATA1)))
.get(),
0,
10L);