This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 28edeaf7a9 [core] Judge minSequenceNum to decide whether assign sequenceNum to ManifestEntry (#7121)
28edeaf7a9 is described below

commit 28edeaf7a9cf6b0b01fd81083573e5ca0734187d
Author: YeJunHao <[email protected]>
AuthorDate: Mon Jan 26 22:56:31 2026 +0800

    [core] Judge minSequenceNum to decide whether assign sequenceNum to ManifestEntry (#7121)
---
 .../org/apache/paimon/append/AppendOnlyWriter.java   | 16 ++++++++++------
 .../apache/paimon/append/MultipleBlobFileWriter.java |  4 ++--
 .../apache/paimon/append/RollingBlobFileWriter.java  | 10 +++++-----
 .../DataEvolutionCompactCoordinator.java             | 20 ++++++++------------
 .../dataevolution/DataEvolutionCompactTask.java      | 17 +++++++++++++++++
 .../java/org/apache/paimon/io/RowDataFileWriter.java |  5 +++--
 .../apache/paimon/io/RowDataRollingFileWriter.java   |  5 +++--
 .../paimon/operation/AbstractFileStoreScan.java      |  8 +++++++-
 .../paimon/operation/BaseAppendFileStoreWrite.java   | 16 +++++++++++-----
 .../org/apache/paimon/operation/FileStoreScan.java   |  2 ++
 .../operation/commit/RowTrackingCommitUtils.java     |  6 +++++-
 .../apache/paimon/append/AppendOnlyWriterTest.java   |  3 ++-
 .../paimon/append/RollingBlobFileWriterTest.java     | 10 +++++-----
 .../DataEvolutionCompactCoordinatorTest.java         | 12 ++++++++----
 .../apache/paimon/io/KeyValueFileReadWriteTest.java  |  3 ++-
 .../org/apache/paimon/io/RollingFileWriterTest.java  |  2 +-
 .../org/apache/paimon/spark/sql/BlobTestBase.scala   |  4 ++--
 .../paimon/spark/sql/RowTrackingTestBase.scala       |  4 ++--
 18 files changed, 95 insertions(+), 52 deletions(-)
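
In short: with data evolution enabled, append writers no longer draw from a
table-wide sequence counter. Each new data file starts its own counter at 0,
and commit later assigns the snapshot id as the sequence number only to
entries whose file still reports minSequenceNumber == 0; files that already
carry real sequence numbers (e.g. compaction output) are left untouched.
A minimal sketch of the counter-provider idea, with a simplified stand-in
for Paimon's LongCounter (illustrative, not the exact API):

    import java.util.function.Supplier;

    // Simplified stand-in for org.apache.paimon.utils.LongCounter.
    class LongCounter {
        private long value;
        LongCounter(long value) { this.value = value; }
        long getAndIncrement() { return value++; }
        long getValue() { return value; }
    }

    class SeqNumCounterProviders {
        // Data evolution: a fresh zero-based counter per file writer.
        // Legacy path: one shared counter continuing from the table's
        // current max sequence number.
        static Supplier<LongCounter> forWriter(
                boolean dataEvolutionEnabled, long maxSequenceNumber) {
            final LongCounter shared = new LongCounter(maxSequenceNumber + 1);
            return dataEvolutionEnabled ? () -> new LongCounter(0) : () -> shared;
        }
    }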

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 61abbb0cb0..7868513c9e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -60,6 +60,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 
 import static org.apache.paimon.types.DataTypeRoot.BLOB;
 
@@ -86,7 +87,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     private final List<DataFileMeta> deletedFiles;
     private final List<DataFileMeta> compactBefore;
     private final List<DataFileMeta> compactAfter;
-    private final LongCounter seqNumCounter;
+    private final Supplier<LongCounter> seqNumCounterProvider;
     private final String fileCompression;
     private final CompressOptions spillCompression;
     private final StatsCollectorFactories statsCollectorFactories;
@@ -123,7 +124,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
             FileIndexOptions fileIndexOptions,
             boolean asyncFileWrite,
             boolean statsDenseStore,
-            @Nullable BlobConsumer blobConsumer) {
+            @Nullable BlobConsumer blobConsumer,
+            boolean dataEvolutionEnabled) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
@@ -142,7 +144,9 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
         this.deletedFiles = new ArrayList<>();
         this.compactBefore = new ArrayList<>();
         this.compactAfter = new ArrayList<>();
-        this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
+        final LongCounter seqNumCounter = new LongCounter(maxSequenceNumber + 1);
+        this.seqNumCounterProvider =
+                dataEvolutionEnabled ? () -> new LongCounter(0) : () -> seqNumCounter;
         this.fileCompression = fileCompression;
         this.spillCompression = spillCompression;
         this.ioManager = ioManager;
@@ -224,7 +228,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
 
     @Override
     public long maxSequenceNumber() {
-        return seqNumCounter.getValue() - 1;
+        return seqNumCounterProvider.get().getValue() - 1;
     }
 
     @Override
@@ -309,7 +313,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
                     blobTargetFileSize,
                     writeSchema,
                     pathFactory,
-                    seqNumCounter,
+                    seqNumCounterProvider,
                     fileCompression,
                     statsCollectorFactories,
                     fileIndexOptions,
@@ -325,7 +329,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
                 targetFileSize,
                 writeSchema,
                 pathFactory,
-                seqNumCounter,
+                seqNumCounterProvider,
                fileCompression,
                statsCollectorFactories.statsCollectors(writeSchema.getFieldNames()),
                 fileIndexOptions,
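
The ternary above is the writer-side crux: the legacy branch captures one
shared counter, while the data-evolution branch hands out an independent
zero-based counter each time a file writer asks for one. A hypothetical use
of the sketch above (illustrative values, not Paimon code):

    // Each provider.get() yields an independent counter, so every newly
    // written file begins numbering at 0.
    Supplier<LongCounter> provider =
            SeqNumCounterProviders.forWriter(true, 41L);
    LongCounter fileA = provider.get();
    LongCounter fileB = provider.get();
    assert fileA.getAndIncrement() == 0;
    assert fileB.getAndIncrement() == 0; // independent of fileA
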
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
index f9edb0ef65..b55bdd8ef0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
@@ -56,7 +56,7 @@ public class MultipleBlobFileWriter implements Closeable {
             long schemaId,
             RowType writeSchema,
             DataFilePathFactory pathFactory,
-            LongCounter seqNumCounter,
+            Supplier<LongCounter> seqNumCounterSupplier,
             FileSource fileSource,
             boolean asyncFileWrite,
             boolean statsDenseStore,
@@ -82,7 +82,7 @@ public class MultipleBlobFileWriter implements Closeable {
                                             pathFactory.newBlobPath(),
                                             writeSchema.project(blobFieldName),
                                             schemaId,
-                                            seqNumCounter,
+                                            seqNumCounterSupplier,
                                             new FileIndexOptions(),
                                             fileSource,
                                             asyncFileWrite,
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index f565212d1f..80faa01d92 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -102,7 +102,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
             long blobTargetFileSize,
             RowType writeSchema,
             DataFilePathFactory pathFactory,
-            LongCounter seqNumCounter,
+            Supplier<LongCounter> seqNumCounterSupplier,
             String fileCompression,
             StatsCollectorFactories statsCollectorFactories,
             FileIndexOptions fileIndexOptions,
@@ -124,7 +124,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
                         BlobType.splitBlob(writeSchema).getLeft(),
                         writeSchema,
                         pathFactory,
-                        seqNumCounter,
+                        seqNumCounterSupplier,
                         fileCompression,
                         statsCollectorFactories,
                         fileIndexOptions,
@@ -140,7 +140,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
                                 schemaId,
                                 writeSchema,
                                 pathFactory,
-                                seqNumCounter,
+                                seqNumCounterSupplier,
                                 fileSource,
                                 asyncFileWrite,
                                 statsDenseStore,
@@ -158,7 +158,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
                     RowType normalRowType,
                     RowType writeSchema,
                     DataFilePathFactory pathFactory,
-                    LongCounter seqNumCounter,
+                    Supplier<LongCounter> seqNumCounterSupplier,
                     String fileCompression,
                     StatsCollectorFactories statsCollectorFactories,
                     FileIndexOptions fileIndexOptions,
@@ -181,7 +181,7 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
                             pathFactory.newPath(),
                             normalRowType,
                             schemaId,
-                            seqNumCounter,
+                            seqNumCounterSupplier,
                             fileIndexOptions,
                             fileSource,
                             asyncFileWrite,
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index f1fe3ae617..e8081ac069 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.EndOfScanException;
@@ -42,7 +43,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.TreeMap;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -70,7 +70,8 @@ public class DataEvolutionCompactCoordinator {
 
         this.scanner =
                 new CompactScanner(
-                        table.newSnapshotReader().withPartitionFilter(partitionPredicate));
+                        table.newSnapshotReader().withPartitionFilter(partitionPredicate),
+                        table.store().newScan());
         this.planner =
                 new CompactPlanner(compactBlob, targetFileSize, openFileCost, compactMinFileNum);
     }
@@ -89,11 +90,11 @@ public class DataEvolutionCompactCoordinator {
     /** Scanner to generate sorted ManifestEntries. */
     static class CompactScanner {
 
-        private final SnapshotReader snapshotReader;
+        private final FileStoreScan scan;
         private final Queue<List<ManifestFileMeta>> metas;
 
-        private CompactScanner(SnapshotReader snapshotReader) {
-            this.snapshotReader = snapshotReader;
+        private CompactScanner(SnapshotReader snapshotReader, FileStoreScan scan) {
+            this.scan = scan;
             Snapshot snapshot = snapshotReader.snapshotManager().latestSnapshot();
 
             List<ManifestFileMeta> manifestFileMetas =
@@ -116,13 +117,8 @@ public class DataEvolutionCompactCoordinator {
             List<ManifestEntry> result = new ArrayList<>();
             while (metas.peek() != null && result.size() < FILES_BATCH) {
                 List<ManifestFileMeta> currentMetas = metas.poll();
-                List<ManifestEntry> targetEntries =
-                        currentMetas.stream()
-                                .flatMap(meta -> snapshotReader.readManifest(meta).stream())
-                                // we don't need stats for compaction
-                                .map(ManifestEntry::copyWithoutStats)
-                                .collect(Collectors.toList());
-                result.addAll(targetEntries);
+                scan.readFileIterator(currentMetas)
+                        .forEachRemaining(entry -> result.add(entry.copyWithoutStats()));
             }
             if (result.isEmpty()) {
                 throw new EndOfScanException();
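
Reading through the scan's file iterator keeps the manifest batch lazy:
entries are decoded on demand rather than collected per manifest, and
iteration can stop once enough files are gathered. A rough sketch of that
bounded-draining pattern with generic types (drain and the limit are
illustrative, not Paimon's API):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    final class IteratorUtilsSketch {
        // Pull at most `limit` elements; the rest are never materialized.
        static <T> List<T> drain(Iterator<T> it, int limit) {
            List<T> out = new ArrayList<>();
            while (it.hasNext() && out.size() < limit) {
                out.add(it.next());
            }
            return out;
        }
    }
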
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
index d4da47bdc5..f1f48ac56d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -125,6 +125,23 @@ public class DataEvolutionCompactTask {
                writeResult.size() == 1, "Data evolution compaction should produce one file.");
 
         DataFileMeta dataFileMeta = writeResult.get(0).assignFirstRowId(firstRowId);
+        long minSequenceId =
+                compactBefore.stream()
+                        .mapToLong(DataFileMeta::minSequenceNumber)
+                        .min()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Cannot get min sequence id from compact before files."));
+        long maxSequenceId =
+                compactBefore.stream()
+                        .mapToLong(DataFileMeta::maxSequenceNumber)
+                        .max()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Cannot get max sequence id from compact before files."));
+        dataFileMeta = dataFileMeta.assignSequenceNumber(minSequenceId, maxSequenceId);
         compactAfter.add(dataFileMeta);
 
         CompactIncrement compactIncrement =
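
The compacted output file inherits the full sequence range of its inputs, so
the commit-time check (minSequenceNumber == 0 means "not yet assigned") still
holds: compaction output carries a real, non-zero range. A condensed sketch
of the aggregation, with a hypothetical FileMeta record standing in for
DataFileMeta:

    import java.util.List;

    record FileMeta(long minSeq, long maxSeq) {}

    final class SequenceRangeSketch {
        // Min of all input minimums, max of all input maximums.
        static long[] sequenceRange(List<FileMeta> compactBefore) {
            long min = compactBefore.stream().mapToLong(FileMeta::minSeq).min()
                    .orElseThrow(() -> new IllegalStateException("no input files"));
            long max = compactBefore.stream().mapToLong(FileMeta::maxSeq).max()
                    .orElseThrow(() -> new IllegalStateException("no input files"));
            return new long[] {min, max};
        }
    }
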
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 2f2982ba85..7f8715ab08 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -35,6 +35,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
 
@@ -58,7 +59,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
             Path path,
             RowType writeSchema,
             long schemaId,
-            LongCounter seqNumCounter,
+            Supplier<LongCounter> seqNumCounterSupplier,
             FileIndexOptions fileIndexOptions,
             FileSource fileSource,
             boolean asyncFileWrite,
@@ -67,7 +68,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
             @Nullable List<String> writeCols) {
         super(fileIO, context, path, Function.identity(), writeSchema, asyncFileWrite);
         this.schemaId = schemaId;
-        this.seqNumCounter = seqNumCounter;
+        this.seqNumCounter = seqNumCounterSupplier.get();
         this.isExternalPath = isExternalPath;
         this.statsArraySerializer = new SimpleStatsConverter(writeSchema, statsDenseStore);
         this.dataFileIndexWriter =
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index 328bc193b9..81e0a0aefc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -30,6 +30,7 @@ import org.apache.paimon.utils.LongCounter;
 import javax.annotation.Nullable;
 
 import java.util.List;
+import java.util.function.Supplier;
 
 /** {@link RollingFileWriterImpl} for data files containing {@link InternalRow}. */
 public class RowDataRollingFileWriter extends RollingFileWriterImpl<InternalRow, DataFileMeta> {
@@ -41,7 +42,7 @@ public class RowDataRollingFileWriter extends RollingFileWriterImpl<InternalRow,
             long targetFileSize,
             RowType writeSchema,
             DataFilePathFactory pathFactory,
-            LongCounter seqNumCounter,
+            Supplier<LongCounter> seqNumCounterSupplier,
             String fileCompression,
             SimpleColStatsCollector.Factory[] statsCollectors,
             FileIndexOptions fileIndexOptions,
@@ -58,7 +59,7 @@ public class RowDataRollingFileWriter extends RollingFileWriterImpl<InternalRow,
                                 pathFactory.newPath(),
                                 writeSchema,
                                 schemaId,
-                                seqNumCounter,
+                                seqNumCounterSupplier,
                                 fileIndexOptions,
                                 fileSource,
                                 asyncFileWrite,
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index aa3f1ccf7a..033b24c746 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -373,7 +373,13 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         return readManifestEntries(readManifests().filteredManifests, true);
     }
 
-    protected Iterator<ManifestEntry> readManifestEntries(
+    @Override
+    public Iterator<ManifestEntry> readFileIterator(List<ManifestFileMeta> manifestFileMetas) {
+        // useSequential: reduces memory usage and lets the iterator stop early
+        return readManifestEntries(manifestFileMetas, true);
+    }
+
+    public Iterator<ManifestEntry> readManifestEntries(
             List<ManifestFileMeta> manifests, boolean useSequential) {
         return scanMode == ScanMode.ALL
                 ? readAndMergeFileEntries(manifests, Function.identity(), useSequential)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 9eb49fee83..bc39a4ae8e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -60,6 +60,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.apache.paimon.format.FileFormat.fileFormat;
 import static org.apache.paimon.types.DataTypeRoot.BLOB;
@@ -151,7 +152,8 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
                 fileIndexOptions,
                 options.asyncFileWrite(),
                 options.statsDenseStore(),
-                blobConsumer);
+                blobConsumer,
+                options.dataEvolutionEnabled());
     }
 
     @Override
@@ -191,7 +193,9 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
         Exception collectedExceptions = null;
         RowDataRollingFileWriter rewriter =
                 createRollingFileWriter(
-                        partition, bucket, new LongCounter(toCompact.get(0).minSequenceNumber()));
+                        partition,
+                        bucket,
+                        () -> new LongCounter(toCompact.get(0).minSequenceNumber()));
         Map<String, IOExceptionSupplier<DeletionVector>> dvFactories = null;
         if (dvFactory != null) {
             dvFactories = new HashMap<>();
@@ -226,7 +230,9 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
         Sorter sorter = Sorter.getSorter(reader, ioManager, rowType, options);
         RowDataRollingFileWriter rewriter =
                 createRollingFileWriter(
-                        partition, bucket, new LongCounter(toCluster.get(0).minSequenceNumber()));
+                        partition,
+                        bucket,
+                        () -> new LongCounter(toCluster.get(0).minSequenceNumber()));
         try {
             MutableObjectIterator<BinaryRow> sorted = sorter.sort();
             BinaryRow binaryRow = new BinaryRow(sorter.arity());
@@ -253,7 +259,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
     }
 
     private RowDataRollingFileWriter createRollingFileWriter(
-            BinaryRow partition, int bucket, LongCounter seqNumCounter) {
+            BinaryRow partition, int bucket, Supplier<LongCounter> seqNumCounterSupplier) {
         return new RowDataRollingFileWriter(
                 fileIO,
                 schemaId,
@@ -261,7 +267,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
                 options.targetFileSize(false),
                 writeType,
                 pathFactory.createDataFilePathFactory(partition, bucket),
-                seqNumCounter,
+                seqNumCounterSupplier,
                 options.fileCompression(),
                 statsCollectors(),
                 fileIndexOptions,
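
Note the compaction rewriter's supplier above: it seeds the counter from the
first input file's minSequenceNumber rather than from 0, so rewritten files
keep non-zero sequence numbers and are not mistaken for unassigned files at
commit. A one-line illustration (firstInputMinSeq is a stand-in for
toCompact.get(0).minSequenceNumber()):

    long firstInputMinSeq = 42L; // illustrative
    Supplier<LongCounter> rewriteCounter =
            () -> new LongCounter(firstInputMinSeq);
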
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 129658cc36..7542ff87d4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -117,6 +117,8 @@ public interface FileStoreScan {
 
     Iterator<ManifestEntry> readFileIterator();
 
+    Iterator<ManifestEntry> readFileIterator(List<ManifestFileMeta> manifestFileMetas);
+
     default List<BinaryRow> listPartitions() {
         return readPartitionEntries().stream()
                 .map(PartitionEntry::partition)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
index 36e7beec85..d2f3dc8851 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
@@ -49,7 +49,11 @@ public class RowTrackingCommitUtils {
     private static void assignSnapshotId(
             long snapshotId, List<ManifestEntry> deltaFiles, List<ManifestEntry> snapshotAssigned) {
         for (ManifestEntry entry : deltaFiles) {
-            snapshotAssigned.add(entry.assignSequenceNumber(snapshotId, snapshotId));
+            if (entry.file().minSequenceNumber() == 0L) {
+                snapshotAssigned.add(entry.assignSequenceNumber(snapshotId, snapshotId));
+            } else {
+                snapshotAssigned.add(entry);
+            }
         }
     }
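
This is the "judge minSequenceNum" step from the commit title: only entries
whose file was written with the fresh zero-based counter receive the snapshot
id as their sequence number; anything already numbered (compaction output,
older files) passes through unchanged. Restated with the hypothetical
FileMeta record from the sketch above:

    // Commit-time rule: minSeq == 0 marks a freshly written file.
    static FileMeta judgeAndAssign(FileMeta file, long snapshotId) {
        return file.minSeq() == 0L
                ? new FileMeta(snapshotId, snapshotId) // assign snapshot id
                : file;                                // keep existing range
    }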
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index bd31672d6a..ff5891d37c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -711,7 +711,8 @@ public class AppendOnlyWriterTest {
                         new FileIndexOptions(),
                         true,
                         false,
-                        null);
+                        null,
+                        options.dataEvolutionEnabled());
         writer.setMemoryPool(
                new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
         return Pair.of(writer, compactManager.allFiles());
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
index 8fe9f193dc..0afe95eef0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -99,7 +99,7 @@ public class RollingBlobFileWriterTest {
                         TARGET_FILE_SIZE,
                         SCHEMA,
                         pathFactory,
-                        seqNumCounter,
+                        () -> seqNumCounter,
                         COMPRESSION,
                        new StatsCollectorFactories(new CoreOptions(new Options())),
                         new FileIndexOptions(),
@@ -199,7 +199,7 @@ public class RollingBlobFileWriterTest {
                                 false,
                                 null,
                                 null),
-                        new LongCounter(),
+                        () -> new LongCounter(),
                         COMPRESSION,
                        new StatsCollectorFactories(new CoreOptions(new Options())),
                         new FileIndexOptions(),
@@ -276,7 +276,7 @@ public class RollingBlobFileWriterTest {
                         blobTargetFileSize,
                         SCHEMA,
                        pathFactory, // Use the same pathFactory to ensure shared UUID
-                        new LongCounter(),
+                        () -> new LongCounter(),
                         COMPRESSION,
                        new StatsCollectorFactories(new CoreOptions(new Options())),
                         new FileIndexOptions(),
@@ -355,7 +355,7 @@ public class RollingBlobFileWriterTest {
                         blobTargetFileSize,
                         SCHEMA,
                        pathFactory, // Use the same pathFactory to ensure shared UUID
-                        new LongCounter(),
+                        () -> new LongCounter(),
                         COMPRESSION,
                        new StatsCollectorFactories(new CoreOptions(new Options())),
                         new FileIndexOptions(),
@@ -573,7 +573,7 @@ public class RollingBlobFileWriterTest {
                         TARGET_FILE_SIZE,
                         customSchema, // Use custom schema
                         pathFactory,
-                        seqNumCounter,
+                        () -> seqNumCounter,
                         COMPRESSION,
                        new StatsCollectorFactories(new CoreOptions(new Options())),
                         new FileIndexOptions(),
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index c8c0178811..441ec93ebb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.append.dataevolution;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.Timestamp;
@@ -27,6 +28,7 @@ import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -187,6 +189,8 @@ public class DataEvolutionCompactCoordinatorTest {
         SnapshotManager snapshotManager = mock(SnapshotManager.class);
         Snapshot snapshot = mock(Snapshot.class);
         ManifestsReader manifestsReader = mock(ManifestsReader.class);
+        FileStore fileStore = mock(FileStore.class);
+        FileStoreScan scan = mock(FileStoreScan.class);
 
         Options options = new Options();
         options.set("target-file-size", "1 kb");
@@ -199,6 +203,8 @@ public class DataEvolutionCompactCoordinatorTest {
         when(snapshotReader.snapshotManager()).thenReturn(snapshotManager);
         when(snapshotManager.latestSnapshot()).thenReturn(snapshot);
         when(snapshotReader.manifestsReader()).thenReturn(manifestsReader);
+        when(table.store()).thenReturn(fileStore);
+        when(fileStore.newScan()).thenReturn(scan);
 
         ManifestFileMeta metaWithNullRowId =
                 new ManifestFileMeta(
@@ -234,10 +240,8 @@ public class DataEvolutionCompactCoordinatorTest {
 
         ManifestEntry entry1 = makeEntry("file1.parquet", 0L, 100L, 600);
         ManifestEntry entry2 = makeEntry("file2.parquet", 100L, 100L, 600);
-        when(snapshotReader.readManifest(metaWithNullRowId))
-                .thenReturn(Collections.singletonList(entry1));
-        when(snapshotReader.readManifest(metaWithRowId))
-                .thenReturn(Collections.singletonList(entry2));
+        when(scan.readFileIterator(Arrays.asList(metaWithNullRowId, metaWithRowId)))
+                .thenReturn(Arrays.asList(entry1, entry2).iterator());
 
         DataEvolutionCompactCoordinator coordinator =
                 new DataEvolutionCompactCoordinator(table, false);
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index f5c26445bf..3ffe8606ab 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -291,7 +291,8 @@ public class KeyValueFileReadWriteTest {
                         new FileIndexOptions(),
                         true,
                         false,
-                        null);
+                        null,
+                        options.dataEvolutionEnabled());
         appendOnlyWriter.setMemoryPool(
                new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
         appendOnlyWriter.write(
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 96adc5fc21..84e7c6766b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -92,7 +92,7 @@ public class RollingFileWriterTest {
                                                 .newPath(),
                                         SCHEMA,
                                         0L,
-                                        new LongCounter(0),
+                                        () -> new LongCounter(0),
                                         new FileIndexOptions(),
                                         FileSource.APPEND,
                                         true,
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 5d8e1ef9fb..2cdd156e7f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -204,14 +204,14 @@ class BlobTestBase extends PaimonSparkTestBase {
         sql("SELECT COUNT(*) FROM `t$files`"),
         Seq(Row(22))
       )
-      sql("CALL paimon.sys.compact('t')")
+      sql("CALL paimon.sys.compact('t')").collect()
       checkAnswer(
         sql("SELECT COUNT(*) FROM `t$files`"),
         Seq(Row(12))
       )
       checkAnswer(
         sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t LIMIT 1"),
-        Seq(Row(1, "paimon", Array[Byte](72, 101, 108, 108, 111), 0, 12))
+        Seq(Row(1, "paimon", Array[Byte](72, 101, 108, 108, 111), 0, 11))
       )
     }
   }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 2f6afa7ce7..4079cf5eed 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -38,7 +38,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
 
   import testImplicits._
 
-  ignore("Data Evolution: concurrent merge and compact") {
+  test("Data Evolution: concurrent merge and compact") {
     withTable("s", "t") {
       sql(s"""
             CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (
@@ -694,7 +694,7 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
       sql("CALL paimon.sys.compact(table => 't')")
       checkAnswer(
         sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
-        Seq(Row(1, 11, 111, 2, 3), Row(2, 22, 2, 0, 3), Row(3, 3, 3, 1, 3))
+        Seq(Row(1, 11, 111, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2))
       )
 
       checkAnswer(
