This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4aa3598986 [core] Extract RowTrackingCommitUtils from FileStoreCommitImpl
4aa3598986 is described below
commit 4aa35989869c1ca35c0cdf80d8aa417e75ccdf58
Author: JingsongLi <[email protected]>
AuthorDate: Tue Dec 30 15:57:44 2025 +0800
[core] Extract RowTrackingCommitUtils from FileStoreCommitImpl
---
.../paimon/operation/FileStoreCommitImpl.java | 68 ++-----------
.../operation/commit/RowTrackingCommitUtils.java | 108 +++++++++++++++++++++
2 files changed, 114 insertions(+), 62 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index b446078932..aa47e69d5e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -30,7 +30,6 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestCommittable;
@@ -50,6 +49,7 @@ import org.apache.paimon.operation.commit.ConflictDetection;
import org.apache.paimon.operation.commit.ConflictDetection.ConflictCheck;
import org.apache.paimon.operation.commit.ManifestEntryChanges;
import org.apache.paimon.operation.commit.RetryCommitResult;
+import
org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
import org.apache.paimon.operation.commit.SuccessCommitResult;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
@@ -62,7 +62,6 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -95,7 +94,6 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
-import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.manifest.ManifestEntry.nullableRecordCount;
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
@@ -103,6 +101,7 @@ import static
org.apache.paimon.operation.commit.ConflictDetection.hasConflictCh
import static
org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
import static
org.apache.paimon.operation.commit.ConflictDetection.noConflictCheck;
import static
org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions;
+import static
org.apache.paimon.operation.commit.RowTrackingCommitUtils.assignRowTracking;
import static
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -971,14 +970,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
baseManifestList = manifestList.write(mergeAfterManifests);
if (rowTrackingEnabled) {
- // assigned snapshot id to delta files
- List<ManifestEntry> snapshotAssigned = new ArrayList<>();
- assignSnapshotId(newSnapshotId, deltaFiles, snapshotAssigned);
- // assign row id for new files
- List<ManifestEntry> rowIdAssigned = new ArrayList<>();
- nextRowIdStart =
- assignRowTrackingMeta(firstRowIdStart,
snapshotAssigned, rowIdAssigned);
- deltaFiles = rowIdAssigned;
+ RowTrackingAssigned assigned =
+ assignRowTracking(newSnapshotId, firstRowIdStart,
deltaFiles);
+ nextRowIdStart = assigned.nextRowIdStart;
+ deltaFiles = assigned.assignedEntries;
}
// the added records subtract the deleted records from
@@ -1132,57 +1127,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
return commitSnapshotImpl(newSnapshot, emptyList());
}
- private long assignRowTrackingMeta(
- long firstRowIdStart,
- List<ManifestEntry> deltaFiles,
- List<ManifestEntry> rowIdAssigned) {
- if (deltaFiles.isEmpty()) {
- return firstRowIdStart;
- }
- // assign row id for new files
- long start = firstRowIdStart;
- long blobStart = firstRowIdStart;
- for (ManifestEntry entry : deltaFiles) {
- checkArgument(
- entry.file().fileSource().isPresent(),
- "This is a bug, file source field for row-tracking table
must present.");
- boolean containsRowId =
- entry.file().writeCols() != null
- &&
entry.file().writeCols().contains(SpecialFields.ROW_ID.name());
- if (entry.file().fileSource().get().equals(FileSource.APPEND)
- && entry.file().firstRowId() == null
- && !containsRowId) {
- if (isBlobFile(entry.file().fileName())) {
- if (blobStart >= start) {
- throw new IllegalStateException(
- String.format(
- "This is a bug, blobStart %d should be
less than start %d when assigning a blob entry file.",
- blobStart, start));
- }
- long rowCount = entry.file().rowCount();
- rowIdAssigned.add(entry.assignFirstRowId(blobStart));
- blobStart += rowCount;
- } else {
- long rowCount = entry.file().rowCount();
- rowIdAssigned.add(entry.assignFirstRowId(start));
- blobStart = start;
- start += rowCount;
- }
- } else {
- // for compact file, do not assign first row id.
- rowIdAssigned.add(entry);
- }
- }
- return start;
- }
-
- private void assignSnapshotId(
- long snapshotId, List<ManifestEntry> deltaFiles,
List<ManifestEntry> snapshotAssigned) {
- for (ManifestEntry entry : deltaFiles) {
- snapshotAssigned.add(entry.assignSequenceNumber(snapshotId,
snapshotId));
- }
- }
-
public void compactManifest() {
int retryCount = 0;
long startMillis = System.currentTimeMillis();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
new file mode 100644
index 0000000000..d6eeae17f0
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.SpecialFields;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Utils for row tracking commit. */
+public class RowTrackingCommitUtils {
+
+ public static RowTrackingAssigned assignRowTracking(
+ long newSnapshotId, long firstRowIdStart, List<ManifestEntry>
deltaFiles) {
+ // assigned snapshot id to delta files
+ List<ManifestEntry> snapshotAssigned = new ArrayList<>();
+ assignSnapshotId(newSnapshotId, deltaFiles, snapshotAssigned);
+ // assign row id for new files
+ List<ManifestEntry> rowIdAssigned = new ArrayList<>();
+ long nextRowIdStart =
+ assignRowTrackingMeta(firstRowIdStart, snapshotAssigned,
rowIdAssigned);
+ return new RowTrackingAssigned(nextRowIdStart, rowIdAssigned);
+ }
+
+ private static void assignSnapshotId(
+ long snapshotId, List<ManifestEntry> deltaFiles,
List<ManifestEntry> snapshotAssigned) {
+ for (ManifestEntry entry : deltaFiles) {
+ snapshotAssigned.add(entry.assignSequenceNumber(snapshotId,
snapshotId));
+ }
+ }
+
+ private static long assignRowTrackingMeta(
+ long firstRowIdStart,
+ List<ManifestEntry> deltaFiles,
+ List<ManifestEntry> rowIdAssigned) {
+ if (deltaFiles.isEmpty()) {
+ return firstRowIdStart;
+ }
+ // assign row id for new files
+ long start = firstRowIdStart;
+ long blobStart = firstRowIdStart;
+ for (ManifestEntry entry : deltaFiles) {
+ Optional<FileSource> fileSource = entry.file().fileSource();
+ checkArgument(
+ fileSource.isPresent(),
+ "This is a bug, file source field for row-tracking table
must present.");
+ List<String> writeCols = entry.file().writeCols();
+ boolean containsRowId =
+ writeCols != null &&
writeCols.contains(SpecialFields.ROW_ID.name());
+ if (fileSource.get().equals(FileSource.APPEND)
+ && entry.file().firstRowId() == null
+ && !containsRowId) {
+ long rowCount = entry.file().rowCount();
+ if (isBlobFile(entry.file().fileName())) {
+ if (blobStart >= start) {
+ throw new IllegalStateException(
+ String.format(
+ "This is a bug, blobStart %d should be
less than start %d when assigning a blob entry file.",
+ blobStart, start));
+ }
+ rowIdAssigned.add(entry.assignFirstRowId(blobStart));
+ blobStart += rowCount;
+ } else {
+ rowIdAssigned.add(entry.assignFirstRowId(start));
+ blobStart = start;
+ start += rowCount;
+ }
+ } else {
+ // for compact file, do not assign first row id.
+ rowIdAssigned.add(entry);
+ }
+ }
+ return start;
+ }
+
+ /** Assigned results. */
+ public static class RowTrackingAssigned {
+ public final long nextRowIdStart;
+ public final List<ManifestEntry> assignedEntries;
+
+ public RowTrackingAssigned(long nextRowIdStart, List<ManifestEntry>
assignedEntries) {
+ this.nextRowIdStart = nextRowIdStart;
+ this.assignedEntries = assignedEntries;
+ }
+ }
+}