This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 03c860243 [kv] Supports compacted row as change log (#2108)
03c860243 is described below

commit 03c8602433a9872ab0e7d20624d021ac0c6e33cc
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Dec 16 11:17:56 2025 +0800

    [kv] Supports compacted row as change log (#2108)
    
    ---------
    
    Co-authored-by: ipolyzos <[email protected]>
---
 .../fluss/client/admin/FlussAdminITCase.java       |   2 +-
 .../fluss/client/table/FlussTableITCase.java       |  16 +-
 .../org/apache/fluss/config/ConfigOptions.java     |   2 +-
 .../java/org/apache/fluss/metadata/LogFormat.java  |  24 ++-
 ...ava => AbstractRowMemoryLogRecordsBuilder.java} |  91 ++++------
 ...dexedLogRecord.java => CompactedLogRecord.java} |  94 ++++------
 .../apache/fluss/record/DefaultLogRecordBatch.java |  29 +++
 .../org/apache/fluss/record/IndexedLogRecord.java  |  19 +-
 .../java/org/apache/fluss/record/LogRecord.java    |  28 +++
 .../apache/fluss/record/LogRecordReadContext.java  |  25 +++
 .../fluss/record/MemoryLogRecordsArrowBuilder.java |   2 +-
 .../record/MemoryLogRecordsCompactedBuilder.java   |  78 ++++++++
 .../record/MemoryLogRecordsIndexedBuilder.java     | 196 +--------------------
 .../fluss/record/CompactedLogRecordTest.java       |  89 ++++++++++
 .../MemoryLogRecordsCompactedBuilderTest.java      | 189 ++++++++++++++++++++
 .../apache/fluss/server/kv/KvRecoverHelper.java    |  20 ++-
 .../java/org/apache/fluss/server/kv/KvTablet.java  |   3 +
 .../fluss/server/kv/wal/CompactedWalBuilder.java   |  92 ++++++++++
 .../org/apache/fluss/server/replica/Replica.java   |   1 +
 .../server/utils/TableDescriptorValidation.java    |   8 +-
 .../server/replica/KvReplicaRestoreITCase.java     |  13 +-
 21 files changed, 678 insertions(+), 343 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index bd7954284..62026ca2f 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -659,7 +659,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                 .cause()
                 .isInstanceOf(InvalidConfigException.class)
                 .hasMessageContaining(
-                        "Currently, Primary Key Table only supports ARROW log 
format if kv format is COMPACTED.");
+                        "Currently, Primary Key Table supports ARROW or 
COMPACTED log format when kv format is COMPACTED.");
     }
 
     @Test
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index bfd9926df..0cc2f7f10 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -753,6 +753,11 @@ class FlussTableITCase extends ClientToServerITCaseBase {
         verifyAppendOrPut(false, "ARROW", kvFormat);
     }
 
+    @Test
+    void testPutAndPollCompacted() throws Exception {
+        verifyAppendOrPut(false, "COMPACTED", "COMPACTED");
+    }
+
     void verifyAppendOrPut(boolean append, String logFormat, @Nullable String 
kvFormat)
             throws Exception {
         Schema schema =
@@ -911,8 +916,9 @@ class FlussTableITCase extends ClientToServerITCaseBase {
         }
     }
 
-    @Test
-    void testPutAndProject() throws Exception {
+    @ParameterizedTest
+    @ValueSource(strings = {"ARROW", "COMPACTED"})
+    void testPutAndProject(String changelogFormat) throws Exception {
         Schema schema =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT())
@@ -921,7 +927,11 @@ class FlussTableITCase extends ClientToServerITCaseBase {
                         .column("d", DataTypes.BIGINT())
                         .primaryKey("a")
                         .build();
-        TableDescriptor tableDescriptor = 
TableDescriptor.builder().schema(schema).build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .property(ConfigOptions.TABLE_LOG_FORMAT.key(), 
changelogFormat)
+                        .build();
         TablePath tablePath = TablePath.of("test_db_1", "test_pk_table_1");
         createTable(tablePath, tableDescriptor, false);
 
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java 
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 38c36c4c4..d571db270 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1230,7 +1230,7 @@ public class ConfigOptions {
                     .defaultValue(LogFormat.ARROW)
                     .withDescription(
                             "The format of the log records in log store. The 
default value is `arrow`. "
-                                    + "The supported formats are `arrow` and 
`indexed`.");
+                                    + "The supported formats are `arrow`, 
`indexed` and `compacted`.");
 
     public static final ConfigOption<ArrowCompressionType> 
TABLE_LOG_ARROW_COMPRESSION_TYPE =
             key("table.log.arrow.compression.type")
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java
index d1b6de4ba..b5f460162 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java
@@ -18,10 +18,15 @@
 package org.apache.fluss.metadata;
 
 import org.apache.fluss.record.MemoryLogRecordsArrowBuilder;
+import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
 import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
+import org.apache.fluss.row.compacted.CompactedRow;
 import org.apache.fluss.row.indexed.IndexedRow;
 
-/** The format of the log records in log store. The supported formats are 
'arrow' and 'indexed'. */
+/**
+ * The format of the log records in log store. The supported formats are 
'arrow', 'indexed' and
+ * 'compacted'.
+ */
 public enum LogFormat {
 
     /**
@@ -41,11 +46,20 @@ public enum LogFormat {
      *
      * @see MemoryLogRecordsIndexedBuilder
      */
-    INDEXED;
+    INDEXED,
+
+    /**
+     * The log record batches are stored in {@link CompactedRow} format which 
is a compact
+     * row-oriented format optimized for primary key tables to reduce storage 
while trading CPU for
+     * reads.
+     *
+     * @see MemoryLogRecordsCompactedBuilder
+     */
+    COMPACTED;
 
     /**
-     * Creates a {@link LogFormat} from the given string. The string must be 
either 'arrow' or
-     * 'indexed'.
+     * Creates a {@link LogFormat} from the given string. The string must be 
either 'arrow',
+     * 'indexed' or 'compacted'.
      */
     public static LogFormat fromString(String format) {
         switch (format.toUpperCase()) {
@@ -53,6 +67,8 @@ public enum LogFormat {
                 return ARROW;
             case "INDEXED":
                 return INDEXED;
+            case "COMPACTED":
+                return COMPACTED;
             default:
                 throw new IllegalArgumentException("Unsupported log format: " 
+ format);
         }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
similarity index 72%
copy from 
fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
copy to 
fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
index 8488c3351..423b35a3b 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
@@ -17,19 +17,15 @@
 
 package org.apache.fluss.record;
 
-import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.memory.AbstractPagedOutputView;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.memory.MemorySegmentOutputView;
-import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.record.bytesview.BytesView;
 import org.apache.fluss.record.bytesview.MultiBytesView;
-import org.apache.fluss.row.indexed.IndexedRow;
 import org.apache.fluss.utils.crc.Crc32C;
 
 import java.io.IOException;
 
-import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
 import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH;
 import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
 import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
@@ -42,20 +38,18 @@ import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize
 import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 
-/**
- * Default builder for {@link MemoryLogRecords} of log records in {@link 
LogFormat#INDEXED} format.
- */
-public class MemoryLogRecordsIndexedBuilder implements AutoCloseable {
-    private static final int BUILDER_DEFAULT_OFFSET = 0;
+/** Abstract base builder for row-based MemoryLogRecords builders sharing 
common logic. */
+abstract class AbstractRowMemoryLogRecordsBuilder<T> implements AutoCloseable {
+    protected static final int BUILDER_DEFAULT_OFFSET = 0;
 
-    private final long baseLogOffset;
-    private final int schemaId;
+    protected final long baseLogOffset;
+    protected final int schemaId;
     // The max bytes can be appended.
-    private final int writeLimit;
-    private final byte magic;
-    private final AbstractPagedOutputView pagedOutputView;
-    private final MemorySegment firstSegment;
-    private final boolean appendOnly;
+    protected final int writeLimit;
+    protected final byte magic;
+    protected final AbstractPagedOutputView pagedOutputView;
+    protected final MemorySegment firstSegment;
+    protected final boolean appendOnly;
 
     private BytesView builtBuffer = null;
     private long writerId;
@@ -65,7 +59,7 @@ public class MemoryLogRecordsIndexedBuilder implements 
AutoCloseable {
     private volatile boolean isClosed;
     private boolean aborted = false;
 
-    private MemoryLogRecordsIndexedBuilder(
+    protected AbstractRowMemoryLogRecordsBuilder(
             long baseLogOffset,
             int schemaId,
             int writeLimit,
@@ -87,64 +81,44 @@ public class MemoryLogRecordsIndexedBuilder implements 
AutoCloseable {
         this.currentRecordNumber = 0;
         this.isClosed = false;
 
-        // We don't need to write header information while the builder 
creating,
-        // we'll skip it first.
-        this.pagedOutputView.setPosition(recordBatchHeaderSize(magic));
-        this.sizeInBytes = recordBatchHeaderSize(magic);
+        // Skip header initially; will be written in build()
+        int headerSize = recordBatchHeaderSize(magic);
+        this.pagedOutputView.setPosition(headerSize);
+        this.sizeInBytes = headerSize;
     }
 
-    public static MemoryLogRecordsIndexedBuilder builder(
-            int schemaId, int writeLimit, AbstractPagedOutputView outputView, 
boolean appendOnly) {
-        return new MemoryLogRecordsIndexedBuilder(
-                BUILDER_DEFAULT_OFFSET,
-                schemaId,
-                writeLimit,
-                CURRENT_LOG_MAGIC_VALUE,
-                outputView,
-                appendOnly);
-    }
+    /** Implement to return size of the record (including length field). */
+    protected abstract int sizeOf(T row);
 
-    @VisibleForTesting
-    public static MemoryLogRecordsIndexedBuilder builder(
-            long baseLogOffset,
-            int schemaId,
-            int writeLimit,
-            byte magic,
-            AbstractPagedOutputView outputView)
-            throws IOException {
-        return new MemoryLogRecordsIndexedBuilder(
-                baseLogOffset, schemaId, writeLimit, magic, outputView, false);
-    }
+    /** Implement to write the record and return total written bytes including 
length field. */
+    protected abstract int writeRecord(ChangeType changeType, T row) throws 
IOException;
 
-    /**
-     * Check if we have room for a new record containing the given row. If no 
records have been
-     * appended, then this returns true.
-     */
-    public boolean hasRoomFor(IndexedRow row) {
-        return sizeInBytes + IndexedLogRecord.sizeOf(row) <= writeLimit;
+    public boolean hasRoomFor(T row) {
+        return sizeInBytes + sizeOf(row) <= writeLimit;
     }
 
-    public void append(ChangeType changeType, IndexedRow row) throws Exception 
{
+    public void append(ChangeType changeType, T row) throws Exception {
         appendRecord(changeType, row);
     }
 
-    private void appendRecord(ChangeType changeType, IndexedRow row) throws 
IOException {
+    private void appendRecord(ChangeType changeType, T row) throws IOException 
{
         if (aborted) {
             throw new IllegalStateException(
-                    "Tried to append a record, but 
MemoryLogRecordsIndexedBuilder has already been aborted");
+                    "Tried to append a record, but "
+                            + getClass().getSimpleName()
+                            + " has already been aborted");
         }
-
         if (isClosed) {
             throw new IllegalStateException(
                     "Tried to append a record, but MemoryLogRecordsBuilder is 
closed for record appends");
         }
         if (appendOnly && changeType != ChangeType.APPEND_ONLY) {
             throw new IllegalArgumentException(
-                    "Only append-only change type is allowed for append-only 
arrow log builder, but got "
+                    "Only append-only change type is allowed for append-only 
row log builder, but got "
                             + changeType);
         }
 
-        int recordByteSizes = IndexedLogRecord.writeTo(pagedOutputView, 
changeType, row);
+        int recordByteSizes = writeRecord(changeType, row);
         currentRecordNumber++;
         sizeInBytes += recordByteSizes;
     }
@@ -153,11 +127,9 @@ public class MemoryLogRecordsIndexedBuilder implements 
AutoCloseable {
         if (aborted) {
             throw new IllegalStateException("Attempting to build an aborted 
record batch");
         }
-
         if (builtBuffer != null) {
             return builtBuffer;
         }
-
         writeBatchHeader();
         builtBuffer =
                 MultiBytesView.builder()
@@ -198,9 +170,10 @@ public class MemoryLogRecordsIndexedBuilder implements 
AutoCloseable {
     public void close() throws IOException {
         if (aborted) {
             throw new IllegalStateException(
-                    "Cannot close MemoryLogRecordsIndexedBuilder as it has 
already been aborted");
+                    "Cannot close "
+                            + getClass().getSimpleName()
+                            + " as it has already been aborted");
         }
-
         isClosed = true;
     }
 
@@ -238,7 +211,7 @@ public class MemoryLogRecordsIndexedBuilder implements 
AutoCloseable {
         if (currentRecordNumber > 0) {
             outputView.writeInt(currentRecordNumber - 1);
         } else {
-            // If there is no record, we write 0 for filed lastOffsetDelta, 
see the comments about
+            // If there is no record, we write 0 for field lastOffsetDelta, 
see the comments about
             // the field 'lastOffsetDelta' in DefaultLogRecordBatch.
             outputView.writeInt(0);
         }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java 
b/fluss-common/src/main/java/org/apache/fluss/record/CompactedLogRecord.java
similarity index 60%
copy from 
fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
copy to 
fluss-common/src/main/java/org/apache/fluss/record/CompactedLogRecord.java
index 88bc6a3c7..fb7f2f314 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/CompactedLogRecord.java
@@ -23,8 +23,9 @@ import org.apache.fluss.memory.OutputView;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.indexed.IndexedRow;
-import org.apache.fluss.row.indexed.IndexedRowWriter;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.compacted.CompactedRowDeserializer;
+import org.apache.fluss.row.compacted.CompactedRowWriter;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.utils.MurmurHashUtils;
 
@@ -32,30 +33,30 @@ import java.io.IOException;
 
 import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
 
-/* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
- * additional information regarding copyright ownership. */
-
 /**
- * This class is an immutable log record and can be directly persisted. The 
schema is as follows:
+ * An immutable log record for {@link CompactedRow} which can be directly 
persisted. The on-wire
+ * schema is identical to IndexedLogRecord but the row payload uses the 
CompactedRow binary format:
  *
  * <ul>
- *   <li>Length => int32
- *   <li>Attributes => Int8
- *   <li>Value => {@link InternalRow}
+ *   <li>Length => int32 (total number of bytes following this length field)
+ *   <li>Attributes => int8 (low 4 bits encode {@link ChangeType})
+ *   <li>Value => {@link CompactedRow} (bytes in compacted row format)
  * </ul>
  *
- * <p>The current record attributes are depicted below:
- *
- * <p>----------- | ChangeType (0-3) | Unused (4-7) |---------------
+ * <p>Differences vs {@link IndexedLogRecord}: - Uses CompactedRow encoding 
which is space-optimized
+ * (VLQ for ints/longs, per-row null bitset) and trades CPU for smaller 
storage; random access to
+ * fields is not supported without decoding. - Deserialization is lazy: we 
wrap the underlying bytes
+ * in a CompactedRow with a {@link CompactedRowDeserializer} and only decode 
to object values when a
+ * field is accessed. - The record header (Length + Attributes) layout and 
attribute semantics are
+ * the same.
  *
- * <p>The offset compute the difference relative to the base offset and of the 
batch that this
- * record is contained in.
+ * <p>The offset computes the difference relative to the base offset of the 
batch containing this
+ * record.
  *
- * @since 0.1
+ * @since 0.8
  */
 @PublicEvolving
-public class IndexedLogRecord implements LogRecord {
+public class CompactedLogRecord implements LogRecord {
 
     private static final int ATTRIBUTES_LENGTH = 1;
 
@@ -67,13 +68,13 @@ public class IndexedLogRecord implements LogRecord {
     private int offset;
     private int sizeInBytes;
 
-    IndexedLogRecord(long logOffset, long timestamp, DataType[] fieldTypes) {
+    CompactedLogRecord(long logOffset, long timestamp, DataType[] fieldTypes) {
         this.logOffset = logOffset;
-        this.fieldTypes = fieldTypes;
         this.timestamp = timestamp;
+        this.fieldTypes = fieldTypes;
     }
 
-    void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
+    private void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
         this.segment = segment;
         this.offset = offset;
         this.sizeInBytes = sizeInBytes;
@@ -88,12 +89,10 @@ public class IndexedLogRecord implements LogRecord {
         if (this == o) {
             return true;
         }
-
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-
-        IndexedLogRecord that = (IndexedLogRecord) o;
+        CompactedLogRecord that = (CompactedLogRecord) o;
         return sizeInBytes == that.sizeInBytes
                 && segment.equalTo(that.segment, offset, that.offset, 
sizeInBytes);
     }
@@ -122,41 +121,35 @@ public class IndexedLogRecord implements LogRecord {
     @Override
     public InternalRow getRow() {
         int rowOffset = LENGTH_LENGTH + ATTRIBUTES_LENGTH;
-        // TODO currently, we only support indexed row.
-        return deserializeInternalRow(
+        return LogRecord.deserializeInternalRow(
                 sizeInBytes - rowOffset,
                 segment,
                 offset + rowOffset,
                 fieldTypes,
-                LogFormat.INDEXED);
+                LogFormat.COMPACTED);
     }
 
-    /** Write the record to input `target` and return its size. */
-    public static int writeTo(OutputView outputView, ChangeType changeType, 
IndexedRow row)
+    /** Write the record to output and return total bytes written including 
length field. */
+    public static int writeTo(OutputView outputView, ChangeType changeType, 
CompactedRow row)
             throws IOException {
         int sizeInBytes = calculateSizeInBytes(row);
-
-        // TODO using varint instead int to reduce storage size.
-        // write record total bytes size.
+        // write record total bytes size (excluding this int itself)
         outputView.writeInt(sizeInBytes);
-
-        // write attributes.
+        // write attributes
         outputView.writeByte(changeType.toByteValue());
-
-        // write internal row.
-        serializeInternalRow(outputView, row);
-
+        // write row payload
+        CompactedRowWriter.serializeCompactedRow(row, outputView);
         return sizeInBytes + LENGTH_LENGTH;
     }
 
-    public static IndexedLogRecord readFrom(
+    public static CompactedLogRecord readFrom(
             MemorySegment segment,
             int position,
             long logOffset,
             long logTimestamp,
             DataType[] colTypes) {
         int sizeInBytes = segment.getInt(position);
-        IndexedLogRecord logRecord = new IndexedLogRecord(logOffset, 
logTimestamp, colTypes);
+        CompactedLogRecord logRecord = new CompactedLogRecord(logOffset, 
logTimestamp, colTypes);
         logRecord.pointTo(segment, position, sizeInBytes + LENGTH_LENGTH);
         return logRecord;
     }
@@ -167,29 +160,8 @@ public class IndexedLogRecord implements LogRecord {
     }
 
     private static int calculateSizeInBytes(BinaryRow row) {
-        int size = 1; // always one byte for attributes
+        int size = 1; // one byte for attributes
         size += row.getSizeInBytes();
         return size;
     }
-
-    private static void serializeInternalRow(OutputView outputView, IndexedRow 
row)
-            throws IOException {
-        IndexedRowWriter.serializeIndexedRow(row, outputView);
-    }
-
-    private static InternalRow deserializeInternalRow(
-            int length,
-            MemorySegment segment,
-            int position,
-            DataType[] fieldTypes,
-            LogFormat logFormat) {
-        if (logFormat == LogFormat.INDEXED) {
-            IndexedRow indexedRow = new IndexedRow(fieldTypes);
-            indexedRow.pointTo(segment, position, length);
-            return indexedRow;
-        } else {
-            throw new IllegalArgumentException(
-                    "No such internal row deserializer for: " + logFormat);
-        }
-    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java 
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
index 88cba5497..188a62bbd 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
@@ -233,6 +233,8 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
             case INDEXED:
                 return rowRecordIterator(
                         rowType, context.getOutputProjectedRow(schemaId), 
timestamp);
+            case COMPACTED:
+                return compactedRowRecordIterator(rowType, timestamp);
             default:
                 throw new IllegalArgumentException("Unsupported log format: " 
+ logFormat);
         }
@@ -295,6 +297,33 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
         };
     }
 
+    private CloseableIterator<LogRecord> compactedRowRecordIterator(
+            RowType rowType, long timestamp) {
+        DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
+        return new LogRecordIterator() {
+            int position = DefaultLogRecordBatch.this.position + 
recordBatchHeaderSize(magic);
+            int rowId = 0;
+
+            @Override
+            protected LogRecord readNext(long baseOffset) {
+                CompactedLogRecord logRecord =
+                        CompactedLogRecord.readFrom(
+                                segment, position, baseOffset + rowId, 
timestamp, fieldTypes);
+                rowId++;
+                position += logRecord.getSizeInBytes();
+                return logRecord;
+            }
+
+            @Override
+            protected boolean ensureNoneRemaining() {
+                return true;
+            }
+
+            @Override
+            public void close() {}
+        };
+    }
+
     private CloseableIterator<LogRecord> columnRecordIterator(
             RowType rowType,
             @Nullable ProjectedRow outputProjection,
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java 
b/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
index 88bc6a3c7..cf2abd90d 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
@@ -122,8 +122,7 @@ public class IndexedLogRecord implements LogRecord {
     @Override
     public InternalRow getRow() {
         int rowOffset = LENGTH_LENGTH + ATTRIBUTES_LENGTH;
-        // TODO currently, we only support indexed row.
-        return deserializeInternalRow(
+        return LogRecord.deserializeInternalRow(
                 sizeInBytes - rowOffset,
                 segment,
                 offset + rowOffset,
@@ -176,20 +175,4 @@ public class IndexedLogRecord implements LogRecord {
             throws IOException {
         IndexedRowWriter.serializeIndexedRow(row, outputView);
     }
-
-    private static InternalRow deserializeInternalRow(
-            int length,
-            MemorySegment segment,
-            int position,
-            DataType[] fieldTypes,
-            LogFormat logFormat) {
-        if (logFormat == LogFormat.INDEXED) {
-            IndexedRow indexedRow = new IndexedRow(fieldTypes);
-            indexedRow.pointTo(segment, position, length);
-            return indexedRow;
-        } else {
-            throw new IllegalArgumentException(
-                    "No such internal row deserializer for: " + logFormat);
-        }
-    }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecord.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecord.java
index ceef8e6ed..34df26ebb 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecord.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecord.java
@@ -18,7 +18,13 @@
 package org.apache.fluss.record;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.compacted.CompactedRowDeserializer;
+import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.types.DataType;
 
 /**
  * A log record is a tuple consisting of a unique offset in the log, a 
changeType and a row.
@@ -55,4 +61,26 @@ public interface LogRecord {
      * @return the log record's row
      */
     InternalRow getRow();
+
+    /** Deserialize the row in the log record according to given log format. */
+    static InternalRow deserializeInternalRow(
+            int length,
+            MemorySegment segment,
+            int position,
+            DataType[] fieldTypes,
+            LogFormat logFormat) {
+        if (logFormat == LogFormat.INDEXED) {
+            IndexedRow indexedRow = new IndexedRow(fieldTypes);
+            indexedRow.pointTo(segment, position, length);
+            return indexedRow;
+        } else if (logFormat == LogFormat.COMPACTED) {
+            CompactedRow compactedRow =
+                    new CompactedRow(fieldTypes.length, new 
CompactedRowDeserializer(fieldTypes));
+            compactedRow.pointTo(segment, position, length);
+            return compactedRow;
+        } else {
+            throw new IllegalArgumentException(
+                    "No such internal row deserializer for: " + logFormat);
+        }
+    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
index 7289d2d67..6d3ee99af 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
@@ -103,6 +103,9 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
         } else if (logFormat == LogFormat.INDEXED) {
             int[] selectedFields = projection.getProjectionPositions();
             return createIndexedReadContext(rowType, schemaId, selectedFields, 
schemaGetter);
+        } else if (logFormat == LogFormat.COMPACTED) {
+            int[] selectedFields = projection.getProjectionPositions();
+            return createCompactedRowReadContext(rowType, schemaId, 
selectedFields);
         } else {
             throw new IllegalArgumentException("Unsupported log format: " + 
logFormat);
         }
@@ -156,6 +159,13 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
         return createIndexedReadContext(rowType, schemaId, selectedFields, 
schemaGetter);
     }
 
+    /** Creates a LogRecordReadContext for COMPACTED log format. */
+    public static LogRecordReadContext createCompactedRowReadContext(
+            RowType rowType, int schemaId) {
+        int[] selectedFields = IntStream.range(0, 
rowType.getFieldCount()).toArray();
+        return createCompactedRowReadContext(rowType, schemaId, 
selectedFields);
+    }
+
     /**
      * Creates a LogRecordReadContext for INDEXED log format.
      *
@@ -172,6 +182,21 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
                 LogFormat.INDEXED, rowType, schemaId, null, fieldGetters, 
false, schemaGetter);
     }
 
+    /**
+     * Creates a LogRecordReadContext for COMPACTED log format.
+     *
+     * @param rowType the schema of the read data
+     * @param schemaId the schemaId of the table
+     * @param selectedFields the final selected fields of the read data
+     */
+    public static LogRecordReadContext createCompactedRowReadContext(
+            RowType rowType, int schemaId, int[] selectedFields) {
+        FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, 
selectedFields);
+        // for COMPACTED log format, the projection is NEVER push downed to 
the server side
+        return new LogRecordReadContext(
+                LogFormat.COMPACTED, rowType, schemaId, null, fieldGetters, 
false, null);
+    }
+
     private LogRecordReadContext(
             LogFormat logFormat,
             RowType targetDataRowType,
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
index 94b2fe59f..6624f14bd 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
@@ -279,7 +279,7 @@ public class MemoryLogRecordsArrowBuilder implements 
AutoCloseable {
         if (recordCount > 0) {
             outputView.writeInt(recordCount - 1);
         } else {
-            // If there is no record, we write 0 for filed lastOffsetDelta, 
see the comments about
+            // If there is no record, we write 0 for field lastOffsetDelta, 
see the comments about
             // the field 'lastOffsetDelta' in DefaultLogRecordBatch.
             outputView.writeInt(0);
         }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
new file mode 100644
index 000000000..e23aa5cc3
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.memory.AbstractPagedOutputView;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.row.compacted.CompactedRow;
+
+import java.io.IOException;
+
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+
+/**
+ * Default builder for {@link MemoryLogRecords} of log records in {@link 
LogFormat#COMPACTED}
+ * format.
+ */
+public class MemoryLogRecordsCompactedBuilder
+        extends AbstractRowMemoryLogRecordsBuilder<CompactedRow> {
+
+    private MemoryLogRecordsCompactedBuilder(
+            long baseLogOffset,
+            int schemaId,
+            int writeLimit,
+            byte magic,
+            AbstractPagedOutputView pagedOutputView,
+            boolean appendOnly) {
+        super(baseLogOffset, schemaId, writeLimit, magic, pagedOutputView, 
appendOnly);
+    }
+
+    public static MemoryLogRecordsCompactedBuilder builder(
+            int schemaId, int writeLimit, AbstractPagedOutputView outputView, 
boolean appendOnly) {
+        return new MemoryLogRecordsCompactedBuilder(
+                BUILDER_DEFAULT_OFFSET,
+                schemaId,
+                writeLimit,
+                CURRENT_LOG_MAGIC_VALUE,
+                outputView,
+                appendOnly);
+    }
+
+    @VisibleForTesting
+    public static MemoryLogRecordsCompactedBuilder builder(
+            long baseLogOffset,
+            int schemaId,
+            int writeLimit,
+            byte magic,
+            AbstractPagedOutputView outputView)
+            throws IOException {
+        return new MemoryLogRecordsCompactedBuilder(
+                baseLogOffset, schemaId, writeLimit, magic, outputView, false);
+    }
+
+    @Override
+    protected int sizeOf(CompactedRow row) {
+        return CompactedLogRecord.sizeOf(row);
+    }
+
+    @Override
+    protected int writeRecord(ChangeType changeType, CompactedRow row) throws 
IOException {
+        return CompactedLogRecord.writeTo(pagedOutputView, changeType, row);
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
index 8488c3351..a713c34ee 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
@@ -19,51 +19,17 @@ package org.apache.fluss.record;
 
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.memory.AbstractPagedOutputView;
-import org.apache.fluss.memory.MemorySegment;
-import org.apache.fluss.memory.MemorySegmentOutputView;
-import org.apache.fluss.metadata.LogFormat;
-import org.apache.fluss.record.bytesview.BytesView;
-import org.apache.fluss.record.bytesview.MultiBytesView;
 import org.apache.fluss.row.indexed.IndexedRow;
-import org.apache.fluss.utils.crc.Crc32C;
 
 import java.io.IOException;
 
 import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
-import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH;
-import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
-import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
-import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH;
-import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
-import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset;
-import static 
org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset;
-import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
-import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
-import static org.apache.fluss.utils.Preconditions.checkArgument;
 
 /**
- * Default builder for {@link MemoryLogRecords} of log records in {@link 
LogFormat#INDEXED} format.
+ * Default builder for {@link MemoryLogRecords} of log records in {@link
+ * org.apache.fluss.metadata.LogFormat#INDEXED} format.
  */
-public class MemoryLogRecordsIndexedBuilder implements AutoCloseable {
-    private static final int BUILDER_DEFAULT_OFFSET = 0;
-
-    private final long baseLogOffset;
-    private final int schemaId;
-    // The max bytes can be appended.
-    private final int writeLimit;
-    private final byte magic;
-    private final AbstractPagedOutputView pagedOutputView;
-    private final MemorySegment firstSegment;
-    private final boolean appendOnly;
-
-    private BytesView builtBuffer = null;
-    private long writerId;
-    private int batchSequence;
-    private int currentRecordNumber;
-    private int sizeInBytes;
-    private volatile boolean isClosed;
-    private boolean aborted = false;
+public class MemoryLogRecordsIndexedBuilder extends 
AbstractRowMemoryLogRecordsBuilder<IndexedRow> {
 
     private MemoryLogRecordsIndexedBuilder(
             long baseLogOffset,
@@ -72,25 +38,7 @@ public class MemoryLogRecordsIndexedBuilder implements 
AutoCloseable {
             byte magic,
             AbstractPagedOutputView pagedOutputView,
             boolean appendOnly) {
-        this.appendOnly = appendOnly;
-        checkArgument(
-                schemaId <= Short.MAX_VALUE,
-                "schemaId shouldn't be greater than the max value of short: " 
+ Short.MAX_VALUE);
-        this.baseLogOffset = baseLogOffset;
-        this.schemaId = schemaId;
-        this.writeLimit = writeLimit;
-        this.magic = magic;
-        this.pagedOutputView = pagedOutputView;
-        this.firstSegment = pagedOutputView.getCurrentSegment();
-        this.writerId = NO_WRITER_ID;
-        this.batchSequence = NO_BATCH_SEQUENCE;
-        this.currentRecordNumber = 0;
-        this.isClosed = false;
-
-        // We don't need to write header information while the builder 
creating,
-        // we'll skip it first.
-        this.pagedOutputView.setPosition(recordBatchHeaderSize(magic));
-        this.sizeInBytes = recordBatchHeaderSize(magic);
+        super(baseLogOffset, schemaId, writeLimit, magic, pagedOutputView, 
appendOnly);
     }
 
     public static MemoryLogRecordsIndexedBuilder builder(
@@ -116,139 +64,13 @@ public class MemoryLogRecordsIndexedBuilder implements 
AutoCloseable {
                 baseLogOffset, schemaId, writeLimit, magic, outputView, false);
     }
 
-    /**
-     * Check if we have room for a new record containing the given row. If no 
records have been
-     * appended, then this returns true.
-     */
-    public boolean hasRoomFor(IndexedRow row) {
-        return sizeInBytes + IndexedLogRecord.sizeOf(row) <= writeLimit;
-    }
-
-    public void append(ChangeType changeType, IndexedRow row) throws Exception 
{
-        appendRecord(changeType, row);
-    }
-
-    private void appendRecord(ChangeType changeType, IndexedRow row) throws 
IOException {
-        if (aborted) {
-            throw new IllegalStateException(
-                    "Tried to append a record, but 
MemoryLogRecordsIndexedBuilder has already been aborted");
-        }
-
-        if (isClosed) {
-            throw new IllegalStateException(
-                    "Tried to append a record, but MemoryLogRecordsBuilder is 
closed for record appends");
-        }
-        if (appendOnly && changeType != ChangeType.APPEND_ONLY) {
-            throw new IllegalArgumentException(
-                    "Only append-only change type is allowed for append-only 
arrow log builder, but got "
-                            + changeType);
-        }
-
-        int recordByteSizes = IndexedLogRecord.writeTo(pagedOutputView, 
changeType, row);
-        currentRecordNumber++;
-        sizeInBytes += recordByteSizes;
-    }
-
-    public BytesView build() throws IOException {
-        if (aborted) {
-            throw new IllegalStateException("Attempting to build an aborted 
record batch");
-        }
-
-        if (builtBuffer != null) {
-            return builtBuffer;
-        }
-
-        writeBatchHeader();
-        builtBuffer =
-                MultiBytesView.builder()
-                        
.addMemorySegmentByteViewList(pagedOutputView.getWrittenSegments())
-                        .build();
-        return builtBuffer;
-    }
-
-    public void setWriterState(long writerId, int batchBaseSequence) {
-        this.writerId = writerId;
-        this.batchSequence = batchBaseSequence;
-    }
-
-    public void resetWriterState(long writerId, int batchSequence) {
-        // trigger to rewrite batch header
-        this.builtBuffer = null;
-        this.writerId = writerId;
-        this.batchSequence = batchSequence;
-    }
-
-    public long writerId() {
-        return writerId;
-    }
-
-    public int batchSequence() {
-        return batchSequence;
-    }
-
-    public boolean isClosed() {
-        return isClosed;
-    }
-
-    public void abort() {
-        aborted = true;
-    }
-
     @Override
-    public void close() throws IOException {
-        if (aborted) {
-            throw new IllegalStateException(
-                    "Cannot close MemoryLogRecordsIndexedBuilder as it has 
already been aborted");
-        }
-
-        isClosed = true;
-    }
-
-    public int getSizeInBytes() {
-        return sizeInBytes;
+    protected int sizeOf(IndexedRow row) {
+        return IndexedLogRecord.sizeOf(row);
     }
 
-    // ----------------------- internal methods -------------------------------
-    private void writeBatchHeader() throws IOException {
-        // pagedOutputView doesn't support seek to previous segment,
-        // so we create a new output view on the first segment
-        MemorySegmentOutputView outputView = new 
MemorySegmentOutputView(firstSegment);
-        outputView.setPosition(0);
-        // update header.
-        outputView.writeLong(baseLogOffset);
-        outputView.writeInt(sizeInBytes - BASE_OFFSET_LENGTH - LENGTH_LENGTH);
-        outputView.writeByte(magic);
-
-        // write empty timestamp which will be overridden on server side
-        outputView.writeLong(0);
-
-        // write empty leaderEpoch which will be overridden on server side
-        if (magic >= LOG_MAGIC_VALUE_V1) {
-            outputView.writeInt(NO_LEADER_EPOCH);
-        }
-
-        // write empty crc first.
-        outputView.writeUnsignedInt(0);
-
-        outputView.writeShort((short) schemaId);
-        // write attributes (currently only appendOnly flag)
-        outputView.writeBoolean(appendOnly);
-        // skip write attribute byte for now.
-        outputView.setPosition(lastOffsetDeltaOffset(magic));
-        if (currentRecordNumber > 0) {
-            outputView.writeInt(currentRecordNumber - 1);
-        } else {
-            // If there is no record, we write 0 for filed lastOffsetDelta, 
see the comments about
-            // the field 'lastOffsetDelta' in DefaultLogRecordBatch.
-            outputView.writeInt(0);
-        }
-        outputView.writeLong(writerId);
-        outputView.writeInt(batchSequence);
-        outputView.writeInt(currentRecordNumber);
-
-        // Update crc.
-        long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), 
schemaIdOffset(magic));
-        outputView.setPosition(crcOffset(magic));
-        outputView.writeUnsignedInt(crc);
+    @Override
+    protected int writeRecord(ChangeType changeType, IndexedRow row) throws 
IOException {
+        return IndexedLogRecord.writeTo(pagedOutputView, changeType, row);
     }
 }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/CompactedLogRecordTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/record/CompactedLogRecordTest.java
new file mode 100644
index 000000000..cfec405f1
--- /dev/null
+++ 
b/fluss-common/src/test/java/org/apache/fluss/record/CompactedLogRecordTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.TestInternalRowGenerator;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.compacted.CompactedRowDeserializer;
+import org.apache.fluss.row.compacted.CompactedRowWriter;
+import org.apache.fluss.types.DataType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CompactedLogRecord}. */
+class CompactedLogRecordTest extends LogTestBase {
+
+    @Test
+    void testBase() throws IOException {
+        DataType[] fieldTypes = baseRowType.getChildren().toArray(new 
DataType[0]);
+
+        CompactedRowWriter writer = new CompactedRowWriter(fieldTypes.length);
+        // field 0: int 10
+        writer.writeInt(10);
+        // field 1: string "abc"
+        writer.writeString(BinaryString.fromString("abc"));
+        byte[] bytes = writer.toBytes();
+        CompactedRow row =
+                CompactedRow.from(fieldTypes, bytes, new 
CompactedRowDeserializer(fieldTypes));
+
+        CompactedLogRecord.writeTo(outputView, ChangeType.APPEND_ONLY, row);
+
+        CompactedLogRecord logRecord =
+                CompactedLogRecord.readFrom(
+                        MemorySegment.wrap(outputView.getCopyOfBuffer()),
+                        0,
+                        1000,
+                        10001,
+                        fieldTypes);
+
+        assertThat(logRecord.getSizeInBytes()).isEqualTo(1 + 
row.getSizeInBytes() + 4);
+        assertThat(logRecord.logOffset()).isEqualTo(1000);
+        assertThat(logRecord.timestamp()).isEqualTo(10001);
+        
assertThat(logRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+        assertThat(logRecord.getRow()).isEqualTo(row);
+    }
+
+    @Test
+    void testWriteToAndReadFromWithRandomData() throws IOException {
+        // generate a compacted row for all supported types
+        DataType[] allColTypes =
+                
TestInternalRowGenerator.createAllRowType().getChildren().toArray(new 
DataType[0]);
+        CompactedRow row = 
TestInternalRowGenerator.genCompactedRowForAllType();
+
+        CompactedLogRecord.writeTo(outputView, ChangeType.APPEND_ONLY, row);
+
+        LogRecord logRecord =
+                CompactedLogRecord.readFrom(
+                        MemorySegment.wrap(outputView.getCopyOfBuffer()),
+                        0,
+                        1000,
+                        10001,
+                        allColTypes);
+
+        assertThat(logRecord.logOffset()).isEqualTo(1000);
+        assertThat(logRecord.timestamp()).isEqualTo(10001);
+        
assertThat(logRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+        assertThat(logRecord.getRow()).isEqualTo(row);
+    }
+}
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilderTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilderTest.java
new file mode 100644
index 000000000..2a8d70c80
--- /dev/null
+++ 
b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilderTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.memory.ManagedPagedOutputView;
+import org.apache.fluss.memory.TestingMemorySegmentPool;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.fluss.record.ChangeType.APPEND_ONLY;
+import static org.apache.fluss.record.TestData.BASE_OFFSET;
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DEFAULT_MAGIC;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link MemoryLogRecordsCompactedBuilder}. */
+class MemoryLogRecordsCompactedBuilderTest {
+
+    @Test
+    void testAppendAndBuild() throws Exception {
+        MemoryLogRecordsCompactedBuilder builder = createBuilder(0, 4, 1024);
+
+        List<CompactedRow> expected = new ArrayList<>();
+        expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}));
+        expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {2, "b"}));
+        expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {3, "c"}));
+
+        for (CompactedRow row : expected) {
+            assertThat(builder.hasRoomFor(row)).isTrue();
+            builder.append(APPEND_ONLY, row);
+        }
+
+        builder.setWriterState(7L, 13);
+        builder.close();
+        MemoryLogRecords records = 
MemoryLogRecords.pointToBytesView(builder.build());
+
+        Iterator<LogRecordBatch> it = records.batches().iterator();
+        assertThat(it.hasNext()).isTrue();
+        LogRecordBatch batch = it.next();
+        assertThat(it.hasNext()).isFalse();
+
+        assertThat(batch.getRecordCount()).isEqualTo(expected.size());
+        assertThat(batch.baseLogOffset()).isEqualTo(0);
+        assertThat(batch.writerId()).isEqualTo(7L);
+        assertThat(batch.batchSequence()).isEqualTo(13);
+
+        try (LogRecordReadContext ctx =
+                        LogRecordReadContext.createCompactedRowReadContext(
+                                DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
+                CloseableIterator<LogRecord> recIt = batch.records(ctx)) {
+            for (CompactedRow expRow : expected) {
+                assertThat(recIt.hasNext()).isTrue();
+                LogRecord rec = recIt.next();
+                assertThat(rec.getChangeType()).isEqualTo(APPEND_ONLY);
+                assertThat(rec.getRow()).isEqualTo(expRow);
+            }
+            assertThat(recIt.hasNext()).isFalse();
+        }
+    }
+
+    @Test
+    void testAbortSemantics() throws Exception {
+        MemoryLogRecordsCompactedBuilder builder = createBuilder(0, 2, 512);
+        builder.append(APPEND_ONLY, compactedRow(DATA1_ROW_TYPE, new Object[] 
{1, "a"}));
+        builder.abort();
+
+        // append after abort
+        assertThatThrownBy(
+                        () ->
+                                builder.append(
+                                        APPEND_ONLY,
+                                        compactedRow(DATA1_ROW_TYPE, new 
Object[] {2, "b"})))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("MemoryLogRecordsCompactedBuilder has 
already been aborted");
+
+        // build after abort
+        assertThatThrownBy(builder::build)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Attempting to build an aborted record 
batch");
+
+        // close after abort
+        assertThatThrownBy(builder::close)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Cannot close MemoryLogRecordsCompactedBuilder as it 
has already been aborted");
+    }
+
+    @Test
+    void testNoRecordAppendAndBaseOffset() throws Exception {
+        // base offset 0
+        try (MemoryLogRecordsCompactedBuilder builder = createBuilder(0, 1, 
1024)) {
+            MemoryLogRecords records = 
MemoryLogRecords.pointToBytesView(builder.build());
+            assertThat(records.sizeInBytes())
+                    .isEqualTo(
+                            LogRecordBatchFormat.recordBatchHeaderSize(
+                                    DEFAULT_MAGIC)); // only batch header
+            LogRecordBatch batch = records.batches().iterator().next();
+            batch.ensureValid();
+            assertThat(batch.getRecordCount()).isEqualTo(0);
+            assertThat(batch.baseLogOffset()).isEqualTo(0);
+            assertThat(batch.lastLogOffset()).isEqualTo(0);
+            assertThat(batch.nextLogOffset()).isEqualTo(1);
+            try (LogRecordReadContext ctx =
+                            LogRecordReadContext.createCompactedRowReadContext(
+                                    DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
+                    CloseableIterator<LogRecord> it = batch.records(ctx)) {
+                assertThat(it.hasNext()).isFalse();
+            }
+        }
+
+        // base offset 100
+        try (MemoryLogRecordsCompactedBuilder builder = createBuilder(100, 1, 
1024)) {
+            MemoryLogRecords records = 
MemoryLogRecords.pointToBytesView(builder.build());
+            assertThat(records.sizeInBytes())
+                    
.isEqualTo(LogRecordBatchFormat.recordBatchHeaderSize(DEFAULT_MAGIC));
+            LogRecordBatch batch = records.batches().iterator().next();
+            batch.ensureValid();
+            assertThat(batch.getRecordCount()).isEqualTo(0);
+            assertThat(batch.baseLogOffset()).isEqualTo(100);
+            assertThat(batch.lastLogOffset()).isEqualTo(100);
+            assertThat(batch.nextLogOffset()).isEqualTo(101);
+            try (LogRecordReadContext ctx =
+                            LogRecordReadContext.createCompactedRowReadContext(
+                                    DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
+                    CloseableIterator<LogRecord> it = batch.records(ctx)) {
+                assertThat(it.hasNext()).isFalse();
+            }
+        }
+    }
+
+    @Test
+    void testResetWriterState() throws Exception {
+        MemoryLogRecordsCompactedBuilder builder = createBuilder(BASE_OFFSET, 
2, 1024);
+        List<CompactedRow> expected = new ArrayList<>();
+        expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}));
+        expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {2, "b"}));
+        for (CompactedRow row : expected) {
+            builder.append(APPEND_ONLY, row);
+        }
+        builder.setWriterState(5L, 0);
+        builder.close();
+        MemoryLogRecords records = 
MemoryLogRecords.pointToBytesView(builder.build());
+        LogRecordBatch batch = records.batches().iterator().next();
+        assertThat(batch.writerId()).isEqualTo(5L);
+        assertThat(batch.batchSequence()).isEqualTo(0);
+
+        // reset writer state and rebuild with new sequence
+        builder.resetWriterState(5L, 1);
+        records = MemoryLogRecords.pointToBytesView(builder.build());
+        batch = records.batches().iterator().next();
+        assertThat(batch.writerId()).isEqualTo(5L);
+        assertThat(batch.batchSequence()).isEqualTo(1);
+    }
+
+    private MemoryLogRecordsCompactedBuilder createBuilder(
+            long baseOffset, int maxPages, int pageSizeInBytes) throws 
IOException {
+        return MemoryLogRecordsCompactedBuilder.builder(
+                baseOffset,
+                DEFAULT_SCHEMA_ID,
+                maxPages * pageSizeInBytes,
+                DEFAULT_MAGIC,
+                new ManagedPagedOutputView(new 
TestingMemorySegmentPool(pageSizeInBytes)));
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
index 96bb0eba8..29cbfe5e1 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.kv;
 
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.SchemaGetter;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
@@ -55,6 +56,7 @@ public class KvRecoverHelper {
     private final long recoverPointOffset;
     private final KvRecoverContext recoverContext;
     private final KvFormat kvFormat;
+    private final LogFormat logFormat;
 
     // will be initialized when first encounter a log record during recovering 
from log
     private Integer currentSchemaId;
@@ -72,12 +74,14 @@ public class KvRecoverHelper {
             long recoverPointOffset,
             KvRecoverContext recoverContext,
             KvFormat kvFormat,
+            LogFormat logFormat,
             SchemaGetter schemaGetter) {
         this.kvTablet = kvTablet;
         this.logTablet = logTablet;
         this.recoverPointOffset = recoverPointOffset;
         this.recoverContext = recoverContext;
         this.kvFormat = kvFormat;
+        this.logFormat = logFormat;
         this.schemaGetter = schemaGetter;
     }
 
@@ -127,9 +131,7 @@ public class KvRecoverHelper {
             FetchIsolation fetchIsolation,
             ThrowingConsumer<KeyValueAndLogOffset, Exception> 
resumeRecordConsumer)
             throws Exception {
-        try (LogRecordReadContext readContext =
-                LogRecordReadContext.createArrowReadContext(
-                        currentRowType, currentSchemaId, schemaGetter)) {
+        try (LogRecordReadContext readContext = createLogRecordReadContext()) {
             long nextFetchOffset = startFetchOffset;
             while (true) {
                 LogRecords logRecords =
@@ -175,6 +177,18 @@ public class KvRecoverHelper {
         }
     }
 
+    private LogRecordReadContext createLogRecordReadContext() {
+        if (logFormat == LogFormat.ARROW) {
+            return LogRecordReadContext.createArrowReadContext(
+                    currentRowType, currentSchemaId, schemaGetter);
+        } else if (logFormat == LogFormat.COMPACTED) {
+            return LogRecordReadContext.createCompactedRowReadContext(
+                    currentRowType, currentSchemaId);
+        } else {
+            throw new UnsupportedOperationException("Unsupported log format: " 
+ logFormat);
+        }
+    }
+
     // TODO: this is very in-efficient, because the conversion is CPU heavy. 
Should be optimized in
     //  the future.
     private BinaryRow toKvRow(InternalRow originalRow) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index b6957c5dc..e6542d8f4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -54,6 +54,7 @@ import 
org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath;
 import org.apache.fluss.server.kv.snapshot.KvSnapshotDataUploader;
 import org.apache.fluss.server.kv.snapshot.RocksIncrementalSnapshot;
 import org.apache.fluss.server.kv.wal.ArrowWalBuilder;
+import org.apache.fluss.server.kv.wal.CompactedWalBuilder;
 import org.apache.fluss.server.kv.wal.IndexWalBuilder;
 import org.apache.fluss.server.kv.wal.WalBuilder;
 import org.apache.fluss.server.log.LogAppendInfo;
@@ -442,6 +443,8 @@ public final class KvTablet {
                             "Primary Key Table with COMPACTED kv format 
doesn't support INDEXED cdc log format.");
                 }
                 return new IndexWalBuilder(schemaId, memorySegmentPool);
+            case COMPACTED:
+                return new CompactedWalBuilder(schemaId, rowType, 
memorySegmentPool);
             case ARROW:
                 return new ArrowWalBuilder(
                         schemaId,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/wal/CompactedWalBuilder.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/wal/CompactedWalBuilder.java
new file mode 100644
index 000000000..8c80a48d9
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/wal/CompactedWalBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.wal;
+
+import org.apache.fluss.memory.ManagedPagedOutputView;
+import org.apache.fluss.memory.MemorySegmentPool;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
+import org.apache.fluss.record.bytesview.BytesView;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.InternalRow.FieldGetter;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.encode.CompactedRowEncoder;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.RowType;
+
+import java.io.IOException;
+
+/** A {@link WalBuilder} that builds a {@link MemoryLogRecords} with Compacted 
log format. */
+public class CompactedWalBuilder implements WalBuilder {
+
+    private final MemorySegmentPool memorySegmentPool;
+    private final ManagedPagedOutputView outputView;
+    private final MemoryLogRecordsCompactedBuilder recordsBuilder;
+
+    private final CompactedRowEncoder rowEncoder;
+    private final FieldGetter[] fieldGetters;
+    private final int fieldCount;
+
+    public CompactedWalBuilder(int schemaId, RowType rowType, 
MemorySegmentPool memorySegmentPool)
+            throws IOException {
+        this.memorySegmentPool = memorySegmentPool;
+        this.outputView = new ManagedPagedOutputView(memorySegmentPool);
+        // unlimited write size as we don't know the WAL size in advance
+        this.recordsBuilder =
+                MemoryLogRecordsCompactedBuilder.builder(
+                        schemaId, Integer.MAX_VALUE, outputView, 
/*appendOnly*/ false);
+        DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
+        this.rowEncoder = new CompactedRowEncoder(fieldTypes);
+        this.fieldGetters = InternalRow.createFieldGetters(rowType);
+        this.fieldCount = rowType.getFieldCount();
+    }
+
+    @Override
+    public void append(ChangeType changeType, InternalRow row) throws 
Exception {
+        final CompactedRow compactedRow;
+        if (row instanceof CompactedRow) {
+            compactedRow = (CompactedRow) row;
+        } else {
+            rowEncoder.startNewRow();
+            for (int i = 0; i < fieldCount; i++) {
+                rowEncoder.encodeField(i, fieldGetters[i].getFieldOrNull(row));
+            }
+            compactedRow = rowEncoder.finishRow();
+        }
+        recordsBuilder.append(changeType, compactedRow);
+    }
+
+    @Override
+    public MemoryLogRecords build() throws Exception {
+        recordsBuilder.close();
+        BytesView bytesView = recordsBuilder.build();
+        // Convert BytesView to MemoryLogRecords (may copy if composite)
+        return 
MemoryLogRecords.pointToByteBuffer(bytesView.getByteBuf().nioBuffer());
+    }
+
+    @Override
+    public void setWriterState(long writerId, int batchSequence) {
+        recordsBuilder.setWriterState(writerId, batchSequence);
+    }
+
+    @Override
+    public void deallocate() {
+        memorySegmentPool.returnAll(outputView.allocatedPooledSegments());
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 1a4b6d906..1669e004d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -752,6 +752,7 @@ public final class Replica {
                             startRecoverLogOffset,
                             recoverContext,
                             tableConfig.getKvFormat(),
+                            tableConfig.getLogFormat(),
                             schemaGetter);
             kvRecoverHelper.recover();
         } catch (Exception e) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 250fc30da..0941e32c1 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -221,9 +221,13 @@ public class TableDescriptorValidation {
     private static void checkLogFormat(Configuration tableConf, boolean 
hasPrimaryKey) {
         KvFormat kvFormat = tableConf.get(ConfigOptions.TABLE_KV_FORMAT);
         LogFormat logFormat = tableConf.get(ConfigOptions.TABLE_LOG_FORMAT);
-        if (hasPrimaryKey && kvFormat == KvFormat.COMPACTED && logFormat != 
LogFormat.ARROW) {
+
+        // Allow COMPACTED and ARROW log formats when KV format is COMPACTED 
for primary key tables
+        if (hasPrimaryKey
+                && kvFormat == KvFormat.COMPACTED
+                && !(logFormat == LogFormat.ARROW || logFormat == 
LogFormat.COMPACTED)) {
             throw new InvalidConfigException(
-                    "Currently, Primary Key Table only supports ARROW log 
format if kv format is COMPACTED.");
+                    "Currently, Primary Key Table supports ARROW or COMPACTED 
log format when kv format is COMPACTED.");
         }
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
index c6adbbbb1..932c17f23 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
@@ -21,7 +21,9 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.exception.NotLeaderOrFollowerException;
+import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.KvRecord;
 import org.apache.fluss.record.KvRecordBatch;
@@ -44,7 +46,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
 import static 
org.apache.fluss.server.testutils.KvTestUtils.assertLookupResponse;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newLookupRequest;
@@ -81,9 +83,14 @@ class KvReplicaRestoreITCase {
         int bucketNum = 3;
         List<TableBucket> tableBuckets = new ArrayList<>();
         for (int i = 0; i < tableNum; i++) {
+            TableDescriptor tableDescriptor =
+                    TableDescriptor.builder()
+                            .schema(DATA1_SCHEMA_PK)
+                            .distributedBy(3, "a")
+                            .logFormat(i % 2 == 0 ? LogFormat.ARROW : 
LogFormat.COMPACTED)
+                            .build();
             TablePath tablePath = TablePath.of("test_db", "test_table_" + i);
-            long tableId =
-                    createTable(FLUSS_CLUSTER_EXTENSION, tablePath, 
DATA1_TABLE_DESCRIPTOR_PK);
+            long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, 
tableDescriptor);
             for (int bucket = 0; bucket < bucketNum; bucket++) {
                 tableBuckets.add(new TableBucket(tableId, bucket));
             }

Reply via email to