This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 28edeaf7a9 [core] Judge minSequenceNum to decide whether to assign
sequenceNum to ManifestEntry (#7121)
28edeaf7a9 is described below
commit 28edeaf7a9cf6b0b01fd81083573e5ca0734187d
Author: YeJunHao <[email protected]>
AuthorDate: Mon Jan 26 22:56:31 2026 +0800
[core] Judge minSequenceNum to decide whether to assign sequenceNum to
ManifestEntry (#7121)
---
.../org/apache/paimon/append/AppendOnlyWriter.java | 16 ++++++++++------
.../apache/paimon/append/MultipleBlobFileWriter.java | 4 ++--
.../apache/paimon/append/RollingBlobFileWriter.java | 10 +++++-----
.../DataEvolutionCompactCoordinator.java | 20 ++++++++------------
.../dataevolution/DataEvolutionCompactTask.java | 17 +++++++++++++++++
.../java/org/apache/paimon/io/RowDataFileWriter.java | 5 +++--
.../apache/paimon/io/RowDataRollingFileWriter.java | 5 +++--
.../paimon/operation/AbstractFileStoreScan.java | 8 +++++++-
.../paimon/operation/BaseAppendFileStoreWrite.java | 16 +++++++++++-----
.../org/apache/paimon/operation/FileStoreScan.java | 2 ++
.../operation/commit/RowTrackingCommitUtils.java | 6 +++++-
.../apache/paimon/append/AppendOnlyWriterTest.java | 3 ++-
.../paimon/append/RollingBlobFileWriterTest.java | 10 +++++-----
.../DataEvolutionCompactCoordinatorTest.java | 12 ++++++++----
.../apache/paimon/io/KeyValueFileReadWriteTest.java | 3 ++-
.../org/apache/paimon/io/RollingFileWriterTest.java | 2 +-
.../org/apache/paimon/spark/sql/BlobTestBase.scala | 4 ++--
.../paimon/spark/sql/RowTrackingTestBase.scala | 4 ++--
18 files changed, 95 insertions(+), 52 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 61abbb0cb0..7868513c9e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -60,6 +60,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
import static org.apache.paimon.types.DataTypeRoot.BLOB;
@@ -86,7 +87,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
- private final LongCounter seqNumCounter;
+ private final Supplier<LongCounter> seqNumCounterProvider;
private final String fileCompression;
private final CompressOptions spillCompression;
private final StatsCollectorFactories statsCollectorFactories;
@@ -123,7 +124,8 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
FileIndexOptions fileIndexOptions,
boolean asyncFileWrite,
boolean statsDenseStore,
- @Nullable BlobConsumer blobConsumer) {
+ @Nullable BlobConsumer blobConsumer,
+ boolean dataEvolutionEnabled) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -142,7 +144,9 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
this.deletedFiles = new ArrayList<>();
this.compactBefore = new ArrayList<>();
this.compactAfter = new ArrayList<>();
- this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
+ final LongCounter seqNumCounter = new LongCounter(maxSequenceNumber +
1);
+ this.seqNumCounterProvider =
+ dataEvolutionEnabled ? () -> new LongCounter(0) : () ->
seqNumCounter;
this.fileCompression = fileCompression;
this.spillCompression = spillCompression;
this.ioManager = ioManager;
@@ -224,7 +228,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
@Override
public long maxSequenceNumber() {
- return seqNumCounter.getValue() - 1;
+ return seqNumCounterProvider.get().getValue() - 1;
}
@Override
@@ -309,7 +313,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
blobTargetFileSize,
writeSchema,
pathFactory,
- seqNumCounter,
+ seqNumCounterProvider,
fileCompression,
statsCollectorFactories,
fileIndexOptions,
@@ -325,7 +329,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
targetFileSize,
writeSchema,
pathFactory,
- seqNumCounter,
+ seqNumCounterProvider,
fileCompression,
statsCollectorFactories.statsCollectors(writeSchema.getFieldNames()),
fileIndexOptions,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
index f9edb0ef65..b55bdd8ef0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
@@ -56,7 +56,7 @@ public class MultipleBlobFileWriter implements Closeable {
long schemaId,
RowType writeSchema,
DataFilePathFactory pathFactory,
- LongCounter seqNumCounter,
+ Supplier<LongCounter> seqNumCounterSupplier,
FileSource fileSource,
boolean asyncFileWrite,
boolean statsDenseStore,
@@ -82,7 +82,7 @@ public class MultipleBlobFileWriter implements Closeable {
pathFactory.newBlobPath(),
writeSchema.project(blobFieldName),
schemaId,
- seqNumCounter,
+ seqNumCounterSupplier,
new FileIndexOptions(),
fileSource,
asyncFileWrite,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index f565212d1f..80faa01d92 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -102,7 +102,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
long blobTargetFileSize,
RowType writeSchema,
DataFilePathFactory pathFactory,
- LongCounter seqNumCounter,
+ Supplier<LongCounter> seqNumCounterSupplier,
String fileCompression,
StatsCollectorFactories statsCollectorFactories,
FileIndexOptions fileIndexOptions,
@@ -124,7 +124,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
BlobType.splitBlob(writeSchema).getLeft(),
writeSchema,
pathFactory,
- seqNumCounter,
+ seqNumCounterSupplier,
fileCompression,
statsCollectorFactories,
fileIndexOptions,
@@ -140,7 +140,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
schemaId,
writeSchema,
pathFactory,
- seqNumCounter,
+ seqNumCounterSupplier,
fileSource,
asyncFileWrite,
statsDenseStore,
@@ -158,7 +158,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
RowType normalRowType,
RowType writeSchema,
DataFilePathFactory pathFactory,
- LongCounter seqNumCounter,
+ Supplier<LongCounter> seqNumCounterSupplier,
String fileCompression,
StatsCollectorFactories statsCollectorFactories,
FileIndexOptions fileIndexOptions,
@@ -181,7 +181,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
pathFactory.newPath(),
normalRowType,
schemaId,
- seqNumCounter,
+ seqNumCounterSupplier,
fileIndexOptions,
fileSource,
asyncFileWrite,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index f1fe3ae617..e8081ac069 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
@@ -42,7 +43,6 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
-import java.util.stream.Collectors;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -70,7 +70,8 @@ public class DataEvolutionCompactCoordinator {
this.scanner =
new CompactScanner(
-
table.newSnapshotReader().withPartitionFilter(partitionPredicate));
+
table.newSnapshotReader().withPartitionFilter(partitionPredicate),
+ table.store().newScan());
this.planner =
new CompactPlanner(compactBlob, targetFileSize, openFileCost,
compactMinFileNum);
}
@@ -89,11 +90,11 @@ public class DataEvolutionCompactCoordinator {
/** Scanner to generate sorted ManifestEntries. */
static class CompactScanner {
- private final SnapshotReader snapshotReader;
+ private final FileStoreScan scan;
private final Queue<List<ManifestFileMeta>> metas;
- private CompactScanner(SnapshotReader snapshotReader) {
- this.snapshotReader = snapshotReader;
+ private CompactScanner(SnapshotReader snapshotReader, FileStoreScan
scan) {
+ this.scan = scan;
Snapshot snapshot =
snapshotReader.snapshotManager().latestSnapshot();
List<ManifestFileMeta> manifestFileMetas =
@@ -116,13 +117,8 @@ public class DataEvolutionCompactCoordinator {
List<ManifestEntry> result = new ArrayList<>();
while (metas.peek() != null && result.size() < FILES_BATCH) {
List<ManifestFileMeta> currentMetas = metas.poll();
- List<ManifestEntry> targetEntries =
- currentMetas.stream()
- .flatMap(meta ->
snapshotReader.readManifest(meta).stream())
- // we don't need stats for compaction
- .map(ManifestEntry::copyWithoutStats)
- .collect(Collectors.toList());
- result.addAll(targetEntries);
+ scan.readFileIterator(currentMetas)
+ .forEachRemaining(entry ->
result.add(entry.copyWithoutStats()));
}
if (result.isEmpty()) {
throw new EndOfScanException();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
index d4da47bdc5..f1f48ac56d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -125,6 +125,23 @@ public class DataEvolutionCompactTask {
writeResult.size() == 1, "Data evolution compaction should
produce one file.");
DataFileMeta dataFileMeta =
writeResult.get(0).assignFirstRowId(firstRowId);
+ long minSequenceId =
+ compactBefore.stream()
+ .mapToLong(DataFileMeta::minSequenceNumber)
+ .min()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Cannot get min sequence id
from compact before files."));
+ long maxSequenceId =
+ compactBefore.stream()
+ .mapToLong(DataFileMeta::maxSequenceNumber)
+ .max()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Cannot get max sequence id
from compact before files."));
+ dataFileMeta = dataFileMeta.assignSequenceNumber(minSequenceId,
maxSequenceId);
compactAfter.add(dataFileMeta);
CompactIncrement compactIncrement =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 2f2982ba85..7f8715ab08 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -35,6 +35,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
+import java.util.function.Supplier;
import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
@@ -58,7 +59,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
Path path,
RowType writeSchema,
long schemaId,
- LongCounter seqNumCounter,
+ Supplier<LongCounter> seqNumCounterSupplier,
FileIndexOptions fileIndexOptions,
FileSource fileSource,
boolean asyncFileWrite,
@@ -67,7 +68,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
@Nullable List<String> writeCols) {
super(fileIO, context, path, Function.identity(), writeSchema,
asyncFileWrite);
this.schemaId = schemaId;
- this.seqNumCounter = seqNumCounter;
+ this.seqNumCounter = seqNumCounterSupplier.get();
this.isExternalPath = isExternalPath;
this.statsArraySerializer = new SimpleStatsConverter(writeSchema,
statsDenseStore);
this.dataFileIndexWriter =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index 328bc193b9..81e0a0aefc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -30,6 +30,7 @@ import org.apache.paimon.utils.LongCounter;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.function.Supplier;
/** {@link RollingFileWriterImpl} for data files containing {@link
InternalRow}. */
public class RowDataRollingFileWriter extends
RollingFileWriterImpl<InternalRow, DataFileMeta> {
@@ -41,7 +42,7 @@ public class RowDataRollingFileWriter extends
RollingFileWriterImpl<InternalRow,
long targetFileSize,
RowType writeSchema,
DataFilePathFactory pathFactory,
- LongCounter seqNumCounter,
+ Supplier<LongCounter> seqNumCounterSupplier,
String fileCompression,
SimpleColStatsCollector.Factory[] statsCollectors,
FileIndexOptions fileIndexOptions,
@@ -58,7 +59,7 @@ public class RowDataRollingFileWriter extends
RollingFileWriterImpl<InternalRow,
pathFactory.newPath(),
writeSchema,
schemaId,
- seqNumCounter,
+ seqNumCounterSupplier,
fileIndexOptions,
fileSource,
asyncFileWrite,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index aa3f1ccf7a..033b24c746 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -373,7 +373,13 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return readManifestEntries(readManifests().filteredManifests, true);
}
- protected Iterator<ManifestEntry> readManifestEntries(
+ @Override
+ public Iterator<ManifestEntry> readFileIterator(List<ManifestFileMeta>
manifestFileMetas) {
+ // useSequential: reduce memory and iterator can be stopping
+ return readManifestEntries(manifestFileMetas, true);
+ }
+
+ public Iterator<ManifestEntry> readManifestEntries(
List<ManifestFileMeta> manifests, boolean useSequential) {
return scanMode == ScanMode.ALL
? readAndMergeFileEntries(manifests, Function.identity(),
useSequential)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 9eb49fee83..bc39a4ae8e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -60,6 +60,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
+import java.util.function.Supplier;
import static org.apache.paimon.format.FileFormat.fileFormat;
import static org.apache.paimon.types.DataTypeRoot.BLOB;
@@ -151,7 +152,8 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
fileIndexOptions,
options.asyncFileWrite(),
options.statsDenseStore(),
- blobConsumer);
+ blobConsumer,
+ options.dataEvolutionEnabled());
}
@Override
@@ -191,7 +193,9 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
Exception collectedExceptions = null;
RowDataRollingFileWriter rewriter =
createRollingFileWriter(
- partition, bucket, new
LongCounter(toCompact.get(0).minSequenceNumber()));
+ partition,
+ bucket,
+ () -> new
LongCounter(toCompact.get(0).minSequenceNumber()));
Map<String, IOExceptionSupplier<DeletionVector>> dvFactories = null;
if (dvFactory != null) {
dvFactories = new HashMap<>();
@@ -226,7 +230,9 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
Sorter sorter = Sorter.getSorter(reader, ioManager, rowType, options);
RowDataRollingFileWriter rewriter =
createRollingFileWriter(
- partition, bucket, new
LongCounter(toCluster.get(0).minSequenceNumber()));
+ partition,
+ bucket,
+ () -> new
LongCounter(toCluster.get(0).minSequenceNumber()));
try {
MutableObjectIterator<BinaryRow> sorted = sorter.sort();
BinaryRow binaryRow = new BinaryRow(sorter.arity());
@@ -253,7 +259,7 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
}
private RowDataRollingFileWriter createRollingFileWriter(
- BinaryRow partition, int bucket, LongCounter seqNumCounter) {
+ BinaryRow partition, int bucket, Supplier<LongCounter>
seqNumCounterSupplier) {
return new RowDataRollingFileWriter(
fileIO,
schemaId,
@@ -261,7 +267,7 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
options.targetFileSize(false),
writeType,
pathFactory.createDataFilePathFactory(partition, bucket),
- seqNumCounter,
+ seqNumCounterSupplier,
options.fileCompression(),
statsCollectors(),
fileIndexOptions,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 129658cc36..7542ff87d4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -117,6 +117,8 @@ public interface FileStoreScan {
Iterator<ManifestEntry> readFileIterator();
+ Iterator<ManifestEntry> readFileIterator(List<ManifestFileMeta>
manifestFileMetas);
+
default List<BinaryRow> listPartitions() {
return readPartitionEntries().stream()
.map(PartitionEntry::partition)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
index 36e7beec85..d2f3dc8851 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java
@@ -49,7 +49,11 @@ public class RowTrackingCommitUtils {
private static void assignSnapshotId(
long snapshotId, List<ManifestEntry> deltaFiles,
List<ManifestEntry> snapshotAssigned) {
for (ManifestEntry entry : deltaFiles) {
- snapshotAssigned.add(entry.assignSequenceNumber(snapshotId,
snapshotId));
+ if (entry.file().minSequenceNumber() == 0L) {
+ snapshotAssigned.add(entry.assignSequenceNumber(snapshotId,
snapshotId));
+ } else {
+ snapshotAssigned.add(entry);
+ }
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index bd31672d6a..ff5891d37c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -711,7 +711,8 @@ public class AppendOnlyWriterTest {
new FileIndexOptions(),
true,
false,
- null);
+ null,
+ options.dataEvolutionEnabled());
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
return Pair.of(writer, compactManager.allFiles());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
index 8fe9f193dc..0afe95eef0 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -99,7 +99,7 @@ public class RollingBlobFileWriterTest {
TARGET_FILE_SIZE,
SCHEMA,
pathFactory,
- seqNumCounter,
+ () -> seqNumCounter,
COMPRESSION,
new StatsCollectorFactories(new CoreOptions(new
Options())),
new FileIndexOptions(),
@@ -199,7 +199,7 @@ public class RollingBlobFileWriterTest {
false,
null,
null),
- new LongCounter(),
+ () -> new LongCounter(),
COMPRESSION,
new StatsCollectorFactories(new CoreOptions(new
Options())),
new FileIndexOptions(),
@@ -276,7 +276,7 @@ public class RollingBlobFileWriterTest {
blobTargetFileSize,
SCHEMA,
pathFactory, // Use the same pathFactory to ensure
shared UUID
- new LongCounter(),
+ () -> new LongCounter(),
COMPRESSION,
new StatsCollectorFactories(new CoreOptions(new
Options())),
new FileIndexOptions(),
@@ -355,7 +355,7 @@ public class RollingBlobFileWriterTest {
blobTargetFileSize,
SCHEMA,
pathFactory, // Use the same pathFactory to ensure
shared UUID
- new LongCounter(),
+ () -> new LongCounter(),
COMPRESSION,
new StatsCollectorFactories(new CoreOptions(new
Options())),
new FileIndexOptions(),
@@ -573,7 +573,7 @@ public class RollingBlobFileWriterTest {
TARGET_FILE_SIZE,
customSchema, // Use custom schema
pathFactory,
- seqNumCounter,
+ () -> seqNumCounter,
COMPRESSION,
new StatsCollectorFactories(new CoreOptions(new
Options())),
new FileIndexOptions(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index c8c0178811..441ec93ebb 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.append.dataevolution;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.Timestamp;
@@ -27,6 +28,7 @@ import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
@@ -187,6 +189,8 @@ public class DataEvolutionCompactCoordinatorTest {
SnapshotManager snapshotManager = mock(SnapshotManager.class);
Snapshot snapshot = mock(Snapshot.class);
ManifestsReader manifestsReader = mock(ManifestsReader.class);
+ FileStore fileStore = mock(FileStore.class);
+ FileStoreScan scan = mock(FileStoreScan.class);
Options options = new Options();
options.set("target-file-size", "1 kb");
@@ -199,6 +203,8 @@ public class DataEvolutionCompactCoordinatorTest {
when(snapshotReader.snapshotManager()).thenReturn(snapshotManager);
when(snapshotManager.latestSnapshot()).thenReturn(snapshot);
when(snapshotReader.manifestsReader()).thenReturn(manifestsReader);
+ when(table.store()).thenReturn(fileStore);
+ when(fileStore.newScan()).thenReturn(scan);
ManifestFileMeta metaWithNullRowId =
new ManifestFileMeta(
@@ -234,10 +240,8 @@ public class DataEvolutionCompactCoordinatorTest {
ManifestEntry entry1 = makeEntry("file1.parquet", 0L, 100L, 600);
ManifestEntry entry2 = makeEntry("file2.parquet", 100L, 100L, 600);
- when(snapshotReader.readManifest(metaWithNullRowId))
- .thenReturn(Collections.singletonList(entry1));
- when(snapshotReader.readManifest(metaWithRowId))
- .thenReturn(Collections.singletonList(entry2));
+ when(scan.readFileIterator(Arrays.asList(metaWithNullRowId,
metaWithRowId)))
+ .thenReturn(Arrays.asList(entry1, entry2).iterator());
DataEvolutionCompactCoordinator coordinator =
new DataEvolutionCompactCoordinator(table, false);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index f5c26445bf..3ffe8606ab 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -291,7 +291,8 @@ public class KeyValueFileReadWriteTest {
new FileIndexOptions(),
true,
false,
- null);
+ null,
+ options.dataEvolutionEnabled());
appendOnlyWriter.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
appendOnlyWriter.write(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 96adc5fc21..84e7c6766b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -92,7 +92,7 @@ public class RollingFileWriterTest {
.newPath(),
SCHEMA,
0L,
- new LongCounter(0),
+ () -> new LongCounter(0),
new FileIndexOptions(),
FileSource.APPEND,
true,
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 5d8e1ef9fb..2cdd156e7f 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -204,14 +204,14 @@ class BlobTestBase extends PaimonSparkTestBase {
sql("SELECT COUNT(*) FROM `t$files`"),
Seq(Row(22))
)
- sql("CALL paimon.sys.compact('t')")
+ sql("CALL paimon.sys.compact('t')").collect()
checkAnswer(
sql("SELECT COUNT(*) FROM `t$files`"),
Seq(Row(12))
)
checkAnswer(
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t LIMIT 1"),
- Seq(Row(1, "paimon", Array[Byte](72, 101, 108, 108, 111), 0, 12))
+ Seq(Row(1, "paimon", Array[Byte](72, 101, 108, 108, 111), 0, 11))
)
}
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 2f6afa7ce7..4079cf5eed 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -38,7 +38,7 @@ abstract class RowTrackingTestBase extends
PaimonSparkTestBase {
import testImplicits._
- ignore("Data Evolution: concurrent merge and compact") {
+ test("Data Evolution: concurrent merge and compact") {
withTable("s", "t") {
sql(s"""
CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (
@@ -694,7 +694,7 @@ abstract class RowTrackingTestBase extends
PaimonSparkTestBase {
sql("CALL paimon.sys.compact(table => 't')")
checkAnswer(
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
- Seq(Row(1, 11, 111, 2, 3), Row(2, 22, 2, 0, 3), Row(3, 3, 3, 1, 3))
+ Seq(Row(1, 11, 111, 2, 2), Row(2, 22, 2, 0, 2), Row(3, 3, 3, 1, 2))
)
checkAnswer(