This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 020ed14809 [core] Row lineage core support. (#5935)
020ed14809 is described below
commit 020ed14809a4f3136100f288ac7c2f458cfbb062
Author: YeJunHao <[email protected]>
AuthorDate: Mon Jul 28 21:57:21 2025 +0800
[core] Row lineage core support. (#5935)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 10 +
.../src/main/java/org/apache/paimon/Snapshot.java | 28 ++-
.../org/apache/paimon/table/SpecialFields.java | 10 +-
.../apache/paimon/casting/FallbackMappingRow.java | 189 +++++++++++++++++++
.../paimon/data/columnar/ColumnarRowIterator.java | 54 ++++++
.../paimon/casting/FallbackMappingRowTest.java | 63 +++++++
.../java/org/apache/paimon/AbstractFileStore.java | 3 +-
.../org/apache/paimon/AppendOnlyFileStore.java | 3 +-
.../src/main/java/org/apache/paimon/Changelog.java | 9 +-
.../java/org/apache/paimon/KeyValueFileStore.java | 3 +-
.../java/org/apache/paimon/io/DataFileMeta.java | 108 +++++++++--
.../apache/paimon/io/DataFileMeta08Serializer.java | 1 +
.../apache/paimon/io/DataFileMeta09Serializer.java | 1 +
.../paimon/io/DataFileMeta10LegacySerializer.java | 1 +
...er.java => DataFileMeta12LegacySerializer.java} | 13 +-
.../apache/paimon/io/DataFileMetaSerializer.java | 6 +-
.../org/apache/paimon/io/DataFileRecordReader.java | 58 +++++-
.../apache/paimon/io/KeyValueDataFileWriter.java | 3 +-
.../paimon/io/KeyValueFileReaderFactory.java | 9 +-
.../org/apache/paimon/io/RowDataFileWriter.java | 3 +-
.../org/apache/paimon/manifest/ManifestEntry.java | 14 ++
.../org/apache/paimon/migrate/FileMetaUtils.java | 1 +
.../paimon/operation/FileStoreCommitImpl.java | 66 ++++++-
.../apache/paimon/operation/RawFileSplitRead.java | 19 +-
.../org/apache/paimon/schema/SchemaValidation.java | 16 ++
.../sink/CommitMessageLegacyV2Serializer.java | 1 +
.../paimon/table/sink/CommitMessageSerializer.java | 11 +-
.../org/apache/paimon/table/source/DataSplit.java | 8 +-
.../org/apache/paimon/table/system/FilesTable.java | 4 +-
.../paimon/table/system/RowLineageTable.java | 201 +++++++++++++++++++++
.../paimon/table/system/SystemTableLoader.java | 2 +
.../src/main/java/org/apache/paimon/tag/Tag.java | 8 +-
.../apache/paimon/utils/FormatReaderMapping.java | 35 +++-
.../append/AppendCompactCoordinatorTest.java | 1 +
.../apache/paimon/append/AppendOnlyWriterTest.java | 1 +
.../paimon/crosspartition/IndexBootstrapTest.java | 1 +
.../paimon/io/DataFileTestDataGenerator.java | 1 +
.../org/apache/paimon/io/DataFileTestUtils.java | 3 +
...festCommittableSerializerCompatibilityTest.java | 90 ++++++++-
.../ManifestCommittableSerializerTest.java | 1 +
.../paimon/manifest/ManifestFileMetaTestBase.java | 2 +
.../org/apache/paimon/mergetree/LevelsTest.java | 1 +
.../mergetree/compact/IntervalPartitionTest.java | 1 +
.../mergetree/compact/UniversalCompactionTest.java | 17 +-
.../paimon/operation/ExpireSnapshotsTest.java | 4 +-
.../paimon/table/source/SplitGeneratorTest.java | 1 +
.../org/apache/paimon/table/source/SplitTest.java | 80 +++++++-
.../org/apache/paimon/tag/TagAutoManagerTest.java | 2 +
.../test/java/org/apache/paimon/tag/TagTest.java | 1 +
.../paimon/utils/FormatReaderMappingTest.java | 4 +-
.../apache/paimon/utils/SnapshotManagerTest.java | 5 +
.../src/test/resources/compatibility/datasplit-v7 | Bin 0 -> 984 bytes
.../compatibility/manifest-committable-v8 | Bin 0 -> 3522 bytes
.../org/apache/paimon/flink/AppendTableITCase.java | 41 ++++-
...endPreCommitCompactCoordinatorOperatorTest.java | 1 +
.../ChangelogCompactCoordinateOperatorTest.java | 1 +
.../ChangelogCompactSortOperatorTest.java | 1 +
.../ChangelogCompactTaskSerializerTest.java | 1 +
.../changelog/ChangelogCompactTaskTest.java | 1 +
.../sink/CompactionTaskSimpleSerializerTest.java | 1 +
.../source/FileStoreSourceSplitGeneratorTest.java | 1 +
.../source/FileStoreSourceSplitSerializerTest.java | 1 +
.../flink/source/TestChangelogDataReadWrite.java | 3 +-
.../org/apache/paimon/spark/ScanHelperTest.scala | 2 +
65 files changed, 1163 insertions(+), 74 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index e079a253e1..77237c198e 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -875,6 +875,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td>String</td>
<td>Time field for record level expire. It supports the following
types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`,
`timestamps in milliseconds with BIGINT` or `timestamp`.</td>
</tr>
+ <tr>
+ <td><h5>row-tracking.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
        <td>Whether to enable unique row id for append table.</td>
+ </tr>
<tr>
<td><h5>rowkind.field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 64ea11a835..56f6daf40b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1912,6 +1912,12 @@ public class CoreOptions implements Serializable {
+ "in 'sink.clustering.by-columns'.
'order' is used for 1 column, 'zorder' for less than 5 columns, "
+ "and 'hilbert' for 5 or more columns.");
+ public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED =
+ key("row-tracking.enabled")
+ .booleanType()
+ .defaultValue(false)
+                    .withDescription("Whether to enable unique row id for append
table.");
+
public static final ConfigOption<Boolean> SNAPSHOT_IGNORE_EMPTY_COMMIT =
key("snapshot.ignore-empty-commit")
.booleanType()
@@ -2864,6 +2870,10 @@ public class CoreOptions implements Serializable {
return options.get(RECORD_LEVEL_TIME_FIELD);
}
+ public boolean rowTrackingEnabled() {
+ return options.get(ROW_TRACKING_ENABLED);
+ }
+
public boolean prepareCommitWaitCompaction() {
if (!needLookup()) {
return false;
diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
index 23d73b285d..31defa3bfb 100644
--- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
@@ -88,6 +88,7 @@ public class Snapshot implements Serializable {
protected static final String FIELD_WATERMARK = "watermark";
protected static final String FIELD_STATISTICS = "statistics";
protected static final String FIELD_PROPERTIES = "properties";
+ protected static final String FIELD_NEXT_ROW_ID = "nextRowId";
// version of snapshot
// null for paimon <= 0.2
@@ -203,6 +204,11 @@ public class Snapshot implements Serializable {
@Nullable
protected final Map<String, String> properties;
+ @Nullable
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_NEXT_ROW_ID)
+ protected final Long nextRowId;
+
public Snapshot(
long id,
long schemaId,
@@ -223,7 +229,8 @@ public class Snapshot implements Serializable {
@Nullable Long changelogRecordCount,
@Nullable Long watermark,
@Nullable String statistics,
- @Nullable Map<String, String> properties) {
+ @Nullable Map<String, String> properties,
+ @Nullable Long nextRowId) {
this(
CURRENT_VERSION,
id,
@@ -245,7 +252,8 @@ public class Snapshot implements Serializable {
changelogRecordCount,
watermark,
statistics,
- properties);
+ properties,
+ nextRowId);
}
@JsonCreator
@@ -271,7 +279,8 @@ public class Snapshot implements Serializable {
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long
changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
- @JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String>
properties) {
+ @JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String>
properties,
+ @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) {
this.version = version;
this.id = id;
this.schemaId = schemaId;
@@ -293,6 +302,7 @@ public class Snapshot implements Serializable {
this.watermark = watermark;
this.statistics = statistics;
this.properties = properties;
+ this.nextRowId = nextRowId;
}
@JsonGetter(FIELD_VERSION)
@@ -413,6 +423,12 @@ public class Snapshot implements Serializable {
return properties;
}
+ @JsonGetter(FIELD_NEXT_ROW_ID)
+ @Nullable
+ public Long nextRowId() {
+ return nextRowId;
+ }
+
public String toJson() {
return JsonSerdeUtil.toJson(this);
}
@@ -440,7 +456,8 @@ public class Snapshot implements Serializable {
changelogRecordCount,
watermark,
statistics,
- properties);
+ properties,
+ nextRowId);
}
@Override
@@ -472,7 +489,8 @@ public class Snapshot implements Serializable {
&& Objects.equals(changelogRecordCount,
that.changelogRecordCount)
&& Objects.equals(watermark, that.watermark)
&& Objects.equals(statistics, that.statistics)
- && Objects.equals(properties, that.properties);
+ && Objects.equals(properties, that.properties)
+ && Objects.equals(nextRowId, that.nextRowId);
}
/** Type of changes in this snapshot. */
diff --git
a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
index 3288276a1f..1a594e5d15 100644
--- a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
+++ b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
@@ -87,8 +87,16 @@ public class SpecialFields {
new DataField(
Integer.MAX_VALUE - 4, "rowkind", new
VarCharType(VarCharType.MAX_LENGTH));
+ public static final DataField ROW_ID =
+ new DataField(Integer.MAX_VALUE - 5, "_ROW_ID",
DataTypes.BIGINT());
+
public static final Set<String> SYSTEM_FIELD_NAMES =
- Stream.of(SEQUENCE_NUMBER.name(), VALUE_KIND.name(), LEVEL.name(),
ROW_KIND.name())
+ Stream.of(
+ SEQUENCE_NUMBER.name(),
+ VALUE_KIND.name(),
+ LEVEL.name(),
+ ROW_KIND.name(),
+ ROW_ID.name())
.collect(Collectors.toSet());
public static boolean isSystemField(int fieldId) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
new file mode 100644
index 0000000000..2775c3ebfe
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/casting/FallbackMappingRow.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.RowKind;
+
+/** A row that falls back to a mapped secondary row (e.g. row lineage fields) when the main row is null at a position. */
+public class FallbackMappingRow implements InternalRow {
+
+ private InternalRow main;
+ private InternalRow fallbackRow;
+ private final int[] mappings;
+
+ public FallbackMappingRow(int[] mappings) {
+ this.mappings = mappings;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return main.getFieldCount();
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return main.getRowKind();
+ }
+
+ @Override
+ public void setRowKind(RowKind kind) {
+ main.setRowKind(kind);
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ if (mappings[pos] == -1) {
+ return main.isNullAt(pos);
+ }
+ return main.isNullAt(pos) || fallbackRow.isNullAt(mappings[pos]);
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getBoolean(mappings[pos]);
+ }
+ return main.getBoolean(pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getByte(mappings[pos]);
+ }
+ return main.getByte(pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getShort(mappings[pos]);
+ }
+ return main.getShort(pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getInt(mappings[pos]);
+ }
+ return main.getInt(pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getLong(mappings[pos]);
+ }
+ return main.getLong(pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getFloat(mappings[pos]);
+ }
+ return main.getFloat(pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getDouble(mappings[pos]);
+ }
+ return main.getDouble(pos);
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getString(mappings[pos]);
+ }
+ return main.getString(pos);
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getDecimal(mappings[pos], precision, scale);
+ }
+ return main.getDecimal(pos, precision, scale);
+ }
+
+ @Override
+ public Timestamp getTimestamp(int pos, int precision) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getTimestamp(mappings[pos], precision);
+ }
+ return main.getTimestamp(pos, precision);
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getBinary(mappings[pos]);
+ }
+ return main.getBinary(pos);
+ }
+
+ @Override
+ public Variant getVariant(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getVariant(mappings[pos]);
+ }
+ return main.getVariant(pos);
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getArray(mappings[pos]);
+ }
+ return main.getArray(pos);
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getMap(mappings[pos]);
+ }
+ return main.getMap(pos);
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ if (mappings[pos] != -1 && main.isNullAt(pos)) {
+ return fallbackRow.getRow(mappings[pos], numFields);
+ }
+ return main.getRow(pos, numFields);
+ }
+
+ public FallbackMappingRow replace(InternalRow main, InternalRow
rowLineage) {
+ this.main = main;
+ this.fallbackRow = rowLineage;
+ return this;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 02bfe5912d..37f8d8edf2 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -23,12 +23,15 @@ import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.utils.LongIterator;
import org.apache.paimon.utils.RecyclableIterator;
import org.apache.paimon.utils.VectorMappingUtils;
import javax.annotation.Nullable;
+import java.util.Map;
+
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -120,4 +123,55 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow>
}
return this;
}
+
+ public ColumnarRowIterator assignRowLineage(
+ Long firstRowId, Long snapshotId, Map<String, Integer> meta) {
+ VectorizedColumnBatch vectorizedColumnBatch = row.batch();
+ ColumnVector[] vectors = vectorizedColumnBatch.columns;
+
+ if (meta.containsKey(SpecialFields.ROW_ID.name())) {
+ Integer index = meta.get(SpecialFields.ROW_ID.name());
+ final ColumnVector rowIdVector = vectors[index];
+ vectors[index] =
+ new LongColumnVector() {
+ @Override
+ public long getLong(int i) {
+ if (rowIdVector.isNullAt(i)) {
+ return firstRowId + returnedPosition();
+ } else {
+ return ((LongColumnVector)
rowIdVector).getLong(i);
+ }
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return false;
+ }
+ };
+ }
+
+ if (meta.containsKey(SpecialFields.SEQUENCE_NUMBER.name())) {
+ Integer index = meta.get(SpecialFields.SEQUENCE_NUMBER.name());
+ final ColumnVector versionVector = vectors[index];
+ vectors[index] =
+ new LongColumnVector() {
+ @Override
+ public long getLong(int i) {
+ if (versionVector.isNullAt(i)) {
+ return snapshotId;
+ } else {
+ return ((LongColumnVector)
versionVector).getLong(i);
+ }
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return false;
+ }
+ };
+ }
+
+ copy(vectors);
+ return this;
+ }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/casting/FallbackMappingRowTest.java
b/paimon-common/src/test/java/org/apache/paimon/casting/FallbackMappingRowTest.java
new file mode 100644
index 0000000000..cd5c293f6c
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/casting/FallbackMappingRowTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.GenericRow;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.paimon.types.DataTypesTest.assertThat;
+
+/** Test for {@link FallbackMappingRow}. */
+public class FallbackMappingRowTest {
+
+ @Test
+ public void testBasic() {
+ int[] project = new int[7];
+
+ GenericRow genericRow = new GenericRow(2);
+ genericRow.setField(0, 111L);
+ genericRow.setField(1, 222L);
+
+ project[0] = -1;
+ project[1] = -1;
+ project[2] = -1;
+ project[3] = -1;
+ project[4] = -1;
+ project[5] = 1;
+ project[6] = 0;
+
+ GenericRow mainRow = new GenericRow(7);
+ mainRow.setField(0, 0L);
+ mainRow.setField(1, 1L);
+ mainRow.setField(2, 2L);
+ mainRow.setField(3, 3L);
+ FallbackMappingRow fallbackMappingRow = new
FallbackMappingRow(project);
+ fallbackMappingRow.replace(mainRow, genericRow);
+
+ assertThat(fallbackMappingRow.getFieldCount()).isEqualTo(7);
+ assertThat(fallbackMappingRow.getLong(6)).isEqualTo(111L);
+ assertThat(fallbackMappingRow.getLong(5)).isEqualTo(222L);
+ assertThat(fallbackMappingRow.getLong(0)).isEqualTo(0L);
+ assertThat(fallbackMappingRow.getLong(1)).isEqualTo(1L);
+ assertThat(fallbackMappingRow.getLong(2)).isEqualTo(2L);
+ assertThat(fallbackMappingRow.getLong(3)).isEqualTo(3L);
+ assertThat(fallbackMappingRow.isNullAt(4)).isEqualTo(true);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 94dbcbda9b..d998b290dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -304,7 +304,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
options.commitTimeout(),
options.commitMinRetryWait(),
options.commitMaxRetryWait(),
- options.commitStrictModeLastSafeSnapshot().orElse(null));
+ options.commitStrictModeLastSafeSnapshot().orElse(null),
+ options.rowTrackingEnabled());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index a277ce6530..c81a44d426 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -80,7 +80,8 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
rowType,
FileFormatDiscover.of(options),
pathFactory(),
- options.fileIndexReadEnabled());
+ options.fileIndexReadEnabled(),
+ options.rowTrackingEnabled());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java
b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
index c0865cdd34..a715ab8234 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
@@ -63,7 +63,8 @@ public class Changelog extends Snapshot {
snapshot.changelogRecordCount(),
snapshot.watermark(),
snapshot.statistics(),
- snapshot.properties);
+ snapshot.properties,
+ snapshot.nextRowId);
}
@JsonCreator
@@ -89,7 +90,8 @@ public class Changelog extends Snapshot {
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long
changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
- @JsonProperty(FIELD_PROPERTIES) Map<String, String> properties) {
+ @JsonProperty(FIELD_PROPERTIES) Map<String, String> properties,
+ @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) {
super(
version,
id,
@@ -111,7 +113,8 @@ public class Changelog extends Snapshot {
changelogRecordCount,
watermark,
statistics,
- properties);
+ properties,
+ nextRowId);
}
public static Changelog fromJson(String json) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index a230e10d69..f7a743ace1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -131,7 +131,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
valueType,
FileFormatDiscover.of(options),
pathFactory(),
- options.fileIndexReadEnabled());
+ options.fileIndexReadEnabled(),
+ false);
}
public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index eb28759f93..8fa2aea2be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -83,7 +83,8 @@ public class DataFileMeta {
16,
"_VALUE_STATS_COLS",
DataTypes.ARRAY(DataTypes.STRING().notNull())),
- new DataField(17, "_EXTERNAL_PATH",
newStringType(true))));
+ new DataField(17, "_EXTERNAL_PATH",
newStringType(true)),
+ new DataField(18, "_FIRST_ROW_ID", new
BigIntType(true))));
public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
@@ -100,6 +101,7 @@ public class DataFileMeta {
private final SimpleStats keyStats;
private final SimpleStats valueStats;
+    // For row-tracking (row lineage) tables, this is reassigned during commit
private final long minSequenceNumber;
private final long maxSequenceNumber;
private final long schemaId;
@@ -124,6 +126,8 @@ public class DataFileMeta {
/** external path of file, if it is null, it is in the default warehouse
path. */
private final @Nullable String externalPath;
+ private final @Nullable Long firstRowId;
+
public static DataFileMeta forAppend(
String fileName,
long fileSize,
@@ -136,7 +140,8 @@ public class DataFileMeta {
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols,
- @Nullable String externalPath) {
+ @Nullable String externalPath,
+ @Nullable Long firstRowId) {
return new DataFileMeta(
fileName,
fileSize,
@@ -155,7 +160,8 @@ public class DataFileMeta {
embeddedIndex,
fileSource,
valueStatsCols,
- externalPath);
+ externalPath,
+ firstRowId);
}
public DataFileMeta(
@@ -175,7 +181,8 @@ public class DataFileMeta {
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols,
- @Nullable String externalPath) {
+ @Nullable String externalPath,
+ @Nullable Long firstRowId) {
this(
fileName,
fileSize,
@@ -194,7 +201,8 @@ public class DataFileMeta {
embeddedIndex,
fileSource,
valueStatsCols,
- externalPath);
+ externalPath,
+ firstRowId);
}
public DataFileMeta(
@@ -212,7 +220,8 @@ public class DataFileMeta {
@Nullable Long deleteRowCount,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
- @Nullable List<String> valueStatsCols) {
+ @Nullable List<String> valueStatsCols,
+ @Nullable Long firstRowId) {
this(
fileName,
fileSize,
@@ -231,7 +240,8 @@ public class DataFileMeta {
embeddedIndex,
fileSource,
valueStatsCols,
- null);
+ null,
+ firstRowId);
}
public DataFileMeta(
@@ -252,7 +262,8 @@ public class DataFileMeta {
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols,
- @Nullable String externalPath) {
+ @Nullable String externalPath,
+ @Nullable Long firstRowId) {
this.fileName = fileName;
this.fileSize = fileSize;
@@ -275,6 +286,7 @@ public class DataFileMeta {
this.fileSource = fileSource;
this.valueStatsCols = valueStatsCols;
this.externalPath = externalPath;
+ this.firstRowId = firstRowId;
}
public String fileName() {
@@ -387,6 +399,11 @@ public class DataFileMeta {
return valueStatsCols;
}
+ @Nullable
+ public Long firstRowId() {
+ return firstRowId;
+ }
+
public DataFileMeta upgrade(int newLevel) {
checkArgument(newLevel > this.level);
return new DataFileMeta(
@@ -407,7 +424,8 @@ public class DataFileMeta {
embeddedIndex,
fileSource,
valueStatsCols,
- externalPath);
+ externalPath,
+ firstRowId);
}
public DataFileMeta rename(String newFileName) {
@@ -430,7 +448,8 @@ public class DataFileMeta {
embeddedIndex,
fileSource,
valueStatsCols,
- newExternalPath);
+ newExternalPath,
+ firstRowId);
}
public DataFileMeta copyWithoutStats() {
@@ -452,7 +471,54 @@ public class DataFileMeta {
embeddedIndex,
fileSource,
Collections.emptyList(),
- externalPath);
+ externalPath,
+ firstRowId);
+ }
+
+ public DataFileMeta assignSequenceNumber(long minSequenceNumber, long
maxSequenceNumber) {
+ return new DataFileMeta(
+ fileName,
+ fileSize,
+ rowCount,
+ minKey,
+ maxKey,
+ keyStats,
+ valueStats,
+ minSequenceNumber,
+ maxSequenceNumber,
+ schemaId,
+ level,
+ extraFiles,
+ creationTime,
+ deleteRowCount,
+ embeddedIndex,
+ fileSource,
+ valueStatsCols,
+ externalPath,
+ firstRowId);
+ }
+
+ public DataFileMeta assignFirstRowId(long firstRowId) {
+ return new DataFileMeta(
+ fileName,
+ fileSize,
+ rowCount,
+ minKey,
+ maxKey,
+ keyStats,
+ valueStats,
+ minSequenceNumber,
+ maxSequenceNumber,
+ schemaId,
+ level,
+ extraFiles,
+ creationTime,
+ deleteRowCount,
+ embeddedIndex,
+ fileSource,
+ valueStatsCols,
+ externalPath,
+ firstRowId);
}
public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -481,7 +547,8 @@ public class DataFileMeta {
embeddedIndex,
fileSource,
valueStatsCols,
- externalPath);
+ externalPath,
+ firstRowId);
}
public DataFileMeta newExternalPath(String newExternalPath) {
@@ -503,7 +570,8 @@ public class DataFileMeta {
embeddedIndex,
fileSource,
valueStatsCols,
- newExternalPath);
+ newExternalPath,
+ firstRowId);
}
public DataFileMeta copy(byte[] newEmbeddedIndex) {
@@ -525,7 +593,8 @@ public class DataFileMeta {
newEmbeddedIndex,
fileSource,
valueStatsCols,
- externalPath);
+ externalPath,
+ firstRowId);
}
@Override
@@ -554,7 +623,8 @@ public class DataFileMeta {
&& Objects.equals(deleteRowCount, that.deleteRowCount)
&& Objects.equals(fileSource, that.fileSource)
&& Objects.equals(valueStatsCols, that.valueStatsCols)
- && Objects.equals(externalPath, that.externalPath);
+ && Objects.equals(externalPath, that.externalPath)
+ && Objects.equals(firstRowId, that.firstRowId);
}
@Override
@@ -577,7 +647,8 @@ public class DataFileMeta {
deleteRowCount,
fileSource,
valueStatsCols,
- externalPath);
+ externalPath,
+ firstRowId);
}
@Override
@@ -587,7 +658,7 @@ public class DataFileMeta {
+ "minKey: %s, maxKey: %s, keyStats: %s, valueStats:
%s, "
+ "minSequenceNumber: %d, maxSequenceNumber: %d, "
+ "schemaId: %d, level: %d, extraFiles: %s,
creationTime: %s, "
- + "deleteRowCount: %d, fileSource: %s, valueStatsCols:
%s, externalPath: %s}",
+ + "deleteRowCount: %d, fileSource: %s, valueStatsCols:
%s, externalPath: %s, firstRowId: %s}",
fileName,
fileSize,
rowCount,
@@ -605,7 +676,8 @@ public class DataFileMeta {
deleteRowCount,
fileSource,
valueStatsCols,
- externalPath);
+ externalPath,
+ firstRowId);
}
public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
index e6c10f1534..029d2cca55 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
@@ -134,6 +134,7 @@ public class DataFileMeta08Serializer implements
Serializable {
row.isNullAt(14) ? null : row.getBinary(14),
null,
null,
+ null,
null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
index 36d1ad260f..5c238b86bf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
@@ -140,6 +140,7 @@ public class DataFileMeta09Serializer implements
Serializable {
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null :
FileSource.fromByteValue(row.getByte(15)),
null,
+ null,
null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
index 518db7c658..03e4f1a189 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
@@ -145,6 +145,7 @@ public class DataFileMeta10LegacySerializer implements
Serializable {
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null :
FileSource.fromByteValue(row.getByte(15)),
row.isNullAt(16) ? null :
fromStringArrayData(row.getArray(16)),
+ null,
null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java
similarity index 93%
copy from
paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
copy to
paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java
index 518db7c658..b372467d26 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java
@@ -46,8 +46,8 @@ import static
org.apache.paimon.utils.SerializationUtils.newBytesType;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
-/** Serializer for {@link DataFileMeta} with 1.0 snapshot version. */
-public class DataFileMeta10LegacySerializer implements Serializable {
+/** Serializer for {@link DataFileMeta} with 1.2 snapshot version. */
+public class DataFileMeta12LegacySerializer implements Serializable {
private static final long serialVersionUID = 1L;
@@ -75,11 +75,12 @@ public class DataFileMeta10LegacySerializer implements
Serializable {
new DataField(
16,
"_VALUE_STATS_COLS",
-
DataTypes.ARRAY(DataTypes.STRING().notNull()))));
+
DataTypes.ARRAY(DataTypes.STRING().notNull())),
+ new DataField(17, "_EXTERNAL_PATH",
newStringType(true))));
protected final InternalRowSerializer rowSerializer;
- public DataFileMeta10LegacySerializer() {
+ public DataFileMeta12LegacySerializer() {
this.rowSerializer = InternalSerializers.create(SCHEMA);
}
@@ -110,7 +111,8 @@ public class DataFileMeta10LegacySerializer implements
Serializable {
meta.deleteRowCount().orElse(null),
meta.embeddedIndex(),
meta.fileSource().map(FileSource::toByteValue).orElse(null),
- toStringArrayData(meta.valueStatsCols()));
+ toStringArrayData(meta.valueStatsCols()),
+
meta.externalPath().map(BinaryString::fromString).orElse(null));
rowSerializer.serialize(row, target);
}
@@ -145,6 +147,7 @@ public class DataFileMeta10LegacySerializer implements
Serializable {
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null :
FileSource.fromByteValue(row.getByte(15)),
row.isNullAt(16) ? null :
fromStringArrayData(row.getArray(16)),
+ row.isNullAt(17) ? null : row.getString(17).toString(),
null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index a316f897ff..7eac729586 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -59,7 +59,8 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
meta.embeddedIndex(),
meta.fileSource().map(FileSource::toByteValue).orElse(null),
toStringArrayData(meta.valueStatsCols()),
-
meta.externalPath().map(BinaryString::fromString).orElse(null));
+ meta.externalPath().map(BinaryString::fromString).orElse(null),
+ meta.firstRowId());
}
@Override
@@ -82,6 +83,7 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null :
FileSource.fromByteValue(row.getByte(15)),
row.isNullAt(16) ? null :
fromStringArrayData(row.getArray(16)),
- row.isNullAt(17) ? null : row.getString(17).toString());
+ row.isNullAt(17) ? null : row.getString(17).toString(),
+ row.isNullAt(18) ? null : row.getLong(18));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 16fad55a49..f7b283daf0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -21,34 +21,51 @@ package org.apache.paimon.io;
import org.apache.paimon.PartitionSettedRow;
import org.apache.paimon.casting.CastFieldGetter;
import org.apache.paimon.casting.CastedRow;
+import org.apache.paimon.casting.FallbackMappingRow;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.ProjectedRow;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
/** Reads {@link InternalRow} from data files. */
public class DataFileRecordReader implements FileRecordReader<InternalRow> {
+ private final RowType tableRowType;
private final FileRecordReader<InternalRow> reader;
@Nullable private final int[] indexMapping;
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
+ private final boolean rowLineageEnabled;
+ @Nullable private final Long firstRowId;
+ private final long maxSequenceNumber;
+ private final Map<String, Integer> systemFields;
public DataFileRecordReader(
+ RowType tableRowType,
FormatReaderFactory readerFactory,
FormatReaderFactory.Context context,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
- @Nullable PartitionInfo partitionInfo)
+ @Nullable PartitionInfo partitionInfo,
+ boolean rowLineageEnabled,
+ @Nullable Long firstRowId,
+ long maxSequenceNumber,
+ Map<String, Integer> systemFields)
throws IOException {
+ this.tableRowType = tableRowType;
try {
this.reader = readerFactory.createReader(context);
} catch (Exception e) {
@@ -58,6 +75,10 @@ public class DataFileRecordReader implements
FileRecordReader<InternalRow> {
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
this.castMapping = castMapping;
+ this.rowLineageEnabled = rowLineageEnabled;
+ this.firstRowId = firstRowId;
+ this.maxSequenceNumber = maxSequenceNumber;
+ this.systemFields = systemFields;
}
@Nullable
@@ -70,6 +91,11 @@ public class DataFileRecordReader implements
FileRecordReader<InternalRow> {
if (iterator instanceof ColumnarRowIterator) {
iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo,
indexMapping);
+ if (rowLineageEnabled) {
+ iterator =
+ ((ColumnarRowIterator) iterator)
+ .assignRowLineage(firstRowId,
maxSequenceNumber, systemFields);
+ }
} else {
if (partitionInfo != null) {
final PartitionSettedRow partitionSettedRow =
@@ -81,6 +107,36 @@ public class DataFileRecordReader implements
FileRecordReader<InternalRow> {
final ProjectedRow projectedRow =
ProjectedRow.from(indexMapping);
iterator = iterator.transform(projectedRow::replaceRow);
}
+
+ if (rowLineageEnabled && !systemFields.isEmpty()) {
+ GenericRow lineageRow = new GenericRow(2);
+
+ int[] fallbackToLineageMappings = new
int[tableRowType.getFieldCount()];
+ Arrays.fill(fallbackToLineageMappings, -1);
+
+ if (systemFields.containsKey(SpecialFields.ROW_ID.name())) {
+
fallbackToLineageMappings[systemFields.get(SpecialFields.ROW_ID.name())] = 0;
+ }
+ if
(systemFields.containsKey(SpecialFields.SEQUENCE_NUMBER.name())) {
+ fallbackToLineageMappings[
+
systemFields.get(SpecialFields.SEQUENCE_NUMBER.name())] =
+ 1;
+ }
+
+ FallbackMappingRow fallbackMappingRow =
+ new FallbackMappingRow(fallbackToLineageMappings);
+ final FileRecordIterator<InternalRow> iteratorInner = iterator;
+ iterator =
+ iterator.transform(
+ row -> {
+ if (firstRowId != null) {
+ lineageRow.setField(
+ 0,
iteratorInner.returnedPosition() + firstRowId);
+ }
+ lineageRow.setField(1, maxSequenceNumber);
+ return fallbackMappingRow.replace(row,
lineageRow);
+ });
+ }
}
if (castMapping != null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 76f4ebd3a2..077b90df36 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -178,7 +178,8 @@ public abstract class KeyValueDataFileWriter
indexResult.embeddedIndexBytes(),
fileSource,
valueStatsPair.getKey(),
- externalPath);
+ externalPath,
+ null);
}
abstract Pair<SimpleColStats[], SimpleColStats[]>
fetchKeyValueStats(SimpleColStats[] rowStats);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 14221d50be..33dadec1a5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -127,6 +127,7 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
long fileSize = file.fileSize();
FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
+ schema.logicalRowType(),
formatReaderMapping.getReaderFactory(),
orcPoolSize == null
? new FormatReaderContext(fileIO, filePath,
fileSize)
@@ -134,7 +135,11 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
fileIO, filePath, fileSize,
orcPoolSize),
formatReaderMapping.getIndexMapping(),
formatReaderMapping.getCastMapping(),
-
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));
+
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition),
+ false,
+ null,
+ -1,
+ Collections.emptyMap());
Optional<DeletionVector> deletionVector =
dvFactory.create(file.fileName());
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
@@ -267,7 +272,7 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
finalReadKeyType,
readValueType,
new FormatReaderMapping.Builder(
- formatDiscover, readTableFields, fieldsExtractor,
filters),
+ formatDiscover, readTableFields, fieldsExtractor,
filters, false),
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes(),
partition,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 432845de32..b3f9d9ecff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -116,6 +116,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
indexResult.embeddedIndexBytes(),
fileSource,
statsPair.getKey(),
- externalPath);
+ externalPath,
+ null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 75ab52ac75..0de753f166 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -140,6 +140,20 @@ public class ManifestEntry implements FileEntry {
return new ManifestEntry(kind, partition, bucket, totalBuckets,
file.copyWithoutStats());
}
    /**
     * Returns a copy of this entry whose {@link DataFileMeta} carries the given min/max sequence
     * numbers.
     *
     * <p>Used by row-tracking commits to stamp delta files with the committing snapshot id (see
     * {@code FileStoreCommitImpl#assignSnapshotId}).
     */
    public ManifestEntry assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber) {
        return new ManifestEntry(
                kind,
                partition,
                bucket,
                totalBuckets,
                file.assignSequenceNumber(minSequenceNumber, maxSequenceNumber));
    }

    /** Returns a copy of this entry whose {@link DataFileMeta} carries the given first row id. */
    public ManifestEntry assignFirstRowId(long firstRowId) {
        return new ManifestEntry(
                kind, partition, bucket, totalBuckets, file.assignFirstRowId(firstRowId));
    }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof ManifestEntry)) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 42b349d7d1..8e10798acb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -218,6 +218,7 @@ public class FileMetaUtils {
null,
FileSource.APPEND,
null,
+ null,
null);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 743cf36eeb..3a3d00263b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -29,6 +29,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestCommittable;
@@ -92,6 +93,7 @@ import static
org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
import static
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
* Default implementation of {@link FileStoreCommit}.
@@ -147,6 +149,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private final int commitMaxRetries;
@Nullable private Long strictModeLastSafeSnapshot;
private final InternalRowPartitionComputer partitionComputer;
+ private final boolean rowTrackingEnabled;
private boolean ignoreEmptyCommit;
private CommitMetrics commitMetrics;
@@ -182,7 +185,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
long commitTimeout,
long commitMinRetryWait,
long commitMaxRetryWait,
- @Nullable Long strictModeLastSafeSnapshot) {
+ @Nullable Long strictModeLastSafeSnapshot,
+ boolean rowTrackingEnabled) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
this.schemaManager = schemaManager;
@@ -225,6 +229,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.commitMetrics = null;
this.statsFileHandler = statsFileHandler;
this.bucketMode = bucketMode;
+ this.rowTrackingEnabled = rowTrackingEnabled;
}
@Override
@@ -247,7 +252,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
for (int i = 1; i < committables.size(); i++) {
- Preconditions.checkArgument(
+ checkArgument(
committables.get(i).identifier() > committables.get(i -
1).identifier(),
"Committables must be sorted according to identifiers
before filtering. This is unexpected.");
}
@@ -549,7 +554,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Override
public void dropPartitions(List<Map<String, String>> partitions, long
commitIdentifier) {
- Preconditions.checkArgument(!partitions.isEmpty(), "Partitions list
cannot be empty.");
+ checkArgument(!partitions.isEmpty(), "Partitions list cannot be
empty.");
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -917,6 +922,10 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
long newSnapshotId =
latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID :
latestSnapshot.id() + 1;
+ long firstRowIdStart =
+ latestSnapshot == null
+ ? 0L
+ : latestSnapshot.nextRowId() == null ? 0L :
latestSnapshot.nextRowId();
if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot
>= 0) {
for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId;
id++) {
@@ -989,6 +998,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
String indexManifest = null;
List<ManifestFileMeta> mergeBeforeManifests = new ArrayList<>();
List<ManifestFileMeta> mergeAfterManifests = new ArrayList<>();
+ long nextRowIdStart = firstRowIdStart;
try {
long previousTotalRecordCount = 0L;
Long currentWatermark = watermark;
@@ -1024,6 +1034,17 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
manifestReadParallelism);
baseManifestList = manifestList.write(mergeAfterManifests);
+ if (rowTrackingEnabled) {
+ // assigned snapshot id to delta files
+ List<ManifestEntry> snapshotAssigned = new ArrayList<>();
+ assignSnapshotId(newSnapshotId, deltaFiles, snapshotAssigned);
+ // assign row id for new files
+ List<ManifestEntry> rowIdAssigned = new ArrayList<>();
+ nextRowIdStart =
+ assignRowLineageMeta(firstRowIdStart,
snapshotAssigned, rowIdAssigned);
+ deltaFiles = rowIdAssigned;
+ }
+
// the added records subtract the deleted records from
long deltaRecordCount = recordCountAdd(deltaFiles) -
recordCountDelete(deltaFiles);
long totalRecordCount = previousTotalRecordCount +
deltaRecordCount;
@@ -1083,7 +1104,8 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
currentWatermark,
statsFileName,
// if empty properties, just set to null
- properties.isEmpty() ? null : properties);
+ properties.isEmpty() ? null : properties,
+ nextRowIdStart);
} catch (Throwable e) {
// fails when preparing for commit, we should clean up
cleanUpReuseTmpManifests(
@@ -1133,10 +1155,41 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
if (strictModeLastSafeSnapshot != null) {
strictModeLastSafeSnapshot = newSnapshot.id();
}
- commitCallbacks.forEach(callback -> callback.call(deltaFiles,
indexFiles, newSnapshot));
+ final List<ManifestEntry> finalDeltaFiles = deltaFiles;
+ commitCallbacks.forEach(
+ callback -> callback.call(finalDeltaFiles, indexFiles,
newSnapshot));
return new SuccessResult();
}
+ private long assignRowLineageMeta(
+ long firstRowIdStart,
+ List<ManifestEntry> deltaFiles,
+ List<ManifestEntry> rowIdAssigned) {
+ if (deltaFiles.isEmpty()) {
+ return firstRowIdStart;
+ }
+ // assign row id for new files
+ long start = firstRowIdStart;
+ for (ManifestEntry entry : deltaFiles) {
+ checkArgument(
+ entry.file().fileSource().isPresent(),
+ "This is a bug, file source field for row-tracking table
must present.");
+ if (entry.file().fileSource().get().equals(FileSource.APPEND)) {
+ long rowCount = entry.file().rowCount();
+ rowIdAssigned.add(entry.assignFirstRowId(start));
+ start += rowCount;
+ }
+ }
+ return start;
+ }
+
+ private void assignSnapshotId(
+ long snapshotId, List<ManifestEntry> deltaFiles,
List<ManifestEntry> snapshotAssigned) {
+ for (ManifestEntry entry : deltaFiles) {
+ snapshotAssigned.add(entry.assignSequenceNumber(snapshotId,
snapshotId));
+ }
+ }
+
public void compactManifest() {
int retryCount = 0;
long startMillis = System.currentTimeMillis();
@@ -1211,7 +1264,8 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
0L,
latestSnapshot.watermark(),
latestSnapshot.statistics(),
- latestSnapshot.properties());
+ latestSnapshot.properties(),
+ latestSnapshot.nextRowId());
return commitSnapshotImpl(newSnapshot, emptyList());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index a6690ef660..d25910dda3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -79,6 +79,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
private final FileStorePathFactory pathFactory;
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final boolean fileIndexReadEnabled;
+ private final boolean rowTrackingEnabled;
private RowType readRowType;
@Nullable private List<Predicate> filters;
@@ -90,7 +91,8 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
RowType rowType,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
- boolean fileIndexReadEnabled) {
+ boolean fileIndexReadEnabled,
+ boolean rowTrackingEnabled) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schema = schema;
@@ -98,6 +100,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
this.pathFactory = pathFactory;
this.formatReaderMappings = new HashMap<>();
this.fileIndexReadEnabled = fileIndexReadEnabled;
+ this.rowTrackingEnabled = rowTrackingEnabled;
this.readRowType = rowType;
}
@@ -153,7 +156,12 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
List<DataField> readTableFields = readRowType.getFields();
Builder formatReaderMappingBuilder =
- new Builder(formatDiscover, readTableFields,
TableSchema::fields, filters);
+ new Builder(
+ formatDiscover,
+ readTableFields,
+ TableSchema::fields,
+ filters,
+ rowTrackingEnabled);
for (int i = 0; i < files.size(); i++) {
DataFileMeta file = files.get(i);
@@ -235,11 +243,16 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
fileIO, dataFilePathFactory.toPath(file),
file.fileSize(), selection);
FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
+ schema.logicalRowType(),
formatReaderMapping.getReaderFactory(),
formatReaderContext,
formatReaderMapping.getIndexMapping(),
formatReaderMapping.getCastMapping(),
-
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));
+
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition),
+ rowTrackingEnabled,
+ file.firstRowId(),
+ file.maxSequenceNumber(),
+ formatReaderMapping.getSystemFields());
if (fileIndexResult instanceof BitmapIndexResult) {
fileRecordReader =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 6c2d12e222..f0603d4d53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -63,6 +63,7 @@ import static
org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
+import static org.apache.paimon.CoreOptions.PRIMARY_KEY;
import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
@@ -236,6 +237,8 @@ public class SchemaValidation {
}
validateMergeFunctionFactory(schema);
+
+ validateRowLineage(schema, options);
}
public static void validateFallbackBranch(SchemaManager schemaManager,
TableSchema schema) {
@@ -630,4 +633,17 @@ public class SchemaValidation {
}
}
}
+
    /**
     * Validates options for a row-lineage (row tracking) table: it must use dynamic bucketing
     * ({@code bucket = -1}) and must not define a primary key.
     *
     * <p>No-op when row tracking is disabled.
     */
    private static void validateRowLineage(TableSchema schema, CoreOptions options) {
        if (options.rowTrackingEnabled()) {
            checkArgument(
                    options.bucket() == -1,
                    "Cannot define %s for row lineage table, it only support bucket = -1",
                    CoreOptions.BUCKET.key());
            checkArgument(
                    schema.primaryKeys().isEmpty(),
                    "Cannot define %s for row lineage table.",
                    PRIMARY_KEY.key());
        }
    }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
index fe1bc6adb9..64bd218167 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -157,6 +157,7 @@ public class CommitMessageLegacyV2Serializer {
null,
null,
null,
+ null,
null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 4d91359ec0..47c69855cb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -27,6 +27,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMeta08Serializer;
import org.apache.paimon.io.DataFileMeta09Serializer;
import org.apache.paimon.io.DataFileMeta10LegacySerializer;
+import org.apache.paimon.io.DataFileMeta12LegacySerializer;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputDeserializer;
@@ -48,11 +49,12 @@ import static
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
/** {@link VersionedSerializer} for {@link CommitMessage}. */
public class CommitMessageSerializer implements
VersionedSerializer<CommitMessage> {
- private static final int CURRENT_VERSION = 7;
+ private static final int CURRENT_VERSION = 8;
private final DataFileMetaSerializer dataFileSerializer;
private final IndexFileMetaSerializer indexEntrySerializer;
+ private DataFileMeta12LegacySerializer dataFileMeta12LegacySerializer;
private DataFileMeta10LegacySerializer dataFileMeta10LegacySerializer;
private DataFileMeta09Serializer dataFile09Serializer;
private DataFileMeta08Serializer dataFile08Serializer;
@@ -142,8 +144,13 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
int version, DataInputView view) {
- if (version >= 6) {
+ if (version >= 8) {
return () -> dataFileSerializer.deserializeList(view);
+ } else if (version == 6 || version == 7) {
+ if (dataFileMeta12LegacySerializer == null) {
+ dataFileMeta12LegacySerializer = new
DataFileMeta12LegacySerializer();
+ }
+ return () -> dataFileMeta12LegacySerializer.deserializeList(view);
} else if (version == 4 || version == 5) {
if (dataFileMeta10LegacySerializer == null) {
dataFileMeta10LegacySerializer = new
DataFileMeta10LegacySerializer();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index f4491618d8..1feb72afa3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -24,6 +24,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMeta08Serializer;
import org.apache.paimon.io.DataFileMeta09Serializer;
import org.apache.paimon.io.DataFileMeta10LegacySerializer;
+import org.apache.paimon.io.DataFileMeta12LegacySerializer;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataInputViewStreamWrapper;
@@ -58,7 +59,7 @@ public class DataSplit implements Split {
private static final long serialVersionUID = 7L;
private static final long MAGIC = -2394839472490812314L;
- private static final int VERSION = 6;
+ private static final int VERSION = 7;
private long snapshotId = 0;
private BinaryRow partition;
@@ -426,7 +427,10 @@ public class DataSplit implements Split {
} else if (version == 3 || version == 4) {
DataFileMeta10LegacySerializer serializer = new
DataFileMeta10LegacySerializer();
return serializer::deserialize;
- } else if (version >= 5) {
+ } else if (version == 5 || version == 6) {
+ DataFileMeta12LegacySerializer serializer = new
DataFileMeta12LegacySerializer();
+ return serializer::deserialize;
+ } else if (version >= 7) {
DataFileMetaSerializer serializer = new DataFileMetaSerializer();
return serializer::deserialize;
} else {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 83bfbe831e..eb18e679f5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -464,7 +464,9 @@ public class FilesTable implements ReadonlyTable {
lazyNullValueCounts = new TreeMap<>();
lazyLowerValueBounds = new TreeMap<>();
lazyUpperValueBounds = new TreeMap<>();
- for (int i = 0; i < min.getFieldCount(); i++) {
+ int length =
+ Math.min(min.getFieldCount(),
simpleStatsEvolutions.tableDataFields().size());
+ for (int i = 0; i < length; i++) {
DataField field =
simpleStatsEvolutions.tableDataFields().get(i);
String name = field.name();
DataType type = field.type();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
new file mode 100644
index 0000000000..c428df607f
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.SimpleFileReader;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+
+/** A {@link Table} for reading row id of table. */
+public class RowLineageTable implements DataTable, ReadonlyTable {
+
+ public static final String ROW_LINEAGE = "row_lineage";
+
+ private final FileStoreTable wrapped;
+
+ public RowLineageTable(FileStoreTable wrapped) {
+ this.wrapped = wrapped;
+
+ if (!coreOptions().rowTrackingEnabled()) {
+ throw new IllegalArgumentException(
+ "Table "
+ + wrapped.name()
+ + " does not support row tracking, "
+ + "please enable 'row-tracking.enabled' option to
use 'with_metadata' table.");
+ }
+ }
+
+ @Override
+ public Optional<Snapshot> latestSnapshot() {
+ return wrapped.latestSnapshot();
+ }
+
+ @Override
+ public Snapshot snapshot(long snapshotId) {
+ return wrapped.snapshot(snapshotId);
+ }
+
+ @Override
+ public SimpleFileReader<ManifestFileMeta> manifestListReader() {
+ return wrapped.manifestListReader();
+ }
+
+ @Override
+ public SimpleFileReader<ManifestEntry> manifestFileReader() {
+ return wrapped.manifestFileReader();
+ }
+
+ @Override
+ public SimpleFileReader<IndexManifestEntry> indexManifestFileReader() {
+ return wrapped.indexManifestFileReader();
+ }
+
+ @Override
+ public String name() {
+ return wrapped.name() + SYSTEM_TABLE_SPLITTER + ROW_LINEAGE;
+ }
+
+ @Override
+ public RowType rowType() {
+ List<DataField> fields = new
ArrayList<>(wrapped.rowType().getFields());
+ fields.add(SpecialFields.ROW_ID);
+ fields.add(SpecialFields.SEQUENCE_NUMBER);
+ return new RowType(fields);
+ }
+
+ @Override
+ public List<String> partitionKeys() {
+ return wrapped.partitionKeys();
+ }
+
+ @Override
+ public Map<String, String> options() {
+ return wrapped.options();
+ }
+
+ @Override
+ public List<String> primaryKeys() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public SnapshotReader newSnapshotReader() {
+ return wrapped.newSnapshotReader();
+ }
+
+ @Override
+ public DataTableScan newScan() {
+ return wrapped.newScan();
+ }
+
+ @Override
+ public StreamDataTableScan newStreamScan() {
+ return wrapped.newStreamScan();
+ }
+
+ @Override
+ public CoreOptions coreOptions() {
+ return wrapped.coreOptions();
+ }
+
+ @Override
+ public Path location() {
+ return wrapped.location();
+ }
+
+ @Override
+ public SnapshotManager snapshotManager() {
+ return wrapped.snapshotManager();
+ }
+
+ @Override
+ public ChangelogManager changelogManager() {
+ return wrapped.changelogManager();
+ }
+
+ @Override
+ public ConsumerManager consumerManager() {
+ return wrapped.consumerManager();
+ }
+
+ @Override
+ public SchemaManager schemaManager() {
+ return wrapped.schemaManager();
+ }
+
+ @Override
+ public TagManager tagManager() {
+ return wrapped.tagManager();
+ }
+
+ @Override
+ public BranchManager branchManager() {
+ return wrapped.branchManager();
+ }
+
+ @Override
+ public DataTable switchToBranch(String branchName) {
+ return new RowLineageTable(wrapped.switchToBranch(branchName));
+ }
+
+ @Override
+ public InnerTableRead newRead() {
+ return wrapped.newRead().withReadType(rowType());
+ }
+
+ @Override
+ public Table copy(Map<String, String> dynamicOptions) {
+ return new RowLineageTable(wrapped.copy(dynamicOptions));
+ }
+
+ @Override
+ public FileIO fileIO() {
+ return wrapped.fileIO();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 57c3c2caac..69f7a65ddf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -45,6 +45,7 @@ import static
org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
import static org.apache.paimon.table.system.ReadOptimizedTable.READ_OPTIMIZED;
+import static org.apache.paimon.table.system.RowLineageTable.ROW_LINEAGE;
import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
import static org.apache.paimon.table.system.StatisticTable.STATISTICS;
@@ -72,6 +73,7 @@ public class SystemTableLoader {
.put(STATISTICS, StatisticTable::new)
.put(BINLOG, BinlogTable::new)
.put(TABLE_INDEXES, TableIndexesTable::new)
+ .put(ROW_LINEAGE, RowLineageTable::new)
.build();
public static final List<String> SYSTEM_TABLES = new
ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
index c7db185068..73a2e19cc8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
@@ -80,6 +80,7 @@ public class Tag extends Snapshot {
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
@JsonProperty(FIELD_PROPERTIES) Map<String, String> properties,
+ @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId,
@JsonProperty(FIELD_TAG_CREATE_TIME) @Nullable LocalDateTime
tagCreateTime,
@JsonProperty(FIELD_TAG_TIME_RETAINED) @Nullable Duration
tagTimeRetained) {
super(
@@ -103,7 +104,8 @@ public class Tag extends Snapshot {
changelogRecordCount,
watermark,
statistics,
- properties);
+ properties,
+ nextRowId);
this.tagCreateTime = tagCreateTime;
this.tagTimeRetained = tagTimeRetained;
}
@@ -147,6 +149,7 @@ public class Tag extends Snapshot {
snapshot.watermark(),
snapshot.statistics(),
snapshot.properties(),
+ snapshot.nextRowId(),
tagCreateTime,
tagTimeRetained);
}
@@ -173,7 +176,8 @@ public class Tag extends Snapshot {
changelogRecordCount,
watermark,
statistics,
- properties);
+ properties,
+ nextRowId);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 341199d1bd..752ea3b377 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -59,6 +59,7 @@ public class FormatReaderMapping {
private final FormatReaderFactory readerFactory;
private final TableSchema dataSchema;
private final List<Predicate> dataFilters;
+ private final Map<String, Integer> systemFields;
public FormatReaderMapping(
@Nullable int[] indexMapping,
@@ -67,13 +68,15 @@ public class FormatReaderMapping {
@Nullable Pair<int[], RowType> partitionPair,
FormatReaderFactory readerFactory,
TableSchema dataSchema,
- List<Predicate> dataFilters) {
+ List<Predicate> dataFilters,
+ Map<String, Integer> systemFields) {
this.indexMapping = combine(indexMapping, trimmedKeyMapping);
this.castMapping = castMapping;
this.readerFactory = readerFactory;
this.partitionPair = partitionPair;
this.dataSchema = dataSchema;
this.dataFilters = dataFilters;
+ this.systemFields = systemFields;
}
private int[] combine(@Nullable int[] indexMapping, @Nullable int[]
trimmedKeyMapping) {
@@ -111,6 +114,10 @@ public class FormatReaderMapping {
return partitionPair;
}
+ public Map<String, Integer> getSystemFields() {
+ return systemFields;
+ }
+
public FormatReaderFactory getReaderFactory() {
return readerFactory;
}
@@ -130,16 +137,19 @@ public class FormatReaderMapping {
private final List<DataField> readTableFields;
private final Function<TableSchema, List<DataField>> fieldsExtractor;
@Nullable private final List<Predicate> filters;
+ private final boolean rowTrackingEnabled;
public Builder(
FileFormatDiscover formatDiscover,
List<DataField> readTableFields,
Function<TableSchema, List<DataField>> fieldsExtractor,
- @Nullable List<Predicate> filters) {
+ @Nullable List<Predicate> filters,
+ boolean rowTrackingEnabled) {
this.formatDiscover = formatDiscover;
this.readTableFields = readTableFields;
this.fieldsExtractor = fieldsExtractor;
this.filters = filters;
+ this.rowTrackingEnabled = rowTrackingEnabled;
}
/**
@@ -165,8 +175,13 @@ public class FormatReaderMapping {
// extract the whole data fields in logic.
List<DataField> allDataFields = fieldsExtractor.apply(dataSchema);
+ if (rowTrackingEnabled) {
+ allDataFields.add(SpecialFields.ROW_ID);
+ allDataFields.add(SpecialFields.SEQUENCE_NUMBER.copy(true));
+ }
+ Map<String, Integer> systemFields =
findSystemFields(readTableFields);
+
List<DataField> readDataFields = readDataFields(allDataFields);
- // build index cast mapping
IndexCastMapping indexCastMapping =
SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields);
@@ -196,7 +211,19 @@ public class FormatReaderMapping {
.discover(formatIdentifier)
.createReaderFactory(readRowType, readFilters),
dataSchema,
- readFilters);
+ readFilters,
+ systemFields);
+ }
+
+ private Map<String, Integer> findSystemFields(List<DataField>
readTableFields) {
+ Map<String, Integer> systemFields = new HashMap<>();
+ for (int i = 0; i < readTableFields.size(); i++) {
+ DataField field = readTableFields.get(i);
+ if (SpecialFields.isSystemField(field.name())) {
+ systemFields.put(field.name(), i);
+ }
+ }
+ return systemFields;
}
static Pair<int[], RowType> trimKeyFields(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java
index e166086515..1e99e00a09 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java
@@ -260,6 +260,7 @@ public class AppendCompactCoordinatorTest {
0L,
null,
FileSource.APPEND,
+ null,
null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 69d4e5c11f..d8b5abe565 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -684,6 +684,7 @@ public class AppendOnlyWriterTest {
null,
FileSource.APPEND,
null,
+ null,
null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index c9a4dacb90..a8592612d2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -161,6 +161,7 @@ public class IndexBootstrapTest extends TableTestBase {
null,
FileSource.APPEND,
null,
+ null,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index 7de4214bea..2029a9f616 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -165,6 +165,7 @@ public class DataFileTestDataGenerator {
kvs.stream().filter(kv ->
kv.valueKind().isRetract()).count(),
null,
FileSource.APPEND,
+ null,
null),
kvs);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index a44ef9a530..e0bbbb0e9b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -58,6 +58,7 @@ public class DataFileTestUtils {
null,
FileSource.APPEND,
null,
+ null,
null);
}
@@ -77,6 +78,7 @@ public class DataFileTestUtils {
0L,
null,
FileSource.APPEND,
+ null,
null);
}
@@ -102,6 +104,7 @@ public class DataFileTestUtils {
deleteRowCount,
null,
FileSource.APPEND,
+ null,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index 6ed112de13..3fbaca4242 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -45,6 +45,82 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Compatibility Test for {@link ManifestCommittableSerializer}. */
public class ManifestCommittableSerializerCompatibilityTest {
+ @Test
+ public void testCompatibilityToV4CommitV8() throws IOException {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs://localhost:9000/path/to/file",
+ 1L);
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
+ dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L));
+ dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L));
+ IndexFileMeta indexFile =
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvMetas);
+ List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+ CommitMessageImpl commitMessage =
+ new CommitMessageImpl(
+ singleColumn("my_partition"),
+ 11,
+ 16,
+ new DataIncrement(dataFiles, dataFiles, dataFiles),
+ new CompactIncrement(dataFiles, dataFiles, dataFiles),
+ new IndexIncrement(indexFiles));
+
+ ManifestCommittable manifestCommittable =
+ new ManifestCommittable(
+ 5,
+ 202020L,
+ Collections.singletonMap(5, 555L),
+ Collections.singletonList(commitMessage));
+ manifestCommittable.addProperty("k1", "v1");
+ manifestCommittable.addProperty("k2", "v2");
+
+ ManifestCommittableSerializer serializer = new
ManifestCommittableSerializer();
+ byte[] bytes = serializer.serialize(manifestCommittable);
+
+ ManifestCommittable deserialized =
serializer.deserialize(serializer.getVersion(), bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+
+ byte[] oldBytes =
+ IOUtils.readFully(
+ ManifestCommittableSerializerCompatibilityTest.class
+ .getClassLoader()
+
.getResourceAsStream("compatibility/manifest-committable-v8"),
+ true);
+ deserialized = serializer.deserialize(4, oldBytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+ }
+
@Test
public void testCompatibilityToV4CommitV7() throws IOException {
SimpleStats keyStats =
@@ -76,7 +152,8 @@ public class ManifestCommittableSerializerCompatibilityTest {
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
- "hdfs://localhost:9000/path/to/file");
+ "hdfs://localhost:9000/path/to/file",
+ null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
@@ -150,7 +227,8 @@ public class ManifestCommittableSerializerCompatibilityTest
{
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
- "hdfs://localhost:9000/path/to/file");
+ "hdfs://localhost:9000/path/to/file",
+ null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
@@ -222,7 +300,8 @@ public class ManifestCommittableSerializerCompatibilityTest
{
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
- "hdfs://localhost:9000/path/to/file");
+ "hdfs://localhost:9000/path/to/file",
+ null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
@@ -294,6 +373,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -365,6 +445,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -437,6 +518,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
new byte[] {1, 2, 4},
FileSource.COMPACT,
null,
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -509,6 +591,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
new byte[] {1, 2, 4},
null,
null,
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -581,6 +664,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
null,
null,
null,
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index 2372c7bb5f..950d40abd1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -116,6 +116,7 @@ public class ManifestCommittableSerializerTest {
0L,
null,
FileSource.APPEND,
+ null,
null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index d27ab82354..a8b5ccdba4 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -96,6 +96,7 @@ public abstract class ManifestFileMetaTestBase {
embeddedIndex, // not used
FileSource.APPEND,
null,
+ null,
null));
}
@@ -280,6 +281,7 @@ public abstract class ManifestFileMetaTestBase {
0L,
null,
FileSource.APPEND,
+ null,
null));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
index d804c97902..5cca50d1e6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
@@ -84,6 +84,7 @@ public class LevelsTest {
0L,
null,
FileSource.APPEND,
+ null,
null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index 94c11498c5..ad33ea1c13 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -185,6 +185,7 @@ public class IntervalPartitionTest {
null,
FileSource.APPEND,
null,
+ null,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index c9ca9070c3..dccd22b008 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -424,6 +424,21 @@ public class UniversalCompactionTest {
static DataFileMeta file(long size) {
return new DataFileMeta(
- "", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null,
FileSource.APPEND, null);
+ "",
+ size,
+ 1,
+ null,
+ null,
+ null,
+ null,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0L,
+ null,
+ FileSource.APPEND,
+ null,
+ null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 3164ac96d0..cbf2b63d93 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -284,6 +284,7 @@ public class ExpireSnapshotsTest {
null,
FileSource.APPEND,
null,
+ null,
null);
ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1,
dataFile);
ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition,
0, 1, dataFile);
@@ -345,7 +346,8 @@ public class ExpireSnapshotsTest {
null,
FileSource.APPEND,
null,
- myDataFile.toString());
+ myDataFile.toString(),
+ null);
ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1,
dataFile);
ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition,
0, 1, dataFile);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index a1f7d69e28..3b00e7f088 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -59,6 +59,7 @@ public class SplitGeneratorTest {
0L,
null,
FileSource.APPEND,
+ null,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index 7213f11ca0..2d28ebb2fb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -216,6 +216,7 @@ public class SplitTest {
new byte[] {1, 2, 4},
null,
null,
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -279,6 +280,7 @@ public class SplitTest {
new byte[] {1, 2, 4},
FileSource.COMPACT,
null,
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -342,6 +344,7 @@ public class SplitTest {
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -409,6 +412,7 @@ public class SplitTest {
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
+ null,
null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
@@ -476,7 +480,8 @@ public class SplitTest {
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
- "hdfs:///path/to/warehouse");
+ "hdfs:///path/to/warehouse",
+ null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22,
33L);
@@ -543,7 +548,8 @@ public class SplitTest {
new byte[] {1, 2, 4},
FileSource.COMPACT,
Arrays.asList("field1", "field2", "field3"),
- "hdfs:///path/to/warehouse");
+ "hdfs:///path/to/warehouse",
+ null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22,
33L);
@@ -579,6 +585,75 @@ public class SplitTest {
assertThat(actual).isEqualTo(split);
}
+ @Test
+ public void testSerializerCompatibleV7() throws Exception {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs:///path/to/warehouse",
+ 12L);
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22,
33L);
+ List<DeletionFile> deletionFiles =
Collections.singletonList(deletionFile);
+
+ BinaryRow partition = new BinaryRow(1);
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+ binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+ binaryRowWriter.complete();
+
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(18)
+ .withPartition(partition)
+ .withBucket(20)
+ .withTotalBuckets(32)
+ .withDataFiles(dataFiles)
+ .withDataDeletionFiles(deletionFiles)
+ .withBucketPath("my path")
+ .build();
+
+ assertThat(InstantiationUtil.clone(split)).isEqualTo(split);
+
+ byte[] v6Bytes =
+ IOUtils.readFully(
+ SplitTest.class
+ .getClassLoader()
+
.getResourceAsStream("compatibility/datasplit-v7"),
+ true);
+
+ DataSplit actual =
+ InstantiationUtil.deserializeObject(v6Bytes,
DataSplit.class.getClassLoader());
+ assertThat(actual).isEqualTo(split);
+ }
+
private DataFileMeta newDataFile(long rowCount) {
return newDataFile(rowCount, null, null);
}
@@ -597,6 +672,7 @@ public class SplitTest {
null,
null,
valueStatsCols,
+ null,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
index 0d888225dc..306820fa7d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
@@ -409,6 +409,7 @@ public class TagAutoManagerTest extends
PrimaryKeyTableTestBase {
null,
null,
null,
+ null,
null);
tagManager.createTag(
snapshot1,
@@ -438,6 +439,7 @@ public class TagAutoManagerTest extends
PrimaryKeyTableTestBase {
null,
null,
null,
+ null,
null);
tagManager.createTag(
snapshot2,
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
index 0096238f72..27b3a575b6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
@@ -51,6 +51,7 @@ public class TagTest {
null,
null,
null,
+ null,
null);
@Test
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
index dd00d142c8..0444d5644d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
@@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/** Test for {@link FormatReaderMapping.Builder}. */
@@ -133,7 +134,8 @@ public class FormatReaderMappingTest {
null,
null,
null,
- null);
+ null,
+ Collections.emptyMap());
Assertions.assertThat(formatReaderMapping.getIndexMapping())
.containsExactly(0, 1, 0, -1, 2);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index b33227cad6..8a7484faac 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -263,6 +263,7 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
null);
}
@@ -287,6 +288,7 @@ public class SnapshotManagerTest {
null,
watermark,
null,
+ null,
null);
}
@@ -312,6 +314,7 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
null));
}
@@ -343,6 +346,7 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
null);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i),
snapshot.toJson());
}
@@ -396,6 +400,7 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
null);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i),
snapshot.toJson());
}
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v7
b/paimon-core/src/test/resources/compatibility/datasplit-v7
new file mode 100644
index 0000000000..3c77a0f71c
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/datasplit-v7 differ
diff --git
a/paimon-core/src/test/resources/compatibility/manifest-committable-v8
b/paimon-core/src/test/resources/compatibility/manifest-committable-v8
new file mode 100644
index 0000000000..8a436147dd
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/manifest-committable-v8 differ
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
index 025a7f4b5a..4afe48a4a9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendTableITCase.java
@@ -94,6 +94,19 @@ public class AppendTableITCase extends CatalogITCaseBase {
assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"),
Row.of("BBB"));
}
+ @Test
+ public void testReadWriteWithLineage() {
+ batchSql("INSERT INTO append_table_lineage VALUES (1, 'AAA'), (2,
'BBB')");
+ List<Row> rows = batchSql("SELECT * FROM
append_table_lineage$row_lineage");
+ assertThat(rows.size()).isEqualTo(2);
+ assertThat(rows)
+ .containsExactlyInAnyOrder(Row.of(1, "AAA", 0L, 1L), Row.of(2,
"BBB", 1L, 1L));
+
+ rows = batchSql("SELECT * FROM append_table_lineage");
+ assertThat(rows.size()).isEqualTo(2);
+ assertThat(rows).containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2,
"BBB"));
+ }
+
@Test
public void testSkipDedup() {
batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (1, 'AAA'), (2,
'BBB'), (3, 'BBB')");
@@ -551,6 +564,7 @@ public class AppendTableITCase extends CatalogITCaseBase {
protected List<String> ddl() {
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING)
WITH ('bucket' = '-1')",
+ "CREATE TABLE IF NOT EXISTS append_table_lineage (id INT, data
STRING) WITH ('bucket' = '-1', 'row-tracking.enabled' = 'true')",
"CREATE TABLE IF NOT EXISTS part_table (id INT, data STRING,
dt STRING) PARTITIONED BY (dt) WITH ('bucket' = '-1')",
"CREATE TABLE IF NOT EXISTS complex_table (id INT, data
MAP<INT, INT>) WITH ('bucket' = '-1')",
"CREATE TABLE IF NOT EXISTS index_table (id INT, indexc
STRING, data STRING) WITH ('bucket' = '-1',
'file-index.bloom-filter.columns'='indexc',
'file-index.bloom-filter.indexc.items' = '500')");
@@ -584,19 +598,40 @@ public class AppendTableITCase extends CatalogITCaseBase {
private void assertExecuteExpected(
String sql, long expectedSnapshotId, Snapshot.CommitKind
expectedCommitKind) {
+ assertExecuteExpected(sql, expectedSnapshotId, expectedCommitKind,
"append_table");
+ }
+
+ private void assertExecuteExpected(
+ String sql,
+ long expectedSnapshotId,
+ Snapshot.CommitKind expectedCommitKind,
+ String tableName) {
batchSql(sql);
- Snapshot snapshot = findLatestSnapshot("append_table");
+ Snapshot snapshot = findLatestSnapshot(tableName);
assertThat(snapshot.id()).isEqualTo(expectedSnapshotId);
assertThat(snapshot.commitKind()).isEqualTo(expectedCommitKind);
}
+ private void assertBatchHasCompact(String sql, long timeout) throws
Exception {
+ batchSql(sql);
+ waitCompactSnapshot(timeout);
+ }
+
private void assertStreamingHasCompact(String sql, long timeout) throws
Exception {
+ sEnv.executeSql(sql);
+ waitCompactSnapshot(timeout);
+ }
+
+ private void waitCompactSnapshot(long timeout) throws Exception {
+ waitCompactSnapshot(timeout, "append_table");
+ }
+
+ private void waitCompactSnapshot(long timeout, String tableName) throws
Exception {
long start = System.currentTimeMillis();
long currentId = 1;
- sEnv.executeSql(sql);
Snapshot snapshot;
while (true) {
- snapshot = findSnapshot("append_table", currentId);
+ snapshot = findSnapshot(tableName, currentId);
if (snapshot != null) {
if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
break;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
index 6675a1358a..c0c3041bf9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
@@ -243,6 +243,7 @@ public class AppendPreCommitCompactCoordinatorOperatorTest {
null,
null,
null,
+ null,
null);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
index 4d5e9ac1b9..4f5df2a2d2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
@@ -350,6 +350,7 @@ public class ChangelogCompactCoordinateOperatorTest {
null,
null,
null,
+ null,
null);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
index 6b47256a5c..ddd798519a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
@@ -198,6 +198,7 @@ public class ChangelogCompactSortOperatorTest {
null,
null,
null,
+ null,
null);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
index 580220e77d..afd57ff29e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
@@ -101,6 +101,7 @@ public class ChangelogCompactTaskSerializerTest {
0L,
null,
FileSource.APPEND,
+ null,
null);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
index fa567e5490..364cdcb9b8 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
@@ -82,6 +82,7 @@ public class ChangelogCompactTaskTest {
null,
null,
null,
+ null,
null)));
ChangelogCompactTask task =
new ChangelogCompactTask(1, BinaryRow.EMPTY_ROW, 1, files, new
HashMap<>());
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
index 0c90b54dee..50099be181 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -79,6 +79,7 @@ public class CompactionTaskSimpleSerializerTest {
0L,
null,
FileSource.APPEND,
+ null,
null);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 92c8dd94a1..1bf93ab818 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -116,6 +116,7 @@ public class FileStoreSourceSplitGeneratorTest {
0L, // not used
null, // not used
FileSource.APPEND,
+ null,
null));
}
return DataSplit.builder()
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index 9c884cd5c3..f36b57a009 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -89,6 +89,7 @@ public class FileStoreSourceSplitSerializerTest {
0L,
null,
FileSource.APPEND,
+ null,
null);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 0d44e0241a..e8931fb086 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -146,7 +146,8 @@ public class TestChangelogDataReadWrite {
VALUE_TYPE,
FileFormatDiscover.of(options),
pathFactory,
- options.fileIndexReadEnabled());
+ options.fileIndexReadEnabled(),
+ false);
return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index 4997f65eaf..febf15f9e9 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -54,6 +54,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
null,
FileSource.APPEND,
null,
+ null,
null)
}
@@ -91,6 +92,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
null,
FileSource.APPEND,
null,
+ null,
null)
).asJava