This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e60487e849 [core] Make ManifestEntry and DataFileMeta to interfaces (#6134)
e60487e849 is described below
commit e60487e849a828af6d57bd5b89de74b0fef80bb2
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Aug 25 17:09:56 2025 +0800
[core] Make ManifestEntry and DataFileMeta to interfaces (#6134)
---
.../java/org/apache/paimon/io/DataFileMeta.java | 504 +++------------------
.../apache/paimon/io/DataFileMeta08Serializer.java | 2 +-
.../apache/paimon/io/DataFileMeta09Serializer.java | 2 +-
.../paimon/io/DataFileMeta10LegacySerializer.java | 2 +-
.../paimon/io/DataFileMeta12LegacySerializer.java | 2 +-
.../apache/paimon/io/DataFileMetaSerializer.java | 2 +-
.../apache/paimon/io/KeyValueDataFileWriter.java | 2 +-
.../{DataFileMeta.java => PojoDataFileMeta.java} | 298 +++---------
.../paimon/manifest/FilteredManifestEntry.java | 2 +-
.../org/apache/paimon/manifest/ManifestEntry.java | 149 +-----
.../paimon/manifest/ManifestEntrySerializer.java | 2 +-
.../{ManifestEntry.java => PojoManifestEntry.java} | 93 ++--
.../paimon/operation/FileStoreCommitImpl.java | 4 +-
.../sink/CommitMessageLegacyV2Serializer.java | 2 +-
.../append/AppendCompactCoordinatorTest.java | 2 +-
.../paimon/crosspartition/IndexBootstrapTest.java | 2 +-
.../paimon/io/DataFileTestDataGenerator.java | 2 +-
.../org/apache/paimon/io/DataFileTestUtils.java | 6 +-
...festCommittableSerializerCompatibilityTest.java | 18 +-
.../ManifestCommittableSerializerTest.java | 2 +-
.../paimon/manifest/ManifestFileMetaTestBase.java | 8 +-
.../paimon/manifest/ManifestTestDataGenerator.java | 9 +-
.../org/apache/paimon/mergetree/LevelsTest.java | 2 +-
.../mergetree/compact/IntervalPartitionTest.java | 2 +-
.../mergetree/compact/UniversalCompactionTest.java | 2 +-
.../paimon/operation/ExpireSnapshotsTest.java | 12 +-
.../apache/paimon/operation/FileDeletionTest.java | 2 +-
.../source/DataEvolutionSplitGeneratorTest.java | 2 +-
.../paimon/table/source/SplitGeneratorTest.java | 2 +-
.../org/apache/paimon/table/source/SplitTest.java | 14 +-
.../flink/copy/CopyManifestFileOperator.java | 2 +-
.../ChangelogCompactSortOperatorTest.java | 2 +-
.../ChangelogCompactTaskSerializerTest.java | 2 +-
.../sink/CompactionTaskSimpleSerializerTest.java | 2 +-
.../source/FileStoreSourceSplitGeneratorTest.java | 2 +-
.../source/FileStoreSourceSplitSerializerTest.java | 2 +-
36 files changed, 233 insertions(+), 932 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 48b56a5c65..e7bb8b9571 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -35,17 +35,14 @@ import org.apache.paimon.types.TinyIntType;
import javax.annotation.Nullable;
import java.time.LocalDateTime;
-import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.SerializationUtils.newBytesType;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
@@ -55,9 +52,9 @@ import static org.apache.paimon.utils.SerializationUtils.newStringType;
* @since 0.9.0
*/
@Public
-public class DataFileMeta {
+public interface DataFileMeta {
- public static final RowType SCHEMA =
+ RowType SCHEMA =
new RowType(
false,
Arrays.asList(
@@ -87,51 +84,11 @@ public class DataFileMeta {
new DataField(
19, "_WRITE_COLS", new ArrayType(true,
newStringType(false)))));
- public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
- public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
- public static final int DUMMY_LEVEL = 0;
+ BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
+ BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
+ int DUMMY_LEVEL = 0;
- private final String fileName;
- private final long fileSize;
-
- // total number of rows (including add & delete) in this file
- private final long rowCount;
-
- private final BinaryRow minKey;
- private final BinaryRow maxKey;
- private final SimpleStats keyStats;
- private final SimpleStats valueStats;
-
- // As for row-lineage table, this will be reassigned while committing
- private final long minSequenceNumber;
- private final long maxSequenceNumber;
- private final long schemaId;
- private final int level;
-
- private final List<String> extraFiles;
- private final Timestamp creationTime;
-
- // rowCount = addRowCount + deleteRowCount
- // Why don't we keep addRowCount and deleteRowCount?
- // Because in previous versions of DataFileMeta, we only keep rowCount.
- // We have to keep the compatibility.
- private final @Nullable Long deleteRowCount;
-
- // file index filter bytes, if it is small, store in data file meta
- private final @Nullable byte[] embeddedIndex;
-
- private final @Nullable FileSource fileSource;
-
- private final @Nullable List<String> valueStatsCols;
-
- /** external path of file, if it is null, it is in the default warehouse path. */
- private final @Nullable String externalPath;
-
- private final @Nullable Long firstRowId;
-
- private final @Nullable List<String> writeCols;
-
- public static DataFileMeta forAppend(
+ static DataFileMeta forAppend(
String fileName,
long fileSize,
long rowCount,
@@ -146,7 +103,7 @@ public class DataFileMeta {
@Nullable String externalPath,
@Nullable Long firstRowId,
@Nullable List<String> writeCols) {
- return new DataFileMeta(
+ return new PojoDataFileMeta(
fileName,
fileSize,
rowCount,
@@ -169,7 +126,7 @@ public class DataFileMeta {
writeCols);
}
- public DataFileMeta(
+ static DataFileMeta create(
String fileName,
long fileSize,
long rowCount,
@@ -189,7 +146,7 @@ public class DataFileMeta {
@Nullable String externalPath,
@Nullable Long firstRowId,
@Nullable List<String> writeCols) {
- this(
+ return new PojoDataFileMeta(
fileName,
fileSize,
rowCount,
@@ -212,7 +169,7 @@ public class DataFileMeta {
writeCols);
}
- public DataFileMeta(
+ static DataFileMeta create(
String fileName,
long fileSize,
long rowCount,
@@ -230,7 +187,7 @@ public class DataFileMeta {
@Nullable List<String> valueStatsCols,
@Nullable Long firstRowId,
@Nullable List<String> writeCols) {
- this(
+ return new PojoDataFileMeta(
fileName,
fileSize,
rowCount,
@@ -253,7 +210,7 @@ public class DataFileMeta {
writeCols);
}
- public DataFileMeta(
+ static DataFileMeta create(
String fileName,
long fileSize,
long rowCount,
@@ -274,429 +231,102 @@ public class DataFileMeta {
@Nullable String externalPath,
@Nullable Long firstRowId,
@Nullable List<String> writeCols) {
- this.fileName = fileName;
- this.fileSize = fileSize;
-
- this.rowCount = rowCount;
-
- this.embeddedIndex = embeddedIndex;
- this.minKey = minKey;
- this.maxKey = maxKey;
- this.keyStats = keyStats;
- this.valueStats = valueStats;
-
- this.minSequenceNumber = minSequenceNumber;
- this.maxSequenceNumber = maxSequenceNumber;
- this.level = level;
- this.schemaId = schemaId;
- this.extraFiles = Collections.unmodifiableList(extraFiles);
- this.creationTime = creationTime;
-
- this.deleteRowCount = deleteRowCount;
- this.fileSource = fileSource;
- this.valueStatsCols = valueStatsCols;
- this.externalPath = externalPath;
- this.firstRowId = firstRowId;
- this.writeCols = writeCols;
- }
-
- public String fileName() {
- return fileName;
+ return new PojoDataFileMeta(
+ fileName,
+ fileSize,
+ rowCount,
+ minKey,
+ maxKey,
+ keyStats,
+ valueStats,
+ minSequenceNumber,
+ maxSequenceNumber,
+ schemaId,
+ level,
+ extraFiles,
+ creationTime,
+ deleteRowCount,
+ embeddedIndex,
+ fileSource,
+ valueStatsCols,
+ externalPath,
+ firstRowId,
+ writeCols);
}
- public long fileSize() {
- return fileSize;
- }
+ String fileName();
- public long rowCount() {
- return rowCount;
- }
+ long fileSize();
- public Optional<Long> addRowCount() {
- return Optional.ofNullable(deleteRowCount).map(c -> rowCount - c);
- }
+ long rowCount();
- public Optional<Long> deleteRowCount() {
- return Optional.ofNullable(deleteRowCount);
- }
+ Optional<Long> deleteRowCount();
- public byte[] embeddedIndex() {
- return embeddedIndex;
- }
+ byte[] embeddedIndex();
- public BinaryRow minKey() {
- return minKey;
- }
+ BinaryRow minKey();
- public BinaryRow maxKey() {
- return maxKey;
- }
+ BinaryRow maxKey();
- public SimpleStats keyStats() {
- return keyStats;
- }
+ SimpleStats keyStats();
- public SimpleStats valueStats() {
- return valueStats;
- }
+ SimpleStats valueStats();
- public long minSequenceNumber() {
- return minSequenceNumber;
- }
+ long minSequenceNumber();
- public long maxSequenceNumber() {
- return maxSequenceNumber;
- }
+ long maxSequenceNumber();
- public long schemaId() {
- return schemaId;
- }
+ long schemaId();
- public int level() {
- return level;
- }
+ int level();
- public List<String> extraFiles() {
- return extraFiles;
- }
+ List<String> extraFiles();
- public Timestamp creationTime() {
- return creationTime;
- }
+ Timestamp creationTime();
- public long creationTimeEpochMillis() {
- return creationTime
- .toLocalDateTime()
- .atZone(ZoneId.systemDefault())
- .toInstant()
- .toEpochMilli();
- }
+ long creationTimeEpochMillis();
- public String fileFormat() {
- String[] split = fileName.split("\\.");
- if (split.length == 1) {
- throw new RuntimeException("Can't find format from file: " +
fileName());
- }
- return split[split.length - 1];
- }
+ String fileFormat();
- public Optional<String> externalPath() {
- return Optional.ofNullable(externalPath);
- }
+ Optional<String> externalPath();
- public Optional<String> externalPathDir() {
- return Optional.ofNullable(externalPath)
- .map(Path::new)
- .map(p -> p.getParent().toUri().toString());
- }
+ Optional<String> externalPathDir();
- public Optional<FileSource> fileSource() {
- return Optional.ofNullable(fileSource);
- }
+ Optional<FileSource> fileSource();
@Nullable
- public List<String> valueStatsCols() {
- return valueStatsCols;
- }
+ List<String> valueStatsCols();
@Nullable
- public Long firstRowId() {
- return firstRowId;
- }
+ Long firstRowId();
@Nullable
- public List<String> writeCols() {
- return writeCols;
- }
+ List<String> writeCols();
- public DataFileMeta upgrade(int newLevel) {
- checkArgument(newLevel > this.level);
- return new DataFileMeta(
- fileName,
- fileSize,
- rowCount,
- minKey,
- maxKey,
- keyStats,
- valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- newLevel,
- extraFiles,
- creationTime,
- deleteRowCount,
- embeddedIndex,
- fileSource,
- valueStatsCols,
- externalPath,
- firstRowId,
- writeCols);
- }
+ DataFileMeta upgrade(int newLevel);
- public DataFileMeta rename(String newFileName) {
- String newExternalPath = externalPathDir().map(dir -> dir + "/" +
newFileName).orElse(null);
- return new DataFileMeta(
- newFileName,
- fileSize,
- rowCount,
- minKey,
- maxKey,
- keyStats,
- valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- level,
- extraFiles,
- creationTime,
- deleteRowCount,
- embeddedIndex,
- fileSource,
- valueStatsCols,
- newExternalPath,
- firstRowId,
- writeCols);
- }
+ DataFileMeta rename(String newFileName);
- public DataFileMeta copyWithoutStats() {
- return new DataFileMeta(
- fileName,
- fileSize,
- rowCount,
- minKey,
- maxKey,
- keyStats,
- EMPTY_STATS,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- level,
- extraFiles,
- creationTime,
- deleteRowCount,
- embeddedIndex,
- fileSource,
- Collections.emptyList(),
- externalPath,
- firstRowId,
- writeCols);
- }
+ DataFileMeta copyWithoutStats();
- public DataFileMeta assignSequenceNumber(long minSequenceNumber, long
maxSequenceNumber) {
- return new DataFileMeta(
- fileName,
- fileSize,
- rowCount,
- minKey,
- maxKey,
- keyStats,
- valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- level,
- extraFiles,
- creationTime,
- deleteRowCount,
- embeddedIndex,
- fileSource,
- valueStatsCols,
- externalPath,
- firstRowId,
- writeCols);
- }
+ DataFileMeta assignSequenceNumber(long minSequenceNumber, long
maxSequenceNumber);
- public DataFileMeta assignFirstRowId(long firstRowId) {
- return new DataFileMeta(
- fileName,
- fileSize,
- rowCount,
- minKey,
- maxKey,
- keyStats,
- valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- level,
- extraFiles,
- creationTime,
- deleteRowCount,
- embeddedIndex,
- fileSource,
- valueStatsCols,
- externalPath,
- firstRowId,
- writeCols);
- }
+ DataFileMeta assignFirstRowId(long firstRowId);
- public List<Path> collectFiles(DataFilePathFactory pathFactory) {
+ default List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(this));
- extraFiles.forEach(f -> paths.add(pathFactory.toAlignedPath(f, this)));
+ extraFiles().forEach(f -> paths.add(pathFactory.toAlignedPath(f,
this)));
return paths;
}
- public DataFileMeta copy(List<String> newExtraFiles) {
- return new DataFileMeta(
- fileName,
- fileSize,
- rowCount,
- minKey,
- maxKey,
- keyStats,
- valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- level,
- newExtraFiles,
- creationTime,
- deleteRowCount,
- embeddedIndex,
- fileSource,
- valueStatsCols,
- externalPath,
- firstRowId,
- writeCols);
- }
+ DataFileMeta copy(List<String> newExtraFiles);
- public DataFileMeta newExternalPath(String newExternalPath) {
- return new DataFileMeta(
- fileName,
- fileSize,
- rowCount,
- minKey,
- maxKey,
- keyStats,
- valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- level,
- extraFiles,
- creationTime,
- deleteRowCount,
- embeddedIndex,
- fileSource,
- valueStatsCols,
- newExternalPath,
- firstRowId,
- writeCols);
- }
+ DataFileMeta newExternalPath(String newExternalPath);
- public DataFileMeta copy(byte[] newEmbeddedIndex) {
- return new DataFileMeta(
- fileName,
- fileSize,
- rowCount,
- minKey,
- maxKey,
- keyStats,
- valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- level,
- extraFiles,
- creationTime,
- deleteRowCount,
- newEmbeddedIndex,
- fileSource,
- valueStatsCols,
- externalPath,
- firstRowId,
- writeCols);
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (!(o instanceof DataFileMeta)) {
- return false;
- }
- DataFileMeta that = (DataFileMeta) o;
- return Objects.equals(fileName, that.fileName)
- && fileSize == that.fileSize
- && rowCount == that.rowCount
- && Arrays.equals(embeddedIndex, that.embeddedIndex)
- && Objects.equals(minKey, that.minKey)
- && Objects.equals(maxKey, that.maxKey)
- && Objects.equals(keyStats, that.keyStats)
- && Objects.equals(valueStats, that.valueStats)
- && minSequenceNumber == that.minSequenceNumber
- && maxSequenceNumber == that.maxSequenceNumber
- && schemaId == that.schemaId
- && level == that.level
- && Objects.equals(extraFiles, that.extraFiles)
- && Objects.equals(creationTime, that.creationTime)
- && Objects.equals(deleteRowCount, that.deleteRowCount)
- && Objects.equals(fileSource, that.fileSource)
- && Objects.equals(valueStatsCols, that.valueStatsCols)
- && Objects.equals(externalPath, that.externalPath)
- && Objects.equals(firstRowId, that.firstRowId)
- && Objects.equals(writeCols, that.writeCols);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- fileName,
- fileSize,
- rowCount,
- Arrays.hashCode(embeddedIndex),
- minKey,
- maxKey,
- keyStats,
- valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- level,
- extraFiles,
- creationTime,
- deleteRowCount,
- fileSource,
- valueStatsCols,
- externalPath,
- firstRowId,
- writeCols);
- }
-
- @Override
- public String toString() {
- return String.format(
- "{fileName: %s, fileSize: %d, rowCount: %d, embeddedIndex: %s, "
- + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, "
- + "minSequenceNumber: %d, maxSequenceNumber: %d, "
- + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, "
- + "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s, externalPath: %s, firstRowId: %s, writeCols: %s}",
- fileName,
- fileSize,
- rowCount,
- Arrays.toString(embeddedIndex),
- minKey,
- maxKey,
- keyStats,
- valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- level,
- extraFiles,
- creationTime,
- deleteRowCount,
- fileSource,
- valueStatsCols,
- externalPath,
- firstRowId,
- writeCols);
- }
+ DataFileMeta copy(byte[] newEmbeddedIndex);
- public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
+ static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
return fileMetas.stream()
.map(DataFileMeta::maxSequenceNumber)
.max(Long::compare)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
index 55105affb6..b646ef08ca 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java
@@ -116,7 +116,7 @@ public class DataFileMeta08Serializer implements
Serializable {
byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes,
0);
- return new DataFileMeta(
+ return DataFileMeta.create(
row.getString(0).toString(),
row.getLong(1),
row.getLong(2),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
index c9cced68ea..662f1276c8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java
@@ -122,7 +122,7 @@ public class DataFileMeta09Serializer implements
Serializable {
byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes,
0);
- return new DataFileMeta(
+ return DataFileMeta.create(
row.getString(0).toString(),
row.getLong(1),
row.getLong(2),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
index 0fdaa9d409..dca1aa528f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java
@@ -127,7 +127,7 @@ public class DataFileMeta10LegacySerializer implements
Serializable {
byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes,
0);
- return new DataFileMeta(
+ return DataFileMeta.create(
row.getString(0).toString(),
row.getLong(1),
row.getLong(2),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java
index b6f9fe3aa3..e888c1ca74 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta12LegacySerializer.java
@@ -129,7 +129,7 @@ public class DataFileMeta12LegacySerializer implements
Serializable {
byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes,
0);
- return new DataFileMeta(
+ return DataFileMeta.create(
row.getString(0).toString(),
row.getLong(1),
row.getLong(2),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
index ed232d4431..afed7265d4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java
@@ -66,7 +66,7 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
@Override
public DataFileMeta fromRow(InternalRow row) {
- return new DataFileMeta(
+ return DataFileMeta.create(
row.getString(0).toString(),
row.getLong(1),
row.getLong(2),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 7647668b0a..0710e28cea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -159,7 +159,7 @@ public abstract class KeyValueDataFileWriter
: dataFileIndexWriter.result();
String externalPath = isExternalPath ? path.toString() : null;
- return new DataFileMeta(
+ return DataFileMeta.create(
path.getName(),
fileSize,
recordCount(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
similarity index 59%
copy from paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
copy to paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
index 48b56a5c65..6e858fbd22 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
@@ -18,78 +18,26 @@
package org.apache.paimon.io;
-import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
-import org.apache.paimon.types.ArrayType;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TinyIntType;
import javax.annotation.Nullable;
-import java.time.LocalDateTime;
import java.time.ZoneId;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
-import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.SerializationUtils.newBytesType;
-import static org.apache.paimon.utils.SerializationUtils.newStringType;
-/**
- * Metadata of a data file.
- *
- * @since 0.9.0
- */
-@Public
-public class DataFileMeta {
-
- public static final RowType SCHEMA =
- new RowType(
- false,
- Arrays.asList(
- new DataField(0, "_FILE_NAME",
newStringType(false)),
- new DataField(1, "_FILE_SIZE", new
BigIntType(false)),
- new DataField(2, "_ROW_COUNT", new
BigIntType(false)),
- new DataField(3, "_MIN_KEY", newBytesType(false)),
- new DataField(4, "_MAX_KEY", newBytesType(false)),
- new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA),
- new DataField(6, "_VALUE_STATS",
SimpleStats.SCHEMA),
- new DataField(7, "_MIN_SEQUENCE_NUMBER", new
BigIntType(false)),
- new DataField(8, "_MAX_SEQUENCE_NUMBER", new
BigIntType(false)),
- new DataField(9, "_SCHEMA_ID", new
BigIntType(false)),
- new DataField(10, "_LEVEL", new IntType(false)),
- new DataField(
- 11, "_EXTRA_FILES", new ArrayType(false,
newStringType(false))),
- new DataField(12, "_CREATION_TIME",
DataTypes.TIMESTAMP_MILLIS()),
- new DataField(13, "_DELETE_ROW_COUNT", new
BigIntType(true)),
- new DataField(14, "_EMBEDDED_FILE_INDEX",
newBytesType(true)),
- new DataField(15, "_FILE_SOURCE", new
TinyIntType(true)),
- new DataField(
- 16,
- "_VALUE_STATS_COLS",
-
DataTypes.ARRAY(DataTypes.STRING().notNull())),
- new DataField(17, "_EXTERNAL_PATH",
newStringType(true)),
- new DataField(18, "_FIRST_ROW_ID", new
BigIntType(true)),
- new DataField(
- 19, "_WRITE_COLS", new ArrayType(true,
newStringType(false)))));
-
- public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
- public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
- public static final int DUMMY_LEVEL = 0;
+/** A {@link DataFileMeta} using pojo objects. */
+public class PojoDataFileMeta implements DataFileMeta {
private final String fileName;
private final long fileSize;
@@ -131,129 +79,7 @@ public class DataFileMeta {
private final @Nullable List<String> writeCols;
- public static DataFileMeta forAppend(
- String fileName,
- long fileSize,
- long rowCount,
- SimpleStats rowStats,
- long minSequenceNumber,
- long maxSequenceNumber,
- long schemaId,
- List<String> extraFiles,
- @Nullable byte[] embeddedIndex,
- @Nullable FileSource fileSource,
- @Nullable List<String> valueStatsCols,
- @Nullable String externalPath,
- @Nullable Long firstRowId,
- @Nullable List<String> writeCols) {
- return new DataFileMeta(
- fileName,
- fileSize,
- rowCount,
- EMPTY_MIN_KEY,
- EMPTY_MAX_KEY,
- EMPTY_STATS,
- rowStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- DUMMY_LEVEL,
- extraFiles,
-
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
- 0L,
- embeddedIndex,
- fileSource,
- valueStatsCols,
- externalPath,
- firstRowId,
- writeCols);
- }
-
- public DataFileMeta(
- String fileName,
- long fileSize,
- long rowCount,
- BinaryRow minKey,
- BinaryRow maxKey,
- SimpleStats keyStats,
- SimpleStats valueStats,
- long minSequenceNumber,
- long maxSequenceNumber,
- long schemaId,
- int level,
- List<String> extraFiles,
- @Nullable Long deleteRowCount,
- @Nullable byte[] embeddedIndex,
- @Nullable FileSource fileSource,
- @Nullable List<String> valueStatsCols,
- @Nullable String externalPath,
- @Nullable Long firstRowId,
- @Nullable List<String> writeCols) {
- this(
- fileName,
- fileSize,
- rowCount,
- minKey,
- maxKey,
- keyStats,
- valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- level,
- extraFiles,
-
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
- deleteRowCount,
- embeddedIndex,
- fileSource,
- valueStatsCols,
- externalPath,
- firstRowId,
- writeCols);
- }
-
- public DataFileMeta(
- String fileName,
- long fileSize,
- long rowCount,
- BinaryRow minKey,
- BinaryRow maxKey,
- SimpleStats keyStats,
- SimpleStats valueStats,
- long minSequenceNumber,
- long maxSequenceNumber,
- long schemaId,
- int level,
- @Nullable Long deleteRowCount,
- @Nullable byte[] embeddedIndex,
- @Nullable FileSource fileSource,
- @Nullable List<String> valueStatsCols,
- @Nullable Long firstRowId,
- @Nullable List<String> writeCols) {
- this(
- fileName,
- fileSize,
- rowCount,
- minKey,
- maxKey,
- keyStats,
- valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- schemaId,
- level,
- Collections.emptyList(),
-
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
- deleteRowCount,
- embeddedIndex,
- fileSource,
- valueStatsCols,
- null,
- firstRowId,
- writeCols);
- }
-
- public DataFileMeta(
+ public PojoDataFileMeta(
String fileName,
long fileSize,
long rowCount,
@@ -300,70 +126,82 @@ public class DataFileMeta {
this.writeCols = writeCols;
}
+ @Override
public String fileName() {
return fileName;
}
+ @Override
public long fileSize() {
return fileSize;
}
+ @Override
public long rowCount() {
return rowCount;
}
- public Optional<Long> addRowCount() {
- return Optional.ofNullable(deleteRowCount).map(c -> rowCount - c);
- }
-
+ @Override
public Optional<Long> deleteRowCount() {
return Optional.ofNullable(deleteRowCount);
}
+ @Override
public byte[] embeddedIndex() {
return embeddedIndex;
}
+ @Override
public BinaryRow minKey() {
return minKey;
}
+ @Override
public BinaryRow maxKey() {
return maxKey;
}
+ @Override
public SimpleStats keyStats() {
return keyStats;
}
+ @Override
public SimpleStats valueStats() {
return valueStats;
}
+ @Override
public long minSequenceNumber() {
return minSequenceNumber;
}
+ @Override
public long maxSequenceNumber() {
return maxSequenceNumber;
}
+ @Override
public long schemaId() {
return schemaId;
}
+ @Override
public int level() {
return level;
}
+ @Override
public List<String> extraFiles() {
return extraFiles;
}
+ @Override
public Timestamp creationTime() {
return creationTime;
}
+ @Override
public long creationTimeEpochMillis() {
return creationTime
.toLocalDateTime()
@@ -372,6 +210,7 @@ public class DataFileMeta {
.toEpochMilli();
}
+ @Override
public String fileFormat() {
String[] split = fileName.split("\\.");
if (split.length == 1) {
@@ -380,16 +219,19 @@ public class DataFileMeta {
return split[split.length - 1];
}
+ @Override
public Optional<String> externalPath() {
return Optional.ofNullable(externalPath);
}
+ @Override
public Optional<String> externalPathDir() {
return Optional.ofNullable(externalPath)
.map(Path::new)
.map(p -> p.getParent().toUri().toString());
}
+ @Override
public Optional<FileSource> fileSource() {
return Optional.ofNullable(fileSource);
}
@@ -409,9 +251,10 @@ public class DataFileMeta {
return writeCols;
}
- public DataFileMeta upgrade(int newLevel) {
+ @Override
+ public PojoDataFileMeta upgrade(int newLevel) {
checkArgument(newLevel > this.level);
- return new DataFileMeta(
+ return new PojoDataFileMeta(
fileName,
fileSize,
rowCount,
@@ -434,9 +277,10 @@ public class DataFileMeta {
writeCols);
}
- public DataFileMeta rename(String newFileName) {
+ @Override
+ public PojoDataFileMeta rename(String newFileName) {
String newExternalPath = externalPathDir().map(dir -> dir + "/" +
newFileName).orElse(null);
- return new DataFileMeta(
+ return new PojoDataFileMeta(
newFileName,
fileSize,
rowCount,
@@ -459,8 +303,9 @@ public class DataFileMeta {
writeCols);
}
- public DataFileMeta copyWithoutStats() {
- return new DataFileMeta(
+ @Override
+ public PojoDataFileMeta copyWithoutStats() {
+ return new PojoDataFileMeta(
fileName,
fileSize,
rowCount,
@@ -483,8 +328,9 @@ public class DataFileMeta {
writeCols);
}
- public DataFileMeta assignSequenceNumber(long minSequenceNumber, long
maxSequenceNumber) {
- return new DataFileMeta(
+ @Override
+ public PojoDataFileMeta assignSequenceNumber(long minSequenceNumber, long
maxSequenceNumber) {
+ return new PojoDataFileMeta(
fileName,
fileSize,
rowCount,
@@ -507,8 +353,9 @@ public class DataFileMeta {
writeCols);
}
- public DataFileMeta assignFirstRowId(long firstRowId) {
- return new DataFileMeta(
+ @Override
+ public PojoDataFileMeta assignFirstRowId(long firstRowId) {
+ return new PojoDataFileMeta(
fileName,
fileSize,
rowCount,
@@ -531,15 +378,9 @@ public class DataFileMeta {
writeCols);
}
- public List<Path> collectFiles(DataFilePathFactory pathFactory) {
- List<Path> paths = new ArrayList<>();
- paths.add(pathFactory.toPath(this));
- extraFiles.forEach(f -> paths.add(pathFactory.toAlignedPath(f, this)));
- return paths;
- }
-
- public DataFileMeta copy(List<String> newExtraFiles) {
- return new DataFileMeta(
+ @Override
+ public PojoDataFileMeta copy(List<String> newExtraFiles) {
+ return new PojoDataFileMeta(
fileName,
fileSize,
rowCount,
@@ -562,8 +403,9 @@ public class DataFileMeta {
writeCols);
}
- public DataFileMeta newExternalPath(String newExternalPath) {
- return new DataFileMeta(
+ @Override
+ public PojoDataFileMeta newExternalPath(String newExternalPath) {
+ return new PojoDataFileMeta(
fileName,
fileSize,
rowCount,
@@ -586,8 +428,9 @@ public class DataFileMeta {
writeCols);
}
- public DataFileMeta copy(byte[] newEmbeddedIndex) {
- return new DataFileMeta(
+ @Override
+ public PojoDataFileMeta copy(byte[] newEmbeddedIndex) {
+ return new PojoDataFileMeta(
fileName,
fileSize,
rowCount,
@@ -619,26 +462,26 @@ public class DataFileMeta {
return false;
}
DataFileMeta that = (DataFileMeta) o;
- return Objects.equals(fileName, that.fileName)
- && fileSize == that.fileSize
- && rowCount == that.rowCount
- && Arrays.equals(embeddedIndex, that.embeddedIndex)
- && Objects.equals(minKey, that.minKey)
- && Objects.equals(maxKey, that.maxKey)
- && Objects.equals(keyStats, that.keyStats)
- && Objects.equals(valueStats, that.valueStats)
- && minSequenceNumber == that.minSequenceNumber
- && maxSequenceNumber == that.maxSequenceNumber
- && schemaId == that.schemaId
- && level == that.level
- && Objects.equals(extraFiles, that.extraFiles)
- && Objects.equals(creationTime, that.creationTime)
- && Objects.equals(deleteRowCount, that.deleteRowCount)
- && Objects.equals(fileSource, that.fileSource)
- && Objects.equals(valueStatsCols, that.valueStatsCols)
- && Objects.equals(externalPath, that.externalPath)
- && Objects.equals(firstRowId, that.firstRowId)
- && Objects.equals(writeCols, that.writeCols);
+ return Objects.equals(fileName, that.fileName())
+ && fileSize == that.fileSize()
+ && rowCount == that.rowCount()
+ && Arrays.equals(embeddedIndex, that.embeddedIndex())
+ && Objects.equals(minKey, that.minKey())
+ && Objects.equals(maxKey, that.maxKey())
+ && Objects.equals(keyStats, that.keyStats())
+ && Objects.equals(valueStats, that.valueStats())
+ && minSequenceNumber == that.minSequenceNumber()
+ && maxSequenceNumber == that.maxSequenceNumber()
+ && schemaId == that.schemaId()
+ && level == that.level()
+ && Objects.equals(extraFiles, that.extraFiles())
+ && Objects.equals(creationTime, that.creationTime())
+ && Objects.equals(deleteRowCount,
that.deleteRowCount().orElse(null))
+ && Objects.equals(fileSource, that.fileSource().orElse(null))
+ && Objects.equals(valueStatsCols, that.valueStatsCols())
+ && Objects.equals(externalPath,
that.externalPath().orElse(null))
+ && Objects.equals(firstRowId, that.firstRowId())
+ && Objects.equals(writeCols, that.writeCols());
}
@Override
@@ -695,11 +538,4 @@ public class DataFileMeta {
firstRowId,
writeCols);
}
-
- public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
- return fileMetas.stream()
- .map(DataFileMeta::maxSequenceNumber)
- .max(Long::compare)
- .orElse(-1L);
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
index 29ae6f6389..1255313b2b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
@@ -19,7 +19,7 @@
package org.apache.paimon.manifest;
/** Wrap a {@link ManifestEntry} to contain {@link #selected}. */
-public class FilteredManifestEntry extends ManifestEntry {
+public class FilteredManifestEntry extends PojoManifestEntry {
private final boolean selected;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 0de753f166..5a99a7f270 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -26,12 +26,9 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-import java.util.Objects;
import static org.apache.paimon.utils.SerializationUtils.newBytesType;
@@ -41,9 +38,9 @@ import static
org.apache.paimon.utils.SerializationUtils.newBytesType;
* @since 0.9.0
*/
@Public
-public class ManifestEntry implements FileEntry {
+public interface ManifestEntry extends FileEntry {
- public static final RowType SCHEMA =
+ RowType SCHEMA =
new RowType(
false,
Arrays.asList(
@@ -53,158 +50,36 @@ public class ManifestEntry implements FileEntry {
new DataField(3, "_TOTAL_BUCKETS", new
IntType(false)),
new DataField(4, "_FILE", DataFileMeta.SCHEMA)));
- private final FileKind kind;
- // for tables without partition this field should be a row with 0 columns
(not null)
- private final BinaryRow partition;
- private final int bucket;
- private final int totalBuckets;
- private final DataFileMeta file;
-
- public ManifestEntry(
+ static ManifestEntry create(
FileKind kind, BinaryRow partition, int bucket, int totalBuckets,
DataFileMeta file) {
- this.kind = kind;
- this.partition = partition;
- this.bucket = bucket;
- this.totalBuckets = totalBuckets;
- this.file = file;
- }
-
- @Override
- public FileKind kind() {
- return kind;
- }
-
- @Override
- public BinaryRow partition() {
- return partition;
- }
-
- @Override
- public int bucket() {
- return bucket;
- }
-
- @Override
- public int level() {
- return file.level();
+ return new PojoManifestEntry(kind, partition, bucket, totalBuckets,
file);
}
- @Override
- public String fileName() {
- return file.fileName();
- }
-
- @Nullable
- @Override
- public String externalPath() {
- return file.externalPath().orElse(null);
- }
-
- @Override
- public BinaryRow minKey() {
- return file.minKey();
- }
+ DataFileMeta file();
- @Override
- public BinaryRow maxKey() {
- return file.maxKey();
- }
+ ManifestEntry copyWithoutStats();
- @Override
- public List<String> extraFiles() {
- return file.extraFiles();
- }
+ ManifestEntry assignSequenceNumber(long minSequenceNumber, long
maxSequenceNumber);
- @Override
- public int totalBuckets() {
- return totalBuckets;
- }
+ ManifestEntry assignFirstRowId(long firstRowId);
- public DataFileMeta file() {
- return file;
- }
+ byte[] toBytes() throws IOException;
- @Override
- public Identifier identifier() {
- return new Identifier(
- partition,
- bucket,
- file.level(),
- file.fileName(),
- file.extraFiles(),
- file.embeddedIndex(),
- externalPath());
- }
-
- public ManifestEntry copyWithoutStats() {
- return new ManifestEntry(kind, partition, bucket, totalBuckets,
file.copyWithoutStats());
- }
-
- public ManifestEntry assignSequenceNumber(long minSequenceNumber, long
maxSequenceNumber) {
- return new ManifestEntry(
- kind,
- partition,
- bucket,
- totalBuckets,
- file.assignSequenceNumber(minSequenceNumber,
maxSequenceNumber));
- }
-
- public ManifestEntry assignFirstRowId(long firstRowId) {
- return new ManifestEntry(
- kind, partition, bucket, totalBuckets,
file.assignFirstRowId(firstRowId));
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof ManifestEntry)) {
- return false;
- }
- ManifestEntry that = (ManifestEntry) o;
- return Objects.equals(kind, that.kind)
- && Objects.equals(partition, that.partition)
- && bucket == that.bucket
- && totalBuckets == that.totalBuckets
- && Objects.equals(file, that.file);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(kind, partition, bucket, totalBuckets, file);
- }
-
- @Override
- public String toString() {
- return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket,
totalBuckets, file);
- }
-
- public static long recordCount(List<ManifestEntry> manifestEntries) {
+ static long recordCount(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream().mapToLong(manifest ->
manifest.file().rowCount()).sum();
}
- public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
+ static long recordCountAdd(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream()
.filter(manifestEntry ->
FileKind.ADD.equals(manifestEntry.kind()))
.mapToLong(manifest -> manifest.file().rowCount())
.sum();
}
- public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
+ static long recordCountDelete(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream()
.filter(manifestEntry ->
FileKind.DELETE.equals(manifestEntry.kind()))
.mapToLong(manifest -> manifest.file().rowCount())
.sum();
}
-
- // ----------------------- Serialization -----------------------------
-
- private static final ThreadLocal<ManifestEntrySerializer>
SERIALIZER_THREAD_LOCAL =
- ThreadLocal.withInitial(ManifestEntrySerializer::new);
-
- public byte[] toBytes() throws IOException {
- return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this);
- }
-
- public ManifestEntry fromBytes(byte[] bytes) throws IOException {
- return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes);
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
index b1030448a7..c31d79713e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
@@ -69,7 +69,7 @@ public class ManifestEntrySerializer extends
VersionedObjectSerializer<ManifestE
}
throw new IllegalArgumentException("Unsupported version: " +
version);
}
- return new ManifestEntry(
+ return ManifestEntry.create(
FileKind.fromByteValue(row.getByte(0)),
deserializeBinaryRow(row.getBinary(1)),
row.getInt(2),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java
similarity index 59%
copy from
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
copy to
paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java
index 0de753f166..b3741a3c55 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java
@@ -18,40 +18,20 @@
package org.apache.paimon.manifest;
-import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TinyIntType;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
import java.util.Objects;
-import static org.apache.paimon.utils.SerializationUtils.newBytesType;
+/** A {@link ManifestEntry} using pojo objects. */
+public class PojoManifestEntry implements ManifestEntry {
-/**
- * Entry of a manifest file, representing an addition / deletion of a data
file.
- *
- * @since 0.9.0
- */
-@Public
-public class ManifestEntry implements FileEntry {
-
- public static final RowType SCHEMA =
- new RowType(
- false,
- Arrays.asList(
- new DataField(0, "_KIND", new TinyIntType(false)),
- new DataField(1, "_PARTITION",
newBytesType(false)),
- new DataField(2, "_BUCKET", new IntType(false)),
- new DataField(3, "_TOTAL_BUCKETS", new
IntType(false)),
- new DataField(4, "_FILE", DataFileMeta.SCHEMA)));
+ private static final ThreadLocal<ManifestEntrySerializer>
SERIALIZER_THREAD_LOCAL =
+ ThreadLocal.withInitial(ManifestEntrySerializer::new);
private final FileKind kind;
// for tables without partition this field should be a row with 0 columns
(not null)
@@ -60,7 +40,7 @@ public class ManifestEntry implements FileEntry {
private final int totalBuckets;
private final DataFileMeta file;
- public ManifestEntry(
+ public PojoManifestEntry(
FileKind kind, BinaryRow partition, int bucket, int totalBuckets,
DataFileMeta file) {
this.kind = kind;
this.partition = partition;
@@ -120,6 +100,7 @@ public class ManifestEntry implements FileEntry {
return totalBuckets;
}
+ @Override
public DataFileMeta file() {
return file;
}
@@ -136,12 +117,15 @@ public class ManifestEntry implements FileEntry {
externalPath());
}
- public ManifestEntry copyWithoutStats() {
- return new ManifestEntry(kind, partition, bucket, totalBuckets,
file.copyWithoutStats());
+ @Override
+ public PojoManifestEntry copyWithoutStats() {
+ return new PojoManifestEntry(
+ kind, partition, bucket, totalBuckets,
file.copyWithoutStats());
}
- public ManifestEntry assignSequenceNumber(long minSequenceNumber, long
maxSequenceNumber) {
- return new ManifestEntry(
+ @Override
+ public PojoManifestEntry assignSequenceNumber(long minSequenceNumber, long
maxSequenceNumber) {
+ return new PojoManifestEntry(
kind,
partition,
bucket,
@@ -149,22 +133,28 @@ public class ManifestEntry implements FileEntry {
file.assignSequenceNumber(minSequenceNumber,
maxSequenceNumber));
}
- public ManifestEntry assignFirstRowId(long firstRowId) {
- return new ManifestEntry(
+ @Override
+ public PojoManifestEntry assignFirstRowId(long firstRowId) {
+ return new PojoManifestEntry(
kind, partition, bucket, totalBuckets,
file.assignFirstRowId(firstRowId));
}
+ @Override
+ public byte[] toBytes() throws IOException {
+ return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this);
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof ManifestEntry)) {
return false;
}
ManifestEntry that = (ManifestEntry) o;
- return Objects.equals(kind, that.kind)
- && Objects.equals(partition, that.partition)
- && bucket == that.bucket
- && totalBuckets == that.totalBuckets
- && Objects.equals(file, that.file);
+ return Objects.equals(kind, that.kind())
+ && Objects.equals(partition, that.partition())
+ && bucket == that.bucket()
+ && totalBuckets == that.totalBuckets()
+ && Objects.equals(file, that.file());
}
@Override
@@ -176,35 +166,4 @@ public class ManifestEntry implements FileEntry {
public String toString() {
return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket,
totalBuckets, file);
}
-
- public static long recordCount(List<ManifestEntry> manifestEntries) {
- return manifestEntries.stream().mapToLong(manifest ->
manifest.file().rowCount()).sum();
- }
-
- public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
- return manifestEntries.stream()
- .filter(manifestEntry ->
FileKind.ADD.equals(manifestEntry.kind()))
- .mapToLong(manifest -> manifest.file().rowCount())
- .sum();
- }
-
- public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
- return manifestEntries.stream()
- .filter(manifestEntry ->
FileKind.DELETE.equals(manifestEntry.kind()))
- .mapToLong(manifest -> manifest.file().rowCount())
- .sum();
- }
-
- // ----------------------- Serialization -----------------------------
-
- private static final ThreadLocal<ManifestEntrySerializer>
SERIALIZER_THREAD_LOCAL =
- ThreadLocal.withInitial(ManifestEntrySerializer::new);
-
- public byte[] toBytes() throws IOException {
- return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this);
- }
-
- public ManifestEntry fromBytes(byte[] bytes) throws IOException {
- return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes);
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 34d2f7cac5..1538bfde50 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -774,7 +774,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
totalBuckets = numBucket;
}
- return new ManifestEntry(
+ return ManifestEntry.create(
kind, commitMessage.partition(), commitMessage.bucket(),
totalBuckets, file);
}
@@ -853,7 +853,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
List<ManifestEntry> currentEntries = scan.plan().files();
for (ManifestEntry entry : currentEntries) {
changesWithOverwrite.add(
- new ManifestEntry(
+ ManifestEntry.create(
FileKind.DELETE,
entry.partition(),
entry.bucket(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
index 75b969b865..e3ebbf1ad1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -139,7 +139,7 @@ public class CommitMessageLegacyV2Serializer {
@Override
public DataFileMeta fromRow(InternalRow row) {
- return new DataFileMeta(
+ return DataFileMeta.create(
row.getString(0).toString(),
row.getLong(1),
row.getLong(2),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java
index eb9468abbe..cd1ab9094d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactCoordinatorTest.java
@@ -245,7 +245,7 @@ public class AppendCompactCoordinatorTest {
}
private DataFileMeta newFile(long fileSize) {
- return new DataFileMeta(
+ return DataFileMeta.create(
UUID.randomUUID().toString(),
fileSize,
100,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index deb21eade4..526b6bd29f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -140,7 +140,7 @@ public class IndexBootstrapTest extends TableTestBase {
}
private static DataFileMeta newFile(long timeMillis) {
- return new DataFileMeta(
+ return DataFileMeta.create(
"",
1,
1,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index 3c4c74301e..3dbe87b9ff 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -150,7 +150,7 @@ public class DataFileTestDataGenerator {
return new Data(
partition,
bucket,
- new DataFileMeta(
+ DataFileMeta.create(
"data-" + UUID.randomUUID(),
totalSize,
kvs.size(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index b19b535ea6..bdf0c7c78e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -40,7 +40,7 @@ public class DataFileTestUtils {
}
public static DataFileMeta newFile(long minSeq, long maxSeq) {
- return new DataFileMeta(
+ return DataFileMeta.create(
"",
maxSeq - minSeq + 1,
0L,
@@ -64,7 +64,7 @@ public class DataFileTestUtils {
}
public static DataFileMeta newFile() {
- return new DataFileMeta(
+ return DataFileMeta.create(
"",
0,
0,
@@ -91,7 +91,7 @@ public class DataFileTestUtils {
public static DataFileMeta newFile(
String name, int level, int minKey, int maxKey, long maxSequence,
Long deleteRowCount) {
- return new DataFileMeta(
+ return DataFileMeta.create(
name,
maxKey - minKey + 1,
maxKey - minKey + 1,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index 0cf635ead6..82ff4d1be1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -58,7 +58,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
singleColumn("max_value"),
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -135,7 +135,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
singleColumn("max_value"),
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -211,7 +211,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
singleColumn("max_value"),
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -285,7 +285,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
singleColumn("max_value"),
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -359,7 +359,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
singleColumn("max_value"),
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -432,7 +432,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
singleColumn("max_value"),
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -506,7 +506,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
singleColumn("max_value"),
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -580,7 +580,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
singleColumn("max_value"),
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -654,7 +654,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
singleColumn("max_value"),
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index c7cfe4f9c8..4ff378a5ac 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -101,7 +101,7 @@ public class ManifestCommittableSerializerTest {
}
public static DataFileMeta newFile(int name, int level) {
- return new DataFileMeta(
+ return DataFileMeta.create(
String.valueOf(name),
0,
1,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 50258bc8cc..5cac5e65e2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -73,12 +73,12 @@ public abstract class ManifestFileMetaTestBase {
binaryRow = BinaryRow.EMPTY_ROW;
}
- return new ManifestEntry(
+ return ManifestEntry.create(
isAdd ? FileKind.ADD : FileKind.DELETE,
binaryRow,
0, // not used
0, // not used
- new DataFileMeta(
+ DataFileMeta.create(
fileName,
0, // not used
0, // not used
@@ -262,12 +262,12 @@ public abstract class ManifestFileMetaTestBase {
public static ManifestEntry makeEntry(
FileKind fileKind, int partition, int bucket, long rowCount) {
- return new ManifestEntry(
+ return ManifestEntry.create(
fileKind,
row(partition),
bucket,
0, // not used
- new DataFileMeta(
+ DataFileMeta.create(
"", // not used
0, // not used
rowCount,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
index 959d3f9e3e..8e09c1bb91 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
@@ -74,7 +74,7 @@ public class ManifestTestDataGenerator {
List<DataFileTestDataGenerator.Data> level =
bucketLevels.get(file.meta.level());
level.add(file);
bufferedResults.push(
- new ManifestEntry(
+ ManifestEntry.create(
FileKind.ADD, file.partition, file.bucket, numBuckets,
file.meta));
mergeLevelsIfNeeded(file.partition, file.bucket);
@@ -144,7 +144,7 @@ public class ManifestTestDataGenerator {
for (DataFileTestDataGenerator.Data file : currentLevel) {
bufferedResults.push(
- new ManifestEntry(
+ ManifestEntry.create(
FileKind.DELETE, partition, bucket,
numBuckets, file.meta));
kvs.addAll(file.content);
}
@@ -152,7 +152,7 @@ public class ManifestTestDataGenerator {
for (DataFileTestDataGenerator.Data file : nextLevel) {
bufferedResults.push(
- new ManifestEntry(
+ ManifestEntry.create(
FileKind.DELETE, partition, bucket,
numBuckets, file.meta));
kvs.addAll(file.content);
}
@@ -164,7 +164,8 @@ public class ManifestTestDataGenerator {
nextLevel.addAll(merged);
for (DataFileTestDataGenerator.Data file : nextLevel) {
bufferedResults.push(
- new ManifestEntry(FileKind.ADD, partition, bucket,
numBuckets, file.meta));
+ ManifestEntry.create(
+ FileKind.ADD, partition, bucket, numBuckets,
file.meta));
}
lastModifiedLevel += 1;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
index a76bfca959..b3c7f0275a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java
@@ -69,7 +69,7 @@ public class LevelsTest {
}
public static DataFileMeta newFile(int level) {
- return new DataFileMeta(
+ return DataFileMeta.create(
UUID.randomUUID().toString(),
0,
1,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index fbd046b7e2..464b26b944 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -167,7 +167,7 @@ public class IntervalPartitionTest {
maxWriter.writeInt(0, right);
maxWriter.complete();
- return new DataFileMeta(
+ return DataFileMeta.create(
"DUMMY",
100,
25,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 53bfbd0f18..37e057c704 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -442,7 +442,7 @@ public class UniversalCompactionTest {
}
static DataFileMeta file(long size) {
- return new DataFileMeta(
+ return DataFileMeta.create(
"",
size,
1,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 52afe00a31..d63af5fd2c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -266,7 +266,7 @@ public class ExpireSnapshotsTest {
// create DataFileMeta and ManifestEntry
List<String> extraFiles = Arrays.asList("extra1", "extra2");
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"myDataFile",
1,
1,
@@ -287,8 +287,8 @@ public class ExpireSnapshotsTest {
null,
null,
null);
- ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1,
dataFile);
- ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition,
0, 1, dataFile);
+ ManifestEntry add = ManifestEntry.create(FileKind.ADD, partition, 0,
1, dataFile);
+ ManifestEntry delete = ManifestEntry.create(FileKind.DELETE,
partition, 0, 1, dataFile);
// expire
expire.snapshotDeletion()
@@ -329,7 +329,7 @@ public class ExpireSnapshotsTest {
// create DataFileMeta and ManifestEntry
List<String> extraFiles = Arrays.asList("extra1", "extra2");
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
fileName,
1,
1,
@@ -350,8 +350,8 @@ public class ExpireSnapshotsTest {
myDataFile.toString(),
null,
null);
- ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1,
dataFile);
- ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition,
0, 1, dataFile);
+ ManifestEntry add = ManifestEntry.create(FileKind.ADD, partition, 0,
1, dataFile);
+ ManifestEntry delete = ManifestEntry.create(FileKind.DELETE,
partition, 0, 1, dataFile);
// expire
expire.snapshotDeletion()
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index b369beff43..8e794fe749 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -912,7 +912,7 @@ public class FileDeletionTest {
bucketEntries.stream()
.map(
entry ->
- new ManifestEntry(
+ ManifestEntry.create(
FileKind.DELETE,
partition,
bucket,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/DataEvolutionSplitGeneratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/DataEvolutionSplitGeneratorTest.java
index 9d9f9354db..27e7841f9d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/DataEvolutionSplitGeneratorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/DataEvolutionSplitGeneratorTest.java
@@ -37,7 +37,7 @@ public class DataEvolutionSplitGeneratorTest {
private static DataFileMeta createFile(
String name, @Nullable Long firstRowId, long maxSequence) {
- return new DataFileMeta(
+ return DataFileMeta.create(
name,
10000L,
1,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
index f3a2b3b47d..379207acdc 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java
@@ -44,7 +44,7 @@ public class SplitGeneratorTest {
public static DataFileMeta newFileFromSequence(
String name, int fileSize, long minSequence, long maxSequence) {
- return new DataFileMeta(
+ return DataFileMeta.create(
name,
fileSize,
1,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index ad20c4b699..6eb11583f4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -198,7 +198,7 @@ public class SplitTest {
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -263,7 +263,7 @@ public class SplitTest {
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -328,7 +328,7 @@ public class SplitTest {
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -397,7 +397,7 @@ public class SplitTest {
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -466,7 +466,7 @@ public class SplitTest {
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -535,7 +535,7 @@ public class SplitTest {
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
@@ -605,7 +605,7 @@ public class SplitTest {
fromLongArray(new Long[] {0L}));
DataFileMeta dataFile =
- new DataFileMeta(
+ DataFileMeta.create(
"my_file",
1024 * 1024,
1024,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java
index 40ba6fc400..1abcccd0bb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java
@@ -150,7 +150,7 @@ public class CopyManifestFileOperator extends
AbstractStreamOperator<CopyFileInf
// path is null
for (ManifestEntry manifestEntry : manifestEntries) {
ManifestEntry newManifestEntry =
- new ManifestEntry(
+ ManifestEntry.create(
manifestEntry.kind(),
manifestEntry.partition(),
manifestEntry.bucket(),
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
index 40b122f9d2..baedafcf8a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
@@ -180,7 +180,7 @@ public class ChangelogCompactSortOperatorTest {
}
private DataFileMeta createDataFileMeta(int mb, long creationMillis) {
- return new DataFileMeta(
+ return DataFileMeta.create(
UUID.randomUUID().toString(),
MemorySize.ofMebiBytes(mb).getBytes(),
0,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
index 840fa21ed6..344200d043 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
@@ -86,7 +86,7 @@ public class ChangelogCompactTaskSerializerTest {
}
private DataFileMeta newFile() {
- return new DataFileMeta(
+ return DataFileMeta.create(
UUID.randomUUID().toString(),
0,
1,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
index 84a6f9fdb0..7c22f125ff 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -64,7 +64,7 @@ public class CompactionTaskSimpleSerializerTest {
}
private DataFileMeta newFile() {
- return new DataFileMeta(
+ return DataFileMeta.create(
UUID.randomUUID().toString(),
0,
1,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 9a4bf070e0..932b7ee951 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -101,7 +101,7 @@ public class FileStoreSourceSplitGeneratorTest {
List<DataFileMeta> metas = new ArrayList<>();
for (String fileName : fileNames) {
metas.add(
- new DataFileMeta(
+ DataFileMeta.create(
fileName,
0, // not used
0, // not used
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index 4b721de319..5514fa4ce2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -74,7 +74,7 @@ public class FileStoreSourceSplitSerializerTest {
// ------------------------------------------------------------------------
public static DataFileMeta newFile(int level) {
- return new DataFileMeta(
+ return DataFileMeta.create(
"",
0,
1,