This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/ci-add-column by this push:
new 8660164b0 KvTablet
8660164b0 is described below
commit 8660164b0a830f3edf01fb18c5abdb62a271a5ee
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 17:33:44 2025 +0800
KvTablet
---
.../table/scanner/batch/SnapshotFilesReader.java | 3 +-
.../java/org/apache/fluss/record/BinaryValue.java | 67 +++++++++
.../org/apache/fluss/row/encode/ValueDecoder.java | 5 +-
.../java/org/apache/fluss/server/kv/KvTablet.java | 151 +++++++++------------
.../server/kv/partialupdate/PartialUpdater.java | 45 +++---
.../kv/partialupdate/PartialUpdaterCache.java | 9 +-
.../server/kv/rowmerger/DefaultRowMerger.java | 14 +-
.../server/kv/rowmerger/FirstRowRowMerger.java | 8 +-
.../fluss/server/kv/rowmerger/RowMerger.java | 24 ++--
.../server/kv/rowmerger/VersionedRowMerger.java | 7 +-
.../server/kv/rowmerger/DefaultRowMergerTest.java | 57 ++++----
.../kv/rowmerger/VersionedRowMergerTest.java | 41 +++---
.../fluss/server/tablet/TabletServiceITCase.java | 2 +-
13 files changed, 249 insertions(+), 184 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
index a7c5e2969..8a52814f3 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
@@ -22,6 +22,7 @@ import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.record.BinaryValue;
import org.apache.fluss.rocksdb.RocksDBHandle;
import org.apache.fluss.rocksdb.RocksIteratorWrapper;
import org.apache.fluss.row.InternalRow;
@@ -164,7 +165,7 @@ class SnapshotFilesReader implements
CloseableIterator<InternalRow> {
byte[] value = rocksIteratorWrapper.value();
rocksIteratorWrapper.next();
- ValueDecoder.Value originValue = valueDecoder.decodeValue(value);
+ BinaryValue originValue = valueDecoder.decodeValue(value);
InternalRow originRow = originValue.row;
if (targetSchemaId != originValue.schemaId) {
int[] indexMapping =
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java
b/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java
new file mode 100644
index 000000000..d02bff81b
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.utils.UnsafeUtils;
+
+import java.util.Objects;
+
+/** The value of a key-value pair, containing a schema id and a binary row. */
+public class BinaryValue {
+
+ public static final int SCHEMA_ID_LENGTH = 2;
+
+ public final short schemaId;
+ public final BinaryRow row;
+
+ public BinaryValue(short schemaId, BinaryRow row) {
+ this.schemaId = schemaId;
+ this.row = row;
+ }
+
+ /**
+ * Encode the value (consisting of {@code row} with a {@code schemaId}) to
a byte array value
+ * expected to be persisted to the kv store.
+ */
+ public byte[] encodeValue() {
+ byte[] values = new byte[SCHEMA_ID_LENGTH + row.getSizeInBytes()];
+ UnsafeUtils.putShort(values, 0, schemaId);
+ row.copyTo(values, SCHEMA_ID_LENGTH);
+ return values;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BinaryValue that = (BinaryValue) o;
+ return schemaId == that.schemaId && Objects.equals(row, that.row);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schemaId, row);
+ }
+
+ @Override
+ public String toString() {
+ return "BinaryValue{" + "schemaId=" + schemaId + ", row=" + row + '}';
+ }
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
index 5f90ddaef..c7ac792e4 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
@@ -21,6 +21,7 @@ import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.record.BinaryValue;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.decode.RowDecoder;
import org.apache.fluss.types.DataType;
@@ -48,7 +49,7 @@ public class ValueDecoder {
}
/** Decode the value bytes and return the schema id and the row encoded in
the value bytes. */
- public Value decodeValue(byte[] valueBytes) {
+ public BinaryValue decodeValue(byte[] valueBytes) {
MemorySegment memorySegment = MemorySegment.wrap(valueBytes);
short schemaId = memorySegment.getShort(0);
@@ -65,7 +66,7 @@ public class ValueDecoder {
BinaryRow row =
rowDecoder.decode(
memorySegment, SCHEMA_ID_LENGTH, valueBytes.length -
SCHEMA_ID_LENGTH);
- return new Value(schemaId, row);
+ return new BinaryValue(schemaId, row);
}
/** The schema id and {@link BinaryRow} stored as the value of kv store. */
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 70099878e..b6957c5dc 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -34,16 +34,16 @@ import org.apache.fluss.metadata.SchemaGetter;
import org.apache.fluss.metadata.SchemaInfo;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.BinaryValue;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.record.KvRecord;
import org.apache.fluss.record.KvRecordBatch;
import org.apache.fluss.record.KvRecordReadContext;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.row.PaddingRow;
import org.apache.fluss.row.arrow.ArrowWriterPool;
import org.apache.fluss.row.arrow.ArrowWriterProvider;
import org.apache.fluss.row.encode.ValueDecoder;
-import org.apache.fluss.row.encode.ValueEncoder;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
@@ -254,6 +254,20 @@ public final class KvTablet {
/**
* Put the KvRecordBatch into the kv storage, and return the appended wal
log info.
*
+ * <p>Schema Evolution Handling:
+ *
+ * <p>We don't allow the schema of input kv records to be larger than the
latest schema id known by
+ * the tablet. Besides, we currently only support ADD COLUMN LAST
operation, so the input row or
+ * old row must have same or fewer columns than latest schema. This helps
to simplify the schema
+ * change handling.
+ *
+ * <p>1. We write the kv records into KvStore without converting it into
latest schema for
+ * performance consideration. We have mechanisms so that the writer client
dynamically uses the latest
+ * schema for writing records.
+ *
+ * <p>2. We always use the latest schema for writing WAL logs, because it
anyway requires
+ * deserialization & serialization to convert the compacted format into
Arrow format.
+ *
* @param kvRecords the kv records to put into
* @param targetColumns the target columns to put, null if put all columns
*/
@@ -266,28 +280,28 @@ public final class KvTablet {
SchemaInfo schemaInfo = schemaGetter.getLatestSchemaInfo();
Schema latestSchema = schemaInfo.getSchema();
- // In the following scenario, we need to support schema
migration:
- // schema1: old data's schema -> valueDecoder
- // schema2: new data's schema
- // schema3: current schema (also the latest schema)
- // (schema1 <= or >= schema2) <= current schema id
short latestSchemaId = (short) schemaInfo.getSchemaId();
short schemaIdOfNewData = kvRecords.schemaId();
if (schemaIdOfNewData > latestSchemaId ||
schemaIdOfNewData < 0) {
+ // TODO: we may need to support retriable exception
here
throw new SchemaNotExistException(
"Invalid schema id: "
+ schemaIdOfNewData
+ ", latest schema id: "
+ latestSchemaId);
}
- Schema schemaOfNewData =
schemaGetter.getSchema(schemaIdOfNewData);
+
// we only support ADD COLUMN, so targetColumns is fine to
be used directly
RowMerger currentMerger =
rowMerger.configureTargetColumns(
targetColumns, latestSchemaId,
latestSchema);
- RowType currentRowType = latestSchema.getRowType();
- WalBuilder walBuilder = createWalBuilder(latestSchemaId,
currentRowType);
+ RowType latestRowType = latestSchema.getRowType();
+ WalBuilder walBuilder = createWalBuilder(latestSchemaId,
latestRowType);
walBuilder.setWriterState(kvRecords.writerId(),
kvRecords.batchSequence());
+ // we only support ADD COLUMN LAST, so the BinaryRow after
RowMerger
+ // may only have fewer trailing columns than the latest schema, so we
pad nulls to
+ // the end of the BinaryRow to get the latest schema row.
+ PaddingRow latestSchemaRow = new
PaddingRow(latestRowType.getFieldCount());
// get offset to track the offset corresponded to the kv
record
long logEndOffsetOfPrevBatch =
logTablet.localLogEndOffset();
try {
@@ -296,22 +310,15 @@ public final class KvTablet {
// TODO: reuse the read context and decoder
KvRecordBatch.ReadContext readContext =
KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
-
- // replace new row with current schema if the schema
id is older than now
- ProjectedRow projectedRow =
- ProjectedRow.from(schemaOfNewData,
latestSchema);
ValueDecoder valueDecoder = new
ValueDecoder(schemaGetter, kvFormat);
for (KvRecord kvRecord :
kvRecords.records(readContext)) {
-
byte[] keyBytes =
BytesUtils.toArray(kvRecord.getKey());
KvPreWriteBuffer.Key key =
KvPreWriteBuffer.Key.of(keyBytes);
- InternalRow row = kvRecord.getRow();
- if (row != null && schemaIdOfNewData <
latestSchemaId) {
- row = projectedRow.replaceRow(row);
- }
-
- if (row == null) {
- DeleteBehavior deleteBehavior =
rowMerger.deleteBehavior();
+ BinaryRow row = kvRecord.getRow();
+ BinaryValue currentValue =
+ row == null ? null : new
BinaryValue(schemaIdOfNewData, row);
+ if (currentValue == null) {
+ DeleteBehavior deleteBehavior =
currentMerger.deleteBehavior();
if (deleteBehavior == DeleteBehavior.IGNORE) {
// skip delete rows if the merger doesn't
support yet
continue;
@@ -321,99 +328,69 @@ public final class KvTablet {
+ "The
table.delete.behavior is set to 'disable'.");
}
// it's for deletion
- byte[] oldValue = getFromBufferOrKv(key);
+ byte[] oldValueBytes = getFromBufferOrKv(key);
- if (oldValue == null) {
+ if (oldValueBytes == null) {
// there might be large amount of such
deletion, so we don't log
LOG.debug(
"The specific key can't be found
in kv tablet although the kv record is for deletion, "
+ "ignore it directly as
it doesn't exist in the kv tablet yet.");
} else {
- ValueDecoder.Value oldRowAndSchemaId =
- valueDecoder.decodeValue(oldValue);
- InternalRow oldRow = oldRowAndSchemaId.row;
- if (oldRow != null
- && oldRowAndSchemaId.schemaId !=
latestSchemaId) {
- // todo: 后续想办法复用对应的projected row.
- Schema schemaOfOldData =
-
schemaGetter.getSchema(oldRowAndSchemaId.schemaId);
- oldRow =
-
ProjectedRow.from(schemaOfOldData, latestSchema)
- .replaceRow(oldRow);
- }
- InternalRow newRow =
currentMerger.delete(oldRow);
+ BinaryValue oldValue =
valueDecoder.decodeValue(oldValueBytes);
+ BinaryValue newValue =
currentMerger.delete(oldValue);
// if newRow is null, it means the row
should be deleted
- if (newRow == null) {
- walBuilder.append(ChangeType.DELETE,
oldRow);
+ if (newValue == null) {
+ walBuilder.append(
+ ChangeType.DELETE,
+
latestSchemaRow.replaceRow(oldValue.row));
kvPreWriteBuffer.delete(key,
logOffset++);
} else {
// otherwise, it's a partial update,
should produce -U,+U
-
walBuilder.append(ChangeType.UPDATE_BEFORE, oldRow);
-
walBuilder.append(ChangeType.UPDATE_AFTER, newRow);
+ walBuilder.append(
+ ChangeType.UPDATE_BEFORE,
+
latestSchemaRow.replaceRow(oldValue.row));
+ walBuilder.append(
+ ChangeType.UPDATE_AFTER,
+
latestSchemaRow.replaceRow(newValue.row));
kvPreWriteBuffer.put(
- key,
- ValueEncoder.encodeRow(
- latestSchemaId,
- kvFormat,
- currentRowType,
- newRow),
- logOffset + 1);
+ key, newValue.encodeValue(),
logOffset + 1);
logOffset += 2;
}
}
} else {
// upsert operation
- byte[] oldValue = getFromBufferOrKv(key);
+ byte[] oldValueBytes = getFromBufferOrKv(key);
// it's update
- if (oldValue != null) {
- ValueDecoder.Value oldRowAndSchemaId =
- valueDecoder.decodeValue(oldValue);
- InternalRow oldRow = oldRowAndSchemaId.row;
- if (oldRow != null
- && oldRowAndSchemaId.schemaId !=
latestSchemaId) {
- // todo: 后续想办法复用对应的projected row.
- Schema schemaOfOldData =
-
schemaGetter.getSchema(oldRowAndSchemaId.schemaId);
- oldRow =
-
ProjectedRow.from(schemaOfOldData, latestSchema)
- .replaceRow(oldRow);
- }
-
- InternalRow newRow =
currentMerger.merge(oldRow, row);
- if (newRow == oldRow) {
- // newRow is the same to oldRow, means
nothing
+ if (oldValueBytes != null) {
+ BinaryValue oldValue =
valueDecoder.decodeValue(oldValueBytes);
+ BinaryValue newValue =
+ currentMerger.merge(oldValue,
currentValue);
+ if (newValue == oldValue) {
+ // newValue is the same to oldValue,
means nothing
// happens (no update/delete), and
input should be ignored
continue;
}
-
walBuilder.append(ChangeType.UPDATE_BEFORE, oldRow);
- walBuilder.append(ChangeType.UPDATE_AFTER,
newRow);
+ walBuilder.append(
+ ChangeType.UPDATE_BEFORE,
+
latestSchemaRow.replaceRow(oldValue.row));
+ walBuilder.append(
+ ChangeType.UPDATE_AFTER,
+
latestSchemaRow.replaceRow(newValue.row));
// logOffset is for -U, logOffset + 1 is
for +U, we need to use
// the log offset for +U
kvPreWriteBuffer.put(
- key,
- ValueEncoder.encodeRow(
- latestSchemaId,
- kvFormat,
- currentRowType,
- newRow),
- logOffset + 1);
+ key, newValue.encodeValue(),
logOffset + 1);
logOffset += 2;
} else {
// it's insert
- // TODO: we shouldadd guarantees that all
non-specified
- // columns
+ // TODO: we should add guarantees that all
non-specified columns
// of the input row are set to null.
- InternalRow newRow = row;
- walBuilder.append(ChangeType.INSERT,
newRow);
+ walBuilder.append(
+ ChangeType.INSERT,
+
latestSchemaRow.replaceRow(currentValue.row));
kvPreWriteBuffer.put(
- key,
- ValueEncoder.encodeRow(
- latestSchemaId,
- kvFormat,
- currentRowType,
- newRow),
- logOffset++);
+ key, currentValue.encodeValue(),
logOffset++);
}
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
index 688ac8dab..d8741e010 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdater.java
@@ -20,7 +20,7 @@ package org.apache.fluss.server.kv.partialupdate;
import org.apache.fluss.exception.InvalidTargetColumnException;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.record.BinaryValue;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.encode.RowEncoder;
import org.apache.fluss.types.DataType;
@@ -34,6 +34,7 @@ import java.util.BitSet;
@NotThreadSafe
public class PartialUpdater {
+ private final short targetSchemaId;
private final InternalRow.FieldGetter[] flussFieldGetters;
private final RowEncoder rowEncoder;
@@ -42,7 +43,8 @@ public class PartialUpdater {
private final BitSet primaryKeyCols = new BitSet();
private final DataType[] fieldDataTypes;
- public PartialUpdater(KvFormat kvFormat, Schema schema, int[]
targetColumns) {
+ public PartialUpdater(KvFormat kvFormat, short schemaId, Schema schema,
int[] targetColumns) {
+ this.targetSchemaId = schemaId;
for (int targetColumn : targetColumns) {
partialUpdateCols.set(targetColumn);
}
@@ -89,43 +91,44 @@ public class PartialUpdater {
}
/**
- * Partial update the {@code oldRow} with the given new row {@code
partialRow}. The {@code
- * oldRow} may be null, in this case, the field don't exist in the {@code
partialRow} will be
+ * Partial update the {@code oldValue} with the given new value {@code
partialValue}. The {@code
+ * oldValue} may be null; in this case, the fields that don't exist in the
{@code partialValue} will be
* set to null.
*
- * @param oldRow the old row to be updated
- * @param partialRow the new row to be updated.
- * @return the updated row
+ * @param oldValue the old value to be updated
+ * @param partialValue the new value to be updated.
+ * @return the updated value (schema id + row bytes)
*/
- public BinaryRow updateRow(@Nullable InternalRow oldRow, InternalRow
partialRow) {
+ public BinaryValue updateRow(@Nullable BinaryValue oldValue, BinaryValue
partialValue) {
rowEncoder.startNewRow();
// write each field
for (int i = 0; i < fieldDataTypes.length; i++) {
// use the partial row value
if (partialUpdateCols.get(i)) {
- rowEncoder.encodeField(i,
flussFieldGetters[i].getFieldOrNull(partialRow));
+ rowEncoder.encodeField(i,
flussFieldGetters[i].getFieldOrNull(partialValue.row));
} else {
// use the old row value
- if (oldRow == null) {
+ if (oldValue == null) {
rowEncoder.encodeField(i, null);
} else {
- rowEncoder.encodeField(i,
flussFieldGetters[i].getFieldOrNull(oldRow));
+ rowEncoder.encodeField(i,
flussFieldGetters[i].getFieldOrNull(oldValue.row));
}
}
}
- return rowEncoder.finishRow();
+ return new BinaryValue(targetSchemaId, rowEncoder.finishRow());
}
/**
- * Partial delete the given {@code row}. If all the fields except for
{@link #partialUpdateCols}
- * in {@code row} are null, return null. Otherwise, update all the {@link
#partialUpdateCols} in
- * the {@code row} except for the primary key columns to null values,
return the updated row.
+ * Partial delete the given {@code value}. If all the fields except for
{@link
+ * #partialUpdateCols} in {@code value.row} are null, return null.
Otherwise, update all the
+ * {@link #partialUpdateCols} in the {@code value.row} except for the
primary key columns to
+ * null values, return the updated value.
*
- * @param row the row to be deleted
- * @return the row after partial deleted
+ * @param value the value to be deleted
+ * @return the value after partial deleted
*/
- public @Nullable BinaryRow deleteRow(InternalRow row) {
- if (isFieldsNull(row, partialUpdateCols)) {
+ public @Nullable BinaryValue deleteRow(BinaryValue value) {
+ if (isFieldsNull(value.row, partialUpdateCols)) {
return null;
} else {
rowEncoder.startNewRow();
@@ -137,10 +140,10 @@ public class PartialUpdater {
rowEncoder.encodeField(i, null);
} else {
// use the old row value
- rowEncoder.encodeField(i,
flussFieldGetters[i].getFieldOrNull(row));
+ rowEncoder.encodeField(i,
flussFieldGetters[i].getFieldOrNull(value.row));
}
}
- return rowEncoder.finishRow();
+ return new BinaryValue(targetSchemaId, rowEncoder.finishRow());
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdaterCache.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdaterCache.java
index 3ce22854c..0c227aaf2 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdaterCache.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/partialupdate/PartialUpdaterCache.java
@@ -25,6 +25,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import javax.annotation.concurrent.ThreadSafe;
+import java.time.Duration;
import java.util.Arrays;
/** The cache for {@link PartialUpdater}. */
@@ -37,7 +38,11 @@ public class PartialUpdaterCache {
// currently, the cache is used per-bucket, so we limit the cache size
to 5 to have a
// maximal 5 parallel partial updaters. This is a temporary solution
and should be
// shared across all buckets in the future.
- this.rowPartialUpdaters = Caffeine.newBuilder().maximumSize(5).build();
+ this.rowPartialUpdaters =
+ Caffeine.newBuilder()
+ .maximumSize(5)
+ .expireAfterAccess(Duration.ofMinutes(5))
+ .build();
}
// TODO: extend to tableId and schemaId when the cache is shared across
all tables
@@ -45,7 +50,7 @@ public class PartialUpdaterCache {
KvFormat kvFormat, short schemaId, Schema schema, int[]
targetColumns) {
return rowPartialUpdaters.get(
getPartialUpdaterKey(targetColumns, schemaId),
- k -> new PartialUpdater(kvFormat, schema, targetColumns));
+ k -> new PartialUpdater(kvFormat, schemaId, schema,
targetColumns));
}
private String getPartialUpdaterKey(int[] targetColumns, int schemaId) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
index af3500c59..2db73c819 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java
@@ -20,7 +20,7 @@ package org.apache.fluss.server.kv.rowmerger;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.record.BinaryValue;
import org.apache.fluss.server.kv.partialupdate.PartialUpdater;
import org.apache.fluss.server.kv.partialupdate.PartialUpdaterCache;
@@ -46,14 +46,14 @@ public class DefaultRowMerger implements RowMerger {
@Nullable
@Override
- public InternalRow merge(InternalRow oldRow, InternalRow newRow) {
+ public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
// always retain the new row (latest row)
- return newRow;
+ return newValue;
}
@Nullable
@Override
- public InternalRow delete(InternalRow oldRow) {
+ public BinaryValue delete(BinaryValue oldRow) {
// returns null to indicate the row is deleted
return null;
}
@@ -98,13 +98,13 @@ public class DefaultRowMerger implements RowMerger {
@Nullable
@Override
- public InternalRow merge(InternalRow oldRow, InternalRow newRow) {
- return partialUpdater.updateRow(oldRow, newRow);
+ public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
+ return partialUpdater.updateRow(oldValue, newValue);
}
@Nullable
@Override
- public InternalRow delete(InternalRow oldRow) {
+ public BinaryValue delete(BinaryValue oldRow) {
return partialUpdater.deleteRow(oldRow);
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
index 79592e177..33bd6ee1f 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/FirstRowRowMerger.java
@@ -20,7 +20,7 @@ package org.apache.fluss.server.kv.rowmerger;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.record.BinaryValue;
import javax.annotation.Nullable;
@@ -44,14 +44,14 @@ public class FirstRowRowMerger implements RowMerger {
@Nullable
@Override
- public InternalRow merge(InternalRow oldRow, InternalRow newRow) {
+ public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
// always retain the old row (first row)
- return oldRow;
+ return oldValue;
}
@Nullable
@Override
- public InternalRow delete(InternalRow oldRow) {
+ public BinaryValue delete(BinaryValue oldRow) {
throw new UnsupportedOperationException(
"DELETE is not supported for the first_row merge engine.");
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
index 5f3414574..89bff8c0a 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java
@@ -23,7 +23,7 @@ import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.record.BinaryValue;
import javax.annotation.Nullable;
@@ -33,14 +33,14 @@ import java.util.Optional;
public interface RowMerger {
/**
- * Merge the old row with the new row.
+ * Merge the old value with the new value.
*
- * @param oldRow the old row
- * @param newRow the new row
- * @return the merged row, if the returned row is the same to the old row,
then nothing happens
- * to the row (no update, no delete).
+ * @param oldValue the old value
+ * @param newValue the new value
+ * @return the merged value; if the returned value is the same as the old
value, then nothing
+ * happens to the row (no update, no delete).
*/
- InternalRow merge(InternalRow oldRow, InternalRow newRow);
+ BinaryValue merge(BinaryValue oldValue, BinaryValue newValue);
/**
* Merge the old row with a delete row.
@@ -52,7 +52,7 @@ public interface RowMerger {
* @return the merged row, or null if the row is deleted.
*/
@Nullable
- InternalRow delete(InternalRow oldRow);
+ BinaryValue delete(BinaryValue oldRow);
/**
* The behavior of delete operations on primary key tables.
@@ -61,7 +61,13 @@ public interface RowMerger {
*/
DeleteBehavior deleteBehavior();
- /** Dynamically configure the target columns to merge and return the
effective merger. */
+ /**
+ * Dynamically configure the target columns to merge and return the
effective merger.
+ *
+ * @param targetColumns the partial update target column positions, null
means full update
+ * @param schemaId the schema id used to generate new rows
+ * @param schema the schema used to generate new rows
+ */
RowMerger configureTargetColumns(@Nullable int[] targetColumns, short
schemaId, Schema schema);
/** Create a row merger based on the given configuration. */
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
index 9706230b5..4b555602e 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMerger.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.kv.rowmerger;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.record.BinaryValue;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
@@ -64,14 +65,14 @@ public class VersionedRowMerger implements RowMerger {
@Nullable
@Override
- public InternalRow merge(InternalRow oldRow, InternalRow newRow) {
+ public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
// return newRow if newRow's version is larger or equal than oldRow's
version
- return versionComparator.compare(oldRow, newRow) <= 0 ? newRow :
oldRow;
+ return versionComparator.compare(oldValue.row, newValue.row) <= 0 ?
newValue : oldValue;
}
@Nullable
@Override
- public InternalRow delete(InternalRow oldRow) {
+ public BinaryValue delete(BinaryValue oldRow) {
throw new UnsupportedOperationException(
"DELETE is not supported for the versioned merge engine.");
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
index 95afb907d..3b5fc88c4 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMergerTest.java
@@ -20,9 +20,7 @@ package org.apache.fluss.server.kv.rowmerger;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.BinaryRow;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.record.BinaryValue;
import org.apache.fluss.types.DataTypes;
import org.junit.jupiter.params.ParameterizedTest;
@@ -47,18 +45,21 @@ class DefaultRowMergerTest {
Schema.newBuilder()
.fromColumns(
Arrays.asList(
- new Schema.Column("age",
DataTypes.STRING(), null, (short) 2),
new Schema.Column("id", DataTypes.INT(),
null, (short) 0),
- new Schema.Column("name",
DataTypes.STRING(), null, (short) 1)))
+ new Schema.Column("name",
DataTypes.STRING(), null, (short) 1),
+ // add new column at end
+ new Schema.Column("age",
DataTypes.STRING(), null, (short) 2)))
.primaryKey("id")
.build();
- private BinaryRow createBinaryRow(int id, String name) {
- return compactedRow(SCHEMA.getRowType(), new Object[] {id, name});
+ private BinaryValue createBinaryValue(int id, String name) {
+ return new BinaryValue(
+ (short) 1, compactedRow(SCHEMA.getRowType(), new Object[] {id,
name}));
}
- private BinaryRow createBinaryRow(String age, int id, String name) {
- return compactedRow(SCHEMA_2.getRowType(), new Object[] {age, id,
name});
+ private BinaryValue createBinaryValue(int id, String name, String age) {
+ return new BinaryValue(
+ (short) 2, compactedRow(SCHEMA_2.getRowType(), new Object[]
{id, name, age}));
}
@ParameterizedTest
@@ -67,26 +68,25 @@ class DefaultRowMergerTest {
DefaultRowMerger merger = new DefaultRowMerger(KvFormat.COMPACTED,
deleteBehavior);
merger.configureTargetColumns(null, (byte) 1, SCHEMA);
- InternalRow oldRow = createBinaryRow(1, "old");
- BinaryRow newRow = createBinaryRow(1, "new");
+ BinaryValue oldValue = createBinaryValue(1, "old");
+ BinaryValue newValue = createBinaryValue(1, "new");
// Test merge operation - should return new row
- InternalRow mergedRow = merger.merge(oldRow, newRow);
- assertThat(mergedRow).isSameAs(newRow);
+ BinaryValue mergedValue = merger.merge(oldValue, newValue);
+ assertThat(mergedValue).isSameAs(newValue);
// Test delete operation - should return null (deleted)
- InternalRow deletedRow = merger.delete(oldRow);
- assertThat(deletedRow).isNull();
+ BinaryValue deletedValue = merger.delete(oldValue);
+ assertThat(deletedValue).isNull();
// Test supportsDelete - should return true
assertThat(merger.deleteBehavior()).isEqualTo(deleteBehavior);
// Test schema change.
merger.configureTargetColumns(null, (byte) 2, SCHEMA_2);
- oldRow = ProjectedRow.from(SCHEMA, SCHEMA_2).replaceRow(oldRow);
- newRow = createBinaryRow("20", 1, "new2");
- assertThat(merger.merge(oldRow, newRow)).isSameAs(newRow);
- assertThat(merger.delete(newRow)).isNull();
+ newValue = createBinaryValue(1, "new2", "20");
+ assertThat(merger.merge(oldValue, newValue)).isSameAs(newValue);
+ assertThat(merger.delete(newValue)).isNull();
}
@ParameterizedTest
@@ -98,20 +98,19 @@ class DefaultRowMergerTest {
RowMerger partialMerger =
merger.configureTargetColumns(new int[] {0, 1}, (byte) 1,
SCHEMA); // id + name
- InternalRow oldRow = createBinaryRow(1, "old");
+ BinaryValue oldValue = createBinaryValue(1, "old");
- InternalRow ignoredRow = partialMerger.delete(oldRow);
- assertThat(ignoredRow).isNull();
+ BinaryValue ignoredValue = partialMerger.delete(oldValue);
+ assertThat(ignoredValue).isNull();
assertThat(partialMerger.deleteBehavior()).isEqualTo(deleteBehavior);
- assertThat(partialMerger.merge(null, oldRow)).isEqualTo(oldRow);
+ assertThat(partialMerger.merge(null, oldValue)).isEqualTo(oldValue);
// schema change then partial update (except name column).
- partialMerger = merger.configureTargetColumns(new int[] {0, 1}, (byte)
2, SCHEMA_2);
- oldRow = ProjectedRow.from(SCHEMA, SCHEMA_2).replaceRow(oldRow);
- BinaryRow newRow = createBinaryRow("20", 1, null);
- BinaryRow mergeRow = createBinaryRow("20", 1, "old");
- assertThat(partialMerger.merge(oldRow, newRow)).isEqualTo(mergeRow);
-
assertThat(partialMerger.delete(mergeRow)).isEqualTo(createBinaryRow(null, 1,
"old"));
+ partialMerger = merger.configureTargetColumns(new int[] {0, 2}, (byte)
2, SCHEMA_2);
+ BinaryValue newValue = createBinaryValue(1, null, "20");
+ BinaryValue mergeValue = createBinaryValue(1, "old", "20");
+ assertThat(partialMerger.merge(oldValue,
newValue)).isEqualTo(mergeValue);
+
assertThat(partialMerger.delete(mergeValue)).isEqualTo(createBinaryValue(1,
"old", null));
}
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
index 008c2fafa..9163b23b0 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/VersionedRowMergerTest.java
@@ -19,9 +19,7 @@ package org.apache.fluss.server.kv.rowmerger;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.Schema;
-import org.apache.fluss.row.BinaryRow;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.record.BinaryValue;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.types.DataType;
@@ -132,13 +130,13 @@ class VersionedRowMergerTest {
merger.configureTargetColumns(null, (short) 1, schema);
for (TestSpec testSpec : testSpecs) {
- BinaryRow oldRow = compactedRow(rowType, new Object[]
{testSpec.oldValue, "dummy"});
- BinaryRow newRow = compactedRow(rowType, new Object[]
{testSpec.newValue, "dummy"});
- InternalRow mergedRow = merger.merge(oldRow, newRow);
+ BinaryValue oldValue = binaryValue(rowType, new Object[]
{testSpec.oldValue, "dummy"});
+ BinaryValue newValue = binaryValue(rowType, new Object[]
{testSpec.newValue, "dummy"});
+ BinaryValue mergedValue = merger.merge(oldValue, newValue);
if (testSpec.expected.equals("old")) {
- assertThat(mergedRow).isSameAs(oldRow);
+ assertThat(mergedValue).isSameAs(oldValue);
} else if (testSpec.expected.equals("new")) {
- assertThat(mergedRow).isSameAs(newRow);
+ assertThat(mergedValue).isSameAs(newValue);
} else {
throw new IllegalArgumentException("Unknown expected value: "
+ testSpec.expected);
}
@@ -169,27 +167,29 @@ class VersionedRowMergerTest {
RowType rowType = schema.getRowType();
VersionedRowMerger merger = new VersionedRowMerger("a",
DeleteBehavior.DISABLE);
merger.configureTargetColumns(null, (short) 1, schema);
- InternalRow oldRow = compactedRow(rowType, new Object[] {11, "dummy"});
- BinaryRow newRow = compactedRow(rowType, new Object[] {2, "dummy"});
- assertThat(merger.merge(oldRow, newRow)).isSameAs(oldRow);
+ BinaryValue oldValue = binaryValue(rowType, new Object[] {11,
"dummy"});
+ BinaryValue newValue = binaryValue(rowType, new Object[] {2, "dummy"});
+ assertThat(merger.merge(oldValue, newValue)).isSameAs(oldValue);
Schema schema2 =
Schema.newBuilder()
.fromColumns(
Arrays.asList(
- new Schema.Column("c",
DataTypes.STRING(), null, (short) 2),
new Schema.Column("a",
DataTypes.INT(), null, (short) 0),
+ new Schema.Column("b",
DataTypes.STRING(), null, (short) 1),
+                                        // new column "c" is added at the end of the schema
new Schema.Column(
- "b", DataTypes.STRING(), null,
(short) 1)))
+ "c", DataTypes.STRING(), null,
(short) 2)))
.build();
rowType = schema2.getRowType();
merger.configureTargetColumns(null, (short) 2, schema2);
- oldRow = ProjectedRow.from(schema, schema2).replaceRow(oldRow);
- newRow = compactedRow(rowType, new Object[] {"a", 2, "dummy"});
- assertThat(merger.merge(oldRow, newRow)).isSameAs(oldRow);
- newRow = compactedRow(rowType, new Object[] {"b", 20, "dummy"});
- assertThat(merger.merge(oldRow, newRow)).isSameAs(newRow);
+ newValue = binaryValue(rowType, new Object[] {2, "dummy", "a"});
+        // new value's version (2) is smaller than the old value's (11), so the old value is kept
+ assertThat(merger.merge(oldValue, newValue)).isSameAs(oldValue);
+ newValue = binaryValue(rowType, new Object[] {20, "dummy", "b"});
+        // new value's version (20) is greater than the old value's (11), so the new value wins
+ assertThat(merger.merge(oldValue, newValue)).isSameAs(newValue);
}
private static TimestampNtz timestampNtz(String timestamp) {
@@ -201,6 +201,11 @@ class VersionedRowMergerTest {
return TimestampLtz.fromInstant(instant);
}
+ private static BinaryValue binaryValue(RowType rowType, Object[] objects) {
+ int schemaId = rowType.getFieldCount() == 2 ? 1 : 2;
+ return new BinaryValue((short) schemaId, compactedRow(rowType,
objects));
+ }
+
/** Test specification for {@link VersionedRowMerger}. */
private static class TestSpec {
private final Object oldValue;
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
index 5a5910d1e..8b0b75c35 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
@@ -330,7 +330,7 @@ public class TabletServiceITCase {
tableId,
0,
Errors.INVALID_COLUMN_PROJECTION.code(),
- "Projected field id 2 is not contains in [0, 1]");
+ "Projected field id 2 is not contained in [0, 1]");
}
@Test