This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6210708b09 [core] Add totalBuckets to CommitMessage and DataSplit (#5343)
6210708b09 is described below
commit 6210708b096a98ad4af67724e75e008f8c31b843
Author: tsreaper <[email protected]>
AuthorDate: Thu Mar 27 15:55:54 2025 +0800
[core] Add totalBuckets to CommitMessage and DataSplit (#5343)
---
.../paimon/append/UnawareAppendCompactionTask.java | 6 +-
.../paimon/iceberg/migrate/IcebergMigrator.java | 3 +-
.../org/apache/paimon/migrate/FileMetaUtils.java | 4 +-
.../paimon/operation/AbstractFileStoreWrite.java | 51 +++++--
.../paimon/operation/FileStoreCommitImpl.java | 7 +-
.../org/apache/paimon/operation/FileStoreScan.java | 7 +-
.../apache/paimon/operation/FileStoreWrite.java | 16 ++-
.../apache/paimon/table/sink/CommitMessage.java | 6 +
.../paimon/table/sink/CommitMessageImpl.java | 20 ++-
.../sink/CommitMessageLegacyV2Serializer.java | 1 +
.../paimon/table/sink/CommitMessageSerializer.java | 13 +-
.../apache/paimon/table/sink/TableCommitImpl.java | 2 +-
.../org/apache/paimon/table/source/DataSplit.java | 23 +++-
.../snapshot/IncrementalDeltaStartingScanner.java | 24 ++--
.../table/source/snapshot/SnapshotReaderImpl.java | 66 ++++++---
.../org/apache/paimon/TestAppendFileStore.java | 2 +
.../test/java/org/apache/paimon/TestFileStore.java | 1 +
.../DeletionVectorsMaintainerTest.java | 2 +
.../paimon/index/HashBucketAssignerTest.java | 21 +--
...festCommittableSerializerCompatibilityTest.java | 78 +++++++++++
.../ManifestCommittableSerializerTest.java | 17 +--
.../paimon/operation/FileStoreTestUtils.java | 1 +
.../paimon/operation/PartitionExpireTest.java | 1 +
.../apache/paimon/operation/TestCommitThread.java | 22 ++-
.../table/sink/CommitMessageSerializerTest.java | 10 +-
.../org/apache/paimon/table/source/SplitTest.java | 68 ++++++++++
.../src/test/resources/compatibility/datasplit-v6 | Bin 0 -> 976 bytes
.../compatibility/manifest-committable-v7 | Bin 0 -> 3454 bytes
.../apache/paimon/flink/action/CompactAction.java | 22 ++-
.../flink/action/RemoveUnexistingFilesAction.java | 1 +
...ucketNewFilesCompactionCoordinatorOperator.java | 4 +
...wareBucketNewFilesCompactionWorkerOperator.java | 1 +
.../ChangelogCompactCoordinateOperator.java | 2 +
.../compact/changelog/ChangelogCompactTask.java | 9 ++
.../changelog/ChangelogCompactTaskSerializer.java | 5 +-
.../postpone/PostponeBucketCompactSplitSource.java | 1 +
.../RemovePostponeBucketFilesOperator.java | 1 +
.../RewritePostponeBucketCommittableOperator.java | 9 +-
.../paimon/flink/sink/RewriteFileIndexSink.java | 10 +-
.../paimon/flink/PostponeBucketTableITCase.java | 36 ++---
...tNewFilesCompactionCoordinatorOperatorTest.java | 1 +
.../ChangelogCompactTaskSerializerTest.java | 3 +-
.../flink/sink/CommittableSerializerTest.java | 2 +-
.../paimon/flink/sink/CommitterOperatorTest.java | 6 +
.../sink/MultiTableCommittableSerializerTest.java | 4 +-
.../WrappedManifestCommittableSerializerTest.java | 13 +-
.../sink/partition/PartitionMarkDoneTest.java | 2 +
.../format/orc/filter/OrcSimpleStatsExtractor.java | 148 +++++++++------------
.../apache/paimon/hive/migrate/HiveMigrator.java | 3 +-
.../scala/org/apache/paimon/spark/ScanHelper.scala | 1 +
.../paimon/spark/commands/PaimonCommand.scala | 1 +
.../paimon/spark/commands/PaimonSparkWriter.scala | 2 +
.../spark/commands/SparkDeletionVectors.scala | 7 +-
.../procedure/SparkRemoveUnexistingFiles.scala | 1 +
54 files changed, 549 insertions(+), 218 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
index b510f8346e..37c3b78af0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
@@ -108,8 +108,10 @@ public class UnawareAppendCompactionTask {
new CompactIncrement(compactBefore, compactAfter,
Collections.emptyList());
return new CommitMessageImpl(
partition,
- 0, // bucket 0 is bucket for unaware-bucket table for
compatibility with the old
- // design
+ // bucket 0 is bucket for unaware-bucket table
+ // for compatibility with the old design
+ 0,
+ table.coreOptions().bucket(),
DataIncrement.emptyIncrement(),
compactIncrement,
indexIncrement);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index 8fa08287d8..29ae1bdde4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -446,7 +446,8 @@ public class IcebergMigrator implements Migrator {
}
List<DataFileMeta> fileMetas =
construct(icebergDataFileMetas, fileIO, paimonTable,
newDir, rollback);
- return FileMetaUtils.commitFile(partitionRow, fileMetas);
+ return FileMetaUtils.commitFile(
+ partitionRow, paimonTable.coreOptions().bucket(),
fileMetas);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 0057b2b432..e72120abac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -83,10 +83,12 @@ public class FileMetaUtils {
.collect(Collectors.toList());
}
- public static CommitMessage commitFile(BinaryRow partition,
List<DataFileMeta> dataFileMetas) {
+ public static CommitMessage commitFile(
+ BinaryRow partition, int totalBuckets, List<DataFileMeta>
dataFileMetas) {
return new CommitMessageImpl(
partition,
0,
+ totalBuckets,
new DataIncrement(dataFileMetas, Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index f045a056c9..37647bc9e7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -74,7 +74,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
private final int writerNumberMax;
@Nullable private final IndexMaintainer.Factory<T> indexFactory;
@Nullable private final DeletionVectorsMaintainer.Factory
dvMaintainerFactory;
- private final int totalBuckets;
+ private final int numBuckets;
private final RowType partitionType;
@Nullable protected IOManager ioManager;
@@ -84,12 +84,13 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
private ExecutorService lazyCompactExecutor;
private boolean closeCompactExecutorWhenLeaving = true;
private boolean ignorePreviousFiles = false;
+ private boolean ignoreNumBucketCheck = false;
protected boolean isStreamingMode = false;
protected CompactionMetrics compactionMetrics = null;
protected final String tableName;
private boolean isInsertOnly;
- private boolean legacyPartitionName;
+ private final boolean legacyPartitionName;
protected AbstractFileStoreWrite(
SnapshotManager snapshotManager,
@@ -98,7 +99,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
@Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
String tableName,
CoreOptions options,
- int totalBuckets,
+ int numBuckets,
RowType partitionType,
int writerNumberMax,
boolean legacyPartitionName) {
@@ -112,7 +113,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
}
this.indexFactory = indexFactory;
this.dvMaintainerFactory = dvMaintainerFactory;
- this.totalBuckets = totalBuckets;
+ this.numBuckets = numBuckets;
this.partitionType = partitionType;
this.writers = new HashMap<>();
this.tableName = tableName;
@@ -136,6 +137,11 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
this.ignorePreviousFiles = ignorePreviousFiles;
}
+ @Override
+ public void withIgnoreNumBucketCheck(boolean ignoreNumBucketCheck) {
+ this.ignoreNumBucketCheck = ignoreNumBucketCheck;
+ }
+
@Override
public void withCompactExecutor(ExecutorService compactExecutor) {
this.lazyCompactExecutor = compactExecutor;
@@ -228,6 +234,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
new CommitMessageImpl(
partition,
bucket,
+ writerContainer.totalBuckets,
increment.newFilesIncrement(),
increment.compactIncrement(),
new IndexIncrement(newIndexFiles));
@@ -348,6 +355,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
new State<>(
partition,
bucket,
+ writerContainer.totalBuckets,
writerContainer.baseSnapshotId,
writerContainer.lastModifiedCommitIdentifier,
dataFiles,
@@ -380,6 +388,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
WriterContainer<T> writerContainer =
new WriterContainer<>(
writer,
+ state.totalBuckets,
state.indexMaintainer,
state.deletionVectorsMaintainer,
state.baseSnapshotId);
@@ -409,7 +418,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
}
private long writerNumber() {
- return writers.values().stream().mapToLong(e ->
e.values().size()).sum();
+ return writers.values().stream().mapToLong(Map::size).sum();
}
@VisibleForTesting
@@ -429,9 +438,13 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
List<DataFileMeta> restoreFiles = new ArrayList<>();
+ int totalBuckets;
if (!ignorePreviousFiles && latestSnapshot != null) {
- restoreFiles = scanExistingFileMetas(latestSnapshot, partition,
bucket);
+ totalBuckets = scanExistingFileMetas(latestSnapshot, partition,
bucket, restoreFiles);
+ } else {
+ totalBuckets = getDefaultBucketNum(partition);
}
+
IndexMaintainer<T> indexMaintainer =
indexFactory == null
? null
@@ -455,6 +468,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
notifyNewWriter(writer);
return new WriterContainer<>(
writer,
+ totalBuckets,
indexMaintainer,
deletionVectorsMaintainer,
latestSnapshot == null ? null : latestSnapshot.id());
@@ -471,13 +485,16 @@ public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T> {
return this;
}
- private List<DataFileMeta> scanExistingFileMetas(
- Snapshot snapshot, BinaryRow partition, int bucket) {
- List<DataFileMeta> existingFileMetas = new ArrayList<>();
+ private int scanExistingFileMetas(
+ Snapshot snapshot,
+ BinaryRow partition,
+ int bucket,
+ List<DataFileMeta> existingFileMetas) {
List<ManifestEntry> files =
scan.withSnapshot(snapshot).withPartitionBucket(partition,
bucket).plan().files();
+ int totalBuckets = getDefaultBucketNum(partition);
for (ManifestEntry entry : files) {
- if (entry.totalBuckets() != totalBuckets) {
+ if (!ignoreNumBucketCheck && entry.totalBuckets() != numBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
? "partition "
@@ -491,11 +508,18 @@ public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T> {
String.format(
"Try to write %s with a new bucket num %d, but
the previous bucket num is %d. "
+ "Please switch to batch mode, and
perform INSERT OVERWRITE to rescale current data layout first.",
- partInfo, totalBuckets, entry.totalBuckets()));
+ partInfo, numBuckets, entry.totalBuckets()));
}
+ totalBuckets = entry.totalBuckets();
existingFileMetas.add(entry.file());
}
- return existingFileMetas;
+ return totalBuckets;
+ }
+
+ // TODO see comments on FileStoreWrite#withIgnoreNumBucketCheck for what
is needed to support
+ // writing partitions with different buckets
+ public int getDefaultBucketNum(BinaryRow partition) {
+ return numBuckets;
}
private ExecutorService compactExecutor() {
@@ -534,6 +558,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
@VisibleForTesting
public static class WriterContainer<T> {
public final RecordWriter<T> writer;
+ public final int totalBuckets;
@Nullable public final IndexMaintainer<T> indexMaintainer;
@Nullable public final DeletionVectorsMaintainer
deletionVectorsMaintainer;
protected final long baseSnapshotId;
@@ -541,10 +566,12 @@ public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T> {
protected WriterContainer(
RecordWriter<T> writer,
+ int totalBuckets,
@Nullable IndexMaintainer<T> indexMaintainer,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
Long baseSnapshotId) {
this.writer = writer;
+ this.totalBuckets = totalBuckets;
this.indexMaintainer = indexMaintainer;
this.deletionVectorsMaintainer = deletionVectorsMaintainer;
this.baseSnapshotId =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index ba66294262..ee672f4ac6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -704,8 +704,13 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
private ManifestEntry makeEntry(FileKind kind, CommitMessage
commitMessage, DataFileMeta file) {
+ Integer totalBuckets = commitMessage.totalBuckets();
+ if (totalBuckets == null) {
+ totalBuckets = numBucket;
+ }
+
return new ManifestEntry(
- kind, commitMessage.partition(), commitMessage.bucket(),
numBucket, file);
+ kind, commitMessage.partition(), commitMessage.bucket(),
totalBuckets, file);
}
private int tryCommit(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 3e0bd25475..3b77c0a1a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -20,7 +20,6 @@ package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.BucketEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -148,13 +147,13 @@ public interface FileStoreScan {
}
/** Return a map group by partition and bucket. */
- static Map<BinaryRow, Map<Integer, List<DataFileMeta>>>
groupByPartFiles(
+ static Map<BinaryRow, Map<Integer, List<ManifestEntry>>>
groupByPartFiles(
List<ManifestEntry> files) {
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupBy = new
LinkedHashMap<>();
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> groupBy = new
LinkedHashMap<>();
for (ManifestEntry entry : files) {
groupBy.computeIfAbsent(entry.partition(), k -> new
LinkedHashMap<>())
.computeIfAbsent(entry.bucket(), k -> new
ArrayList<>())
- .add(entry.file());
+ .add(entry);
}
return groupBy;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 993e07d532..f10f9fb62c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -73,6 +73,16 @@ public interface FileStoreWrite<T> extends
Restorable<List<FileStoreWrite.State<
*/
void withIgnorePreviousFiles(boolean ignorePreviousFiles);
+ /**
+ * Ignores the check that the written partition must have the same number
of buckets with the
+ * table option.
+ *
+ * <p>TODO: to support writing partitions with different total buckets,
we'll also need a
+ * special {@link org.apache.paimon.table.sink.ChannelComputer} and {@link
+ * org.apache.paimon.table.sink.KeyAndBucketExtractor} to deal with
different bucket numbers.
+ */
+ void withIgnoreNumBucketCheck(boolean ignoreNumBucketCheck);
+
/**
* We detect whether it is in batch mode, if so, we do some optimization.
*
@@ -151,6 +161,7 @@ public interface FileStoreWrite<T> extends
Restorable<List<FileStoreWrite.State<
protected final BinaryRow partition;
protected final int bucket;
+ protected final int totalBuckets;
protected final long baseSnapshotId;
protected final long lastModifiedCommitIdentifier;
@@ -163,6 +174,7 @@ public interface FileStoreWrite<T> extends
Restorable<List<FileStoreWrite.State<
protected State(
BinaryRow partition,
int bucket,
+ int totalBuckets,
long baseSnapshotId,
long lastModifiedCommitIdentifier,
Collection<DataFileMeta> dataFiles,
@@ -172,6 +184,7 @@ public interface FileStoreWrite<T> extends
Restorable<List<FileStoreWrite.State<
CommitIncrement commitIncrement) {
this.partition = partition;
this.bucket = bucket;
+ this.totalBuckets = totalBuckets;
this.baseSnapshotId = baseSnapshotId;
this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
this.dataFiles = new ArrayList<>(dataFiles);
@@ -184,9 +197,10 @@ public interface FileStoreWrite<T> extends
Restorable<List<FileStoreWrite.State<
@Override
public String toString() {
return String.format(
- "{%s, %d, %d, %d, %s, %d, %s, %s, %s}",
+ "{%s, %d, %d, %d, %d, %s, %d, %s, %s, %s}",
partition,
bucket,
+ totalBuckets,
baseSnapshotId,
lastModifiedCommitIdentifier,
dataFiles,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessage.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessage.java
index 32a8d66aa5..47f82bcfe8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessage.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessage.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
/**
@@ -36,4 +38,8 @@ public interface CommitMessage extends Serializable {
/** Bucket of this commit message. */
int bucket();
+
+ /** Total number of buckets in this partition. */
+ @Nullable
+ Integer totalBuckets();
}
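
A minimal sketch (not part of the patch) of how a commit-side consumer can handle the new nullable field, mirroring the fallback that FileStoreCommitImpl#makeEntry applies above; `message` and `tableBucketNum` are placeholder names, with `tableBucketNum` standing in for the table-level bucket option.

    // Hedged sketch: totalBuckets() may be null for messages produced by older writers.
    Integer totalBuckets = message.totalBuckets();
    // Fall back to the table option when the message carries no bucket count.
    int effectiveBuckets = totalBuckets != null ? totalBuckets : tableBucketNum;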
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
index b95e96ac1f..5831066816 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
@@ -26,6 +26,8 @@ import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.io.IndexIncrement;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -45,6 +47,7 @@ public class CommitMessageImpl implements CommitMessage {
private transient BinaryRow partition;
private transient int bucket;
+ private transient @Nullable Integer totalBuckets;
private transient DataIncrement dataIncrement;
private transient CompactIncrement compactIncrement;
private transient IndexIncrement indexIncrement;
@@ -53,11 +56,13 @@ public class CommitMessageImpl implements CommitMessage {
public CommitMessageImpl(
BinaryRow partition,
int bucket,
+ @Nullable Integer totalBuckets,
DataIncrement dataIncrement,
CompactIncrement compactIncrement) {
this(
partition,
bucket,
+ totalBuckets,
dataIncrement,
compactIncrement,
new IndexIncrement(Collections.emptyList()));
@@ -66,11 +71,13 @@ public class CommitMessageImpl implements CommitMessage {
public CommitMessageImpl(
BinaryRow partition,
int bucket,
+ @Nullable Integer totalBuckets,
DataIncrement dataIncrement,
CompactIncrement compactIncrement,
IndexIncrement indexIncrement) {
this.partition = partition;
this.bucket = bucket;
+ this.totalBuckets = totalBuckets;
this.dataIncrement = dataIncrement;
this.compactIncrement = compactIncrement;
this.indexIncrement = indexIncrement;
@@ -86,6 +93,11 @@ public class CommitMessageImpl implements CommitMessage {
return bucket;
}
+ @Override
+ public @Nullable Integer totalBuckets() {
+ return totalBuckets;
+ }
+
public DataIncrement newFilesIncrement() {
return dataIncrement;
}
@@ -116,6 +128,7 @@ public class CommitMessageImpl implements CommitMessage {
CommitMessageImpl message = (CommitMessageImpl)
CACHE.get().deserialize(version, bytes);
this.partition = message.partition;
this.bucket = message.bucket;
+ this.totalBuckets = message.totalBuckets;
this.dataIncrement = message.dataIncrement;
this.compactIncrement = message.compactIncrement;
this.indexIncrement = message.indexIncrement;
@@ -133,6 +146,7 @@ public class CommitMessageImpl implements CommitMessage {
CommitMessageImpl that = (CommitMessageImpl) o;
return bucket == that.bucket
&& Objects.equals(partition, that.partition)
+ && Objects.equals(totalBuckets, that.totalBuckets)
&& Objects.equals(dataIncrement, that.dataIncrement)
&& Objects.equals(compactIncrement, that.compactIncrement)
&& Objects.equals(indexIncrement, that.indexIncrement);
@@ -140,7 +154,8 @@ public class CommitMessageImpl implements CommitMessage {
@Override
public int hashCode() {
- return Objects.hash(partition, bucket, dataIncrement,
compactIncrement, indexIncrement);
+ return Objects.hash(
+ partition, bucket, totalBuckets, dataIncrement,
compactIncrement, indexIncrement);
}
@Override
@@ -149,9 +164,10 @@ public class CommitMessageImpl implements CommitMessage {
"FileCommittable {"
+ "partition = %s, "
+ "bucket = %d, "
+ + "totalBuckets = %s, "
+ "newFilesIncrement = %s, "
+ "compactIncrement = %s, "
+ "indexIncrement = %s}",
- partition, bucket, dataIncrement, compactIncrement,
indexIncrement);
+ partition, bucket, totalBuckets, dataIncrement,
compactIncrement, indexIncrement);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
index 5da96da765..fe1bc6adb9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -71,6 +71,7 @@ public class CommitMessageLegacyV2Serializer {
return new CommitMessageImpl(
deserializeBinaryRow(view),
view.readInt(),
+ null,
new DataIncrement(
dataFileSerializer.deserializeList(view),
Collections.emptyList(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 1c0b67d409..4d91359ec0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -48,7 +48,7 @@ import static
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
/** {@link VersionedSerializer} for {@link CommitMessage}. */
public class CommitMessageSerializer implements
VersionedSerializer<CommitMessage> {
- private static final int CURRENT_VERSION = 6;
+ private static final int CURRENT_VERSION = 7;
private final DataFileMetaSerializer dataFileSerializer;
private final IndexFileMetaSerializer indexEntrySerializer;
@@ -85,8 +85,18 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
private void serialize(CommitMessage obj, DataOutputView view) throws
IOException {
CommitMessageImpl message = (CommitMessageImpl) obj;
+
serializeBinaryRow(obj.partition(), view);
view.writeInt(obj.bucket());
+
+ Integer totalBuckets = obj.totalBuckets();
+ if (totalBuckets != null) {
+ view.writeBoolean(true);
+ view.writeInt(totalBuckets);
+ } else {
+ view.writeBoolean(false);
+ }
+
dataFileSerializer.serializeList(message.newFilesIncrement().newFiles(), view);
dataFileSerializer.serializeList(message.newFilesIncrement().deletedFiles(),
view);
dataFileSerializer.serializeList(message.newFilesIncrement().changelogFiles(),
view);
@@ -120,6 +130,7 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
return new CommitMessageImpl(
deserializeBinaryRow(view),
view.readInt(),
+ version >= 7 && view.readBoolean() ? view.readInt() : null,
new DataIncrement(
fileDeserializer.get(), fileDeserializer.get(),
fileDeserializer.get()),
new CompactIncrement(
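
A minimal usage sketch (not part of the patch) of the serializer change, mirroring CommitMessageSerializerTest further down: a version-7 round trip preserves totalBuckets, while payloads written before version 7 deserialize with totalBuckets() == null because the presence flag is only read for version >= 7. `message` is an assumed CommitMessageImpl instance; IOException handling is omitted.

    CommitMessageSerializer serializer = new CommitMessageSerializer();
    // Round trip at the current version (7); copy.totalBuckets() equals message.totalBuckets().
    CommitMessage copy = serializer.deserialize(7, serializer.serialize(message));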
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index e91153f5bc..e4f5cea8c8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -300,7 +300,7 @@ public class TableCommitImpl implements InnerTableCommit {
f -> nonExists.test(f) ? singletonList(f) :
emptyList(),
files));
- if (nonExistFiles.size() > 0) {
+ if (!nonExistFiles.isEmpty()) {
String message =
String.join(
"\n",
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 5e39d3a71b..c34fca6997 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -58,12 +58,13 @@ public class DataSplit implements Split {
private static final long serialVersionUID = 7L;
private static final long MAGIC = -2394839472490812314L;
- private static final int VERSION = 5;
+ private static final int VERSION = 6;
private long snapshotId = 0;
private BinaryRow partition;
private int bucket = -1;
private String bucketPath;
+ @Nullable private Integer totalBuckets;
private List<DataFileMeta> beforeFiles = new ArrayList<>();
@Nullable private List<DeletionFile> beforeDeletionFiles;
@@ -92,6 +93,10 @@ public class DataSplit implements Split {
return bucketPath;
}
+ public @Nullable Integer totalBuckets() {
+ return totalBuckets;
+ }
+
public List<DataFileMeta> beforeFiles() {
return beforeFiles;
}
@@ -276,6 +281,7 @@ public class DataSplit implements Split {
&& rawConvertible == dataSplit.rawConvertible
&& Objects.equals(partition, dataSplit.partition)
&& Objects.equals(bucketPath, dataSplit.bucketPath)
+ && Objects.equals(totalBuckets, dataSplit.totalBuckets)
&& Objects.equals(beforeFiles, dataSplit.beforeFiles)
&& Objects.equals(beforeDeletionFiles,
dataSplit.beforeDeletionFiles)
&& Objects.equals(dataFiles, dataSplit.dataFiles)
@@ -289,6 +295,7 @@ public class DataSplit implements Split {
partition,
bucket,
bucketPath,
+ totalBuckets,
beforeFiles,
beforeDeletionFiles,
dataFiles,
@@ -310,6 +317,7 @@ public class DataSplit implements Split {
this.partition = other.partition;
this.bucket = other.bucket;
this.bucketPath = other.bucketPath;
+ this.totalBuckets = other.totalBuckets;
this.beforeFiles = other.beforeFiles;
this.beforeDeletionFiles = other.beforeDeletionFiles;
this.dataFiles = other.dataFiles;
@@ -325,6 +333,12 @@ public class DataSplit implements Split {
SerializationUtils.serializeBinaryRow(partition, out);
out.writeInt(bucket);
out.writeUTF(bucketPath);
+ if (totalBuckets != null) {
+ out.writeBoolean(true);
+ out.writeInt(totalBuckets);
+ } else {
+ out.writeBoolean(false);
+ }
DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
out.writeInt(beforeFiles.size());
@@ -354,6 +368,7 @@ public class DataSplit implements Split {
BinaryRow partition = SerializationUtils.deserializeBinaryRow(in);
int bucket = in.readInt();
String bucketPath = in.readUTF();
+ Integer totalBuckets = version >= 6 && in.readBoolean() ? in.readInt()
: null;
FunctionWithIOException<DataInputView, DataFileMeta> dataFileSer =
getFileMetaSerde(version);
@@ -385,6 +400,7 @@ public class DataSplit implements Split {
.withPartition(partition)
.withBucket(bucket)
.withBucketPath(bucketPath)
+ .withTotalBuckets(totalBuckets)
.withBeforeFiles(beforeFiles)
.withDataFiles(dataFiles)
.isStreaming(isStreaming)
@@ -458,6 +474,11 @@ public class DataSplit implements Split {
return this;
}
+ public Builder withTotalBuckets(Integer totalBuckets) {
+ this.split.totalBuckets = totalBuckets;
+ return this;
+ }
+
public Builder withBeforeFiles(List<DataFileMeta> beforeFiles) {
this.split.beforeFiles = new ArrayList<>(beforeFiles);
return this;
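
A minimal sketch (not part of the patch) of the new builder hook, assuming placeholder values for snapshotId, partition, bucket, bucketPath and dataFiles; the new SplitTest below exercises the same call chain.

    DataSplit split =
            DataSplit.builder()
                    .withSnapshot(snapshotId)
                    .withPartition(partition)
                    .withBucket(bucket)
                    // Nullable: splits serialized before version 6 restore this as null.
                    .withTotalBuckets(totalBuckets)
                    .withDataFiles(dataFiles)
                    .withBucketPath(bucketPath)
                    .build();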
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
index 80a56e4f0c..df837bab0a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -71,7 +70,7 @@ public class IncrementalDeltaStartingScanner extends
AbstractStartingScanner {
@Override
public Result scan(SnapshotReader reader) {
- Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new
ConcurrentHashMap<>();
+ Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> grouped = new
ConcurrentHashMap<>();
ManifestsReader manifestsReader = reader.manifestsReader();
List<Long> snapshots =
@@ -114,29 +113,28 @@ public class IncrementalDeltaStartingScanner extends
AbstractStartingScanner {
ManifestEntry entry = entries.next();
checkArgument(
entry.kind() == FileKind.ADD, "Delta or changelog should
only have ADD files.");
- grouped.compute(
- Pair.of(entry.partition(), entry.bucket()),
- (key, files) -> {
- if (files == null) {
- files = new ArrayList<>();
- }
- files.add(entry.file());
- return files;
- });
+ grouped.computeIfAbsent(
+ Pair.of(entry.partition(), entry.bucket()), ignore
-> new ArrayList<>())
+ .add(entry);
}
List<Split> result = new ArrayList<>();
- for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry :
grouped.entrySet()) {
+ for (Map.Entry<Pair<BinaryRow, Integer>, List<ManifestEntry>> entry :
grouped.entrySet()) {
BinaryRow partition = entry.getKey().getLeft();
int bucket = entry.getKey().getRight();
String bucketPath = reader.pathFactory().bucketPath(partition,
bucket).toString();
for (SplitGenerator.SplitGroup splitGroup :
- reader.splitGenerator().splitForBatch(entry.getValue())) {
+ reader.splitGenerator()
+ .splitForBatch(
+ entry.getValue().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList()))) {
DataSplit.Builder dataSplitBuilder =
DataSplit.builder()
.withSnapshot(endingSnapshotId)
.withPartition(partition)
.withBucket(bucket)
+ .withTotalBuckets(entry.getValue().get(0).totalBuckets())
.withDataFiles(splitGroup.files)
.rawConvertible(splitGroup.rawConvertible)
.withBucketPath(bucketPath);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 649dc5b1e4..931e9d4eba 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -65,6 +65,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
@@ -311,17 +312,17 @@ public class SnapshotReaderImpl implements SnapshotReader
{
FileStoreScan.Plan plan = scan.plan();
@Nullable Snapshot snapshot = plan.snapshot();
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> files =
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> grouped =
groupByPartFiles(plan.files(FileKind.ADD));
if (options.scanPlanSortPartition()) {
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> newFiles = new
LinkedHashMap<>();
- files.entrySet().stream()
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> sorted = new
LinkedHashMap<>();
+ grouped.entrySet().stream()
.sorted((o1, o2) ->
partitionComparator().compare(o1.getKey(), o2.getKey()))
- .forEach(entry -> newFiles.put(entry.getKey(),
entry.getValue()));
- files = newFiles;
+ .forEach(entry -> sorted.put(entry.getKey(),
entry.getValue()));
+ grouped = sorted;
}
List<DataSplit> splits =
- generateSplits(snapshot, scanMode != ScanMode.ALL,
splitGenerator, files);
+ generateSplits(snapshot, scanMode != ScanMode.ALL,
splitGenerator, grouped);
return new PlanImpl(
plan.watermark(), snapshot == null ? null : snapshot.id(),
(List) splits);
}
@@ -330,7 +331,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
@Nullable Snapshot snapshot,
boolean isStreaming,
SplitGenerator splitGenerator,
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles)
{
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>>
groupedManifestEntries) {
List<DataSplit> splits = new ArrayList<>();
// Read deletion indexes at once to reduce file IO
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>>
deletionIndexFilesMap = null;
@@ -338,22 +339,28 @@ public class SnapshotReaderImpl implements SnapshotReader
{
deletionIndexFilesMap =
deletionVectors && snapshot != null
? indexFileHandler.scan(
- snapshot, DELETION_VECTORS_INDEX,
groupedDataFiles.keySet())
+ snapshot,
+ DELETION_VECTORS_INDEX,
+ groupedManifestEntries.keySet())
: Collections.emptyMap();
}
- for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
- groupedDataFiles.entrySet()) {
+ for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> entry :
+ groupedManifestEntries.entrySet()) {
BinaryRow partition = entry.getKey();
- Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
- for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
buckets.entrySet()) {
+ Map<Integer, List<ManifestEntry>> buckets = entry.getValue();
+ for (Map.Entry<Integer, List<ManifestEntry>> bucketEntry :
buckets.entrySet()) {
int bucket = bucketEntry.getKey();
- List<DataFileMeta> bucketFiles = bucketEntry.getValue();
+ List<DataFileMeta> bucketFiles =
+ bucketEntry.getValue().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList());
DataSplit.Builder builder =
DataSplit.builder()
.withSnapshot(
snapshot == null ? FIRST_SNAPSHOT_ID -
1 : snapshot.id())
.withPartition(partition)
.withBucket(bucket)
+ .withTotalBuckets(bucketEntry.getValue().get(0).totalBuckets())
.isStreaming(isStreaming);
List<SplitGenerator.SplitGroup> splitGroups =
isStreaming
@@ -406,9 +413,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
withMode(ScanMode.DELTA);
FileStoreScan.Plan plan = scan.plan();
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles =
groupByPartFiles(plan.files(FileKind.DELETE));
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles =
groupByPartFiles(plan.files(FileKind.ADD));
Snapshot beforeSnapshot =
snapshotManager.snapshot(plan.snapshot().id() - 1);
return toChangesPlan(true, plan, beforeSnapshot, beforeFiles,
dataFiles);
@@ -418,8 +425,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
boolean isStreaming,
FileStoreScan.Plan plan,
Snapshot beforeSnapshot,
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles,
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles) {
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles,
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles) {
Snapshot snapshot = plan.snapshot();
List<DataSplit> splits = new ArrayList<>();
Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
@@ -450,23 +457,38 @@ public class SnapshotReaderImpl implements SnapshotReader
{
for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
BinaryRow part = entry.getKey();
for (Integer bucket : entry.getValue()) {
- List<DataFileMeta> before =
+ List<ManifestEntry> beforeEntries =
beforeFiles
.getOrDefault(part, Collections.emptyMap())
.getOrDefault(bucket, Collections.emptyList());
- List<DataFileMeta> data =
+ List<ManifestEntry> dataEntries =
dataFiles
.getOrDefault(part, Collections.emptyMap())
.getOrDefault(bucket, Collections.emptyList());
// deduplicate
- before.removeIf(data::remove);
+ beforeEntries.removeIf(dataEntries::remove);
+
+ Integer totalBuckets = null;
+ if (!dataEntries.isEmpty()) {
+ totalBuckets = dataEntries.get(0).totalBuckets();
+ } else if (!beforeEntries.isEmpty()) {
+ totalBuckets = beforeEntries.get(0).totalBuckets();
+ }
+
+ List<DataFileMeta> before =
+ beforeEntries.stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList());
+ List<DataFileMeta> data =
+ dataEntries.stream().map(ManifestEntry::file).collect(Collectors.toList());
DataSplit.Builder builder =
DataSplit.builder()
.withSnapshot(snapshot.id())
.withPartition(part)
.withBucket(bucket)
+ .withTotalBuckets(totalBuckets)
.withBeforeFiles(before)
.withDataFiles(data)
.isStreaming(isStreaming)
@@ -497,9 +519,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
public Plan readIncrementalDiff(Snapshot before) {
withMode(ScanMode.ALL);
FileStoreScan.Plan plan = scan.plan();
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> dataFiles =
groupByPartFiles(plan.files(FileKind.ADD));
- Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> beforeFiles =
groupByPartFiles(scan.withSnapshot(before).plan().files(FileKind.ADD));
return toChangesPlan(false, plan, before, beforeFiles, dataFiles);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 3c794e146c..aa629a67bc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -109,6 +109,7 @@ public class TestAppendFileStore extends
AppendOnlyFileStore {
return new CommitMessageImpl(
partition,
bucket,
+ options().bucket(),
DataIncrement.emptyIncrement(),
CompactIncrement.emptyIncrement(),
new IndexIncrement(Collections.emptyList(), indexFileMetas));
@@ -143,6 +144,7 @@ public class TestAppendFileStore extends
AppendOnlyFileStore {
return new CommitMessageImpl(
partition,
bucket,
+ options().bucket(),
DataIncrement.emptyIncrement(),
CompactIncrement.emptyIncrement(),
new IndexIncrement(dvMaintainer.writeDeletionVectorsIndex()));
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 0da4489e3f..3e09b5a887 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -340,6 +340,7 @@ public class TestFileStore extends KeyValueFileStore {
new CommitMessageImpl(
entryWithPartition.getKey(),
entryWithBucket.getKey(),
+ options().bucket(),
increment.newFilesIncrement(),
increment.compactIncrement(),
new IndexIncrement(indexFiles)));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
index 07446faab6..21ff27ae99 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
@@ -94,6 +94,7 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
+ 1,
DataIncrement.emptyIncrement(),
CompactIncrement.emptyIncrement(),
new IndexIncrement(fileMetas1));
@@ -115,6 +116,7 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
+ 1,
DataIncrement.emptyIncrement(),
CompactIncrement.emptyIncrement(),
new IndexIncrement(fileMetas2));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index df71f204b7..43f594073f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -208,10 +208,12 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
assertThat(assigner.currentPartitions()).contains(row(2));
}
- private CommitMessage createCommitMessage(BinaryRow partition, int bucket,
IndexFileMeta file) {
+ private CommitMessage createCommitMessage(
+ BinaryRow partition, int bucket, int totalBuckets, IndexFileMeta
file) {
return new CommitMessageImpl(
partition,
bucket,
+ totalBuckets,
new DataIncrement(
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
@@ -226,8 +228,8 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
commit.commit(
0,
Arrays.asList(
- createCommitMessage(row(1), 0, bucket0),
- createCommitMessage(row(1), 2, bucket2)));
+ createCommitMessage(row(1), 0, 3, bucket0),
+ createCommitMessage(row(1), 2, 3, bucket2)));
HashBucketAssigner assigner0 = createAssigner(3, 3, 0);
HashBucketAssigner assigner2 = createAssigner(3, 3, 2);
@@ -252,8 +254,8 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
commit.commit(
0,
Arrays.asList(
- createCommitMessage(row(1), 0, bucket0),
- createCommitMessage(row(1), 2, bucket2)));
+ createCommitMessage(row(1), 0, 3, bucket0),
+ createCommitMessage(row(1), 2, 3, bucket2)));
HashBucketAssigner assigner0 = createAssigner(3, 3, 0, 1);
HashBucketAssigner assigner2 = createAssigner(3, 3, 2, 1);
@@ -311,8 +313,10 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
commit.commit(
0,
Arrays.asList(
- createCommitMessage(row(1), 0,
fileHandler.writeHashIndex(new int[] {0})),
- createCommitMessage(row(2), 0,
fileHandler.writeHashIndex(new int[] {0}))));
+ createCommitMessage(
+ row(1), 0, 1, fileHandler.writeHashIndex(new
int[] {0})),
+ createCommitMessage(
+ row(2), 0, 1, fileHandler.writeHashIndex(new
int[] {0}))));
assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1),
row(2));
// checkpoint 1, but no commit
@@ -328,7 +332,8 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
commit.commit(
1,
Collections.singletonList(
- createCommitMessage(row(1), 0,
fileHandler.writeHashIndex(new int[] {1}))));
+ createCommitMessage(
+ row(1), 0, 1, fileHandler.writeHashIndex(new
int[] {1}))));
assigner.prepareCommit(3);
assertThat(assigner.currentPartitions()).isEmpty();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index 1544e879c7..b33055e9d5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -45,6 +45,78 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Compatibility Test for {@link ManifestCommittableSerializer}. */
public class ManifestCommittableSerializerCompatibilityTest {
+ @Test
+ public void testCompatibilityToV3CommitV7() throws IOException {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+ Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs://localhost:9000/path/to/file");
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
+ dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L));
+ dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L));
+ IndexFileMeta indexFile =
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, dvMetas);
+ List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+ CommitMessageImpl commitMessage =
+ new CommitMessageImpl(
+ singleColumn("my_partition"),
+ 11,
+ 16,
+ new DataIncrement(dataFiles, dataFiles, dataFiles),
+ new CompactIncrement(dataFiles, dataFiles, dataFiles),
+ new IndexIncrement(indexFiles));
+
+ ManifestCommittable manifestCommittable =
+ new ManifestCommittable(
+ 5,
+ 202020L,
+ Collections.singletonMap(5, 555L),
+ Collections.singletonList(commitMessage));
+
+ ManifestCommittableSerializer serializer = new
ManifestCommittableSerializer();
+ byte[] bytes = serializer.serialize(manifestCommittable);
+ ManifestCommittable deserialized = serializer.deserialize(3, bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+
+ byte[] oldBytes =
+ IOUtils.readFully(
+ ManifestCommittableSerializerCompatibilityTest.class
+ .getClassLoader()
+ .getResourceAsStream("compatibility/manifest-committable-v7"),
+ true);
+ deserialized = serializer.deserialize(3, oldBytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+ }
+
@Test
public void testCompatibilityToV3CommitV6() throws IOException {
SimpleStats keyStats =
@@ -90,6 +162,7 @@ public class ManifestCommittableSerializerCompatibilityTest {
new CommitMessageImpl(
singleColumn("my_partition"),
11,
+ null,
new DataIncrement(dataFiles, dataFiles, dataFiles),
new CompactIncrement(dataFiles, dataFiles, dataFiles),
new IndexIncrement(indexFiles));
@@ -161,6 +234,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
new CommitMessageImpl(
singleColumn("my_partition"),
11,
+ null,
new DataIncrement(dataFiles, dataFiles, dataFiles),
new CompactIncrement(dataFiles, dataFiles, dataFiles),
new IndexIncrement(indexFiles));
@@ -231,6 +305,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
new CommitMessageImpl(
singleColumn("my_partition"),
11,
+ null,
new DataIncrement(dataFiles, dataFiles, dataFiles),
new CompactIncrement(dataFiles, dataFiles, dataFiles),
new IndexIncrement(indexFiles));
@@ -302,6 +377,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
new CommitMessageImpl(
singleColumn("my_partition"),
11,
+ null,
new DataIncrement(dataFiles, dataFiles, dataFiles),
new CompactIncrement(dataFiles, dataFiles, dataFiles),
new IndexIncrement(indexFiles));
@@ -373,6 +449,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
new CommitMessageImpl(
singleColumn("my_partition"),
11,
+ null,
new DataIncrement(dataFiles, dataFiles, dataFiles),
new CompactIncrement(dataFiles, dataFiles, dataFiles),
new IndexIncrement(indexFiles));
@@ -441,6 +518,7 @@ public class ManifestCommittableSerializerCompatibilityTest
{
new CommitMessageImpl(
singleColumn("my_partition"),
11,
+ null,
new DataIncrement(dataFiles, Collections.emptyList(),
dataFiles),
new CompactIncrement(dataFiles, dataFiles, dataFiles),
new IndexIncrement(indexFiles));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index 8de8309bc8..6b2212d983 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -28,9 +28,7 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
import org.junit.jupiter.api.Test;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
@@ -61,23 +59,22 @@ public class ManifestCommittableSerializerTest {
rnd.nextBoolean()
? new ManifestCommittable(rnd.nextLong(),
rnd.nextLong())
: new ManifestCommittable(rnd.nextLong(), null);
- addFileCommittables(committable, row(0), 0);
- addFileCommittables(committable, row(0), 1);
- addFileCommittables(committable, row(1), 0);
- addFileCommittables(committable, row(1), 1);
+ addFileCommittables(committable, row(0), 0, 2);
+ addFileCommittables(committable, row(0), 1, 2);
+ addFileCommittables(committable, row(1), 0, 2);
+ addFileCommittables(committable, row(1), 1, 2);
return committable;
}
private static void addFileCommittables(
- ManifestCommittable committable, BinaryRow partition, int bucket) {
- List<CommitMessage> commitMessages = new ArrayList<>();
+ ManifestCommittable committable, BinaryRow partition, int bucket,
int totalBuckets) {
int length = ThreadLocalRandom.current().nextInt(10) + 1;
for (int i = 0; i < length; i++) {
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
- new CommitMessageImpl(partition, bucket, dataIncrement,
compactIncrement);
- commitMessages.add(commitMessage);
+ new CommitMessageImpl(
+ partition, bucket, totalBuckets, dataIncrement,
compactIncrement);
committable.addFileCommittable(commitMessage);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
index dc2af066e9..deab3a9703 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
@@ -100,6 +100,7 @@ public class FileStoreTestUtils {
new CommitMessageImpl(
entryWithPartition.getKey(),
entryWithBucket.getKey(),
+ store.options().bucket(),
increment.newFilesIncrement(),
increment.compactIncrement()));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index aba37d4cce..901a93737b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -330,6 +330,7 @@ public class PartitionExpireTest {
new CommitMessageImpl(
message.partition(),
message.bucket(),
+ message.totalBuckets(),
new DataIncrement(emptyList(), emptyList(),
emptyList()),
new CompactIncrement(singletonList(file), emptyList(),
emptyList()));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index ba735594a9..74c652b052 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -58,6 +58,8 @@ public class TestCommitThread extends Thread {
private final RowType valueType;
private final boolean enableOverwrite;
private final Map<BinaryRow, List<KeyValue>> data;
+ private final int totalBuckets;
+
private final Map<BinaryRow, List<KeyValue>> result;
private final Map<BinaryRow, MergeTreeWriter> writers;
private final Set<BinaryRow> writtenPartitions;
@@ -79,6 +81,8 @@ public class TestCommitThread extends Thread {
this.valueType = valueType;
this.enableOverwrite = enableOverwrite;
this.data = data;
+ this.totalBuckets = testStore.options().bucket();
+
this.result = new HashMap<>();
this.writers = new HashMap<>();
this.writtenPartitions = new HashSet<>();
@@ -140,7 +144,11 @@ public class TestCommitThread extends Thread {
CommitIncrement inc = entry.getValue().prepareCommit(true);
committable.addFileCommittable(
new CommitMessageImpl(
- entry.getKey(), 0, inc.newFilesIncrement(),
inc.compactIncrement()));
+ entry.getKey(),
+ 0,
+ totalBuckets,
+ inc.newFilesIncrement(),
+ inc.compactIncrement()));
}
runWithRetry(committable, () -> commit.commit(committable,
Collections.emptyMap()));
@@ -152,7 +160,11 @@ public class TestCommitThread extends Thread {
CommitIncrement inc = writers.get(partition).prepareCommit(true);
committable.addFileCommittable(
new CommitMessageImpl(
- partition, 0, inc.newFilesIncrement(),
inc.compactIncrement()));
+ partition,
+ 0,
+ totalBuckets,
+ inc.newFilesIncrement(),
+ inc.compactIncrement()));
runWithRetry(
committable,
@@ -175,7 +187,11 @@ public class TestCommitThread extends Thread {
CommitIncrement inc = writer.prepareCommit(true);
committable.addFileCommittable(
new CommitMessageImpl(
- partition, 0, inc.newFilesIncrement(),
inc.compactIncrement()));
+ partition,
+ 0,
+ totalBuckets,
+ inc.newFilesIncrement(),
+ inc.compactIncrement()));
}
commit.commit(committable, Collections.emptyMap());
break;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index 1f87838aea..351af6a3b7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -39,6 +39,7 @@ public class CommitMessageSerializerTest {
@Test
public void test() throws IOException {
CommitMessageSerializer serializer = new CommitMessageSerializer();
+
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
IndexIncrement indexIncrement =
@@ -46,9 +47,14 @@ public class CommitMessageSerializerTest {
Arrays.asList(randomIndexFile(), randomIndexFile()),
Arrays.asList(randomIndexFile(), randomIndexFile()));
CommitMessageImpl committable =
- new CommitMessageImpl(row(0), 1, dataIncrement,
compactIncrement, indexIncrement);
+ new CommitMessageImpl(
+ row(0), 1, 2, dataIncrement, compactIncrement,
indexIncrement);
+
CommitMessageImpl newCommittable =
- (CommitMessageImpl) serializer.deserialize(5,
serializer.serialize(committable));
+ (CommitMessageImpl) serializer.deserialize(7,
serializer.serialize(committable));
+
assertThat(newCommittable.partition()).isEqualTo(committable.partition());
+ assertThat(newCommittable.bucket()).isEqualTo(committable.bucket());
+ assertThat(newCommittable.totalBuckets()).isEqualTo(committable.totalBuckets());
assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement());
assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index a87a645711..7213f11ca0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -511,6 +511,74 @@ public class SplitTest {
assertThat(actual).isEqualTo(split);
}
+ @Test
+ public void testSerializerCompatibleV6() throws Exception {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+ Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs:///path/to/warehouse");
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L);
+ List<DeletionFile> deletionFiles = Collections.singletonList(deletionFile);
+
+ BinaryRow partition = new BinaryRow(1);
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+ binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+ binaryRowWriter.complete();
+
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(18)
+ .withPartition(partition)
+ .withBucket(20)
+ .withTotalBuckets(32)
+ .withDataFiles(dataFiles)
+ .withDataDeletionFiles(deletionFiles)
+ .withBucketPath("my path")
+ .build();
+
+ assertThat(InstantiationUtil.clone(split)).isEqualTo(split);
+
+ byte[] v6Bytes =
+ IOUtils.readFully(
+ SplitTest.class
+ .getClassLoader()
+ .getResourceAsStream("compatibility/datasplit-v6"),
+ true);
+
+ DataSplit actual =
+ InstantiationUtil.deserializeObject(v6Bytes, DataSplit.class.getClassLoader());
+ assertThat(actual).isEqualTo(split);
+ }
+
private DataFileMeta newDataFile(long rowCount) {
return newDataFile(rowCount, null, null);
}
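The same field reaches the read path through DataSplit's builder, which the compatibility test above pins together with the new datasplit-v6 resource. A trimmed sketch using the test's own placeholder values:

    // Sketch only: totalBuckets now travels with the split next to the bucket id.
    DataSplit split =
            DataSplit.builder()
                    .withSnapshot(18)
                    .withPartition(partition)
                    .withBucket(20)
                    .withTotalBuckets(32)   // new builder field
                    .withDataFiles(dataFiles)
                    .withBucketPath("my path")
                    .build();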
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v6
b/paimon-core/src/test/resources/compatibility/datasplit-v6
new file mode 100644
index 0000000000..91cbc60001
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/datasplit-v6 differ
diff --git
a/paimon-core/src/test/resources/compatibility/manifest-committable-v7
b/paimon-core/src/test/resources/compatibility/manifest-committable-v7
new file mode 100644
index 0000000000..31566ca7fd
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/manifest-committable-v7 differ
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 91bb30b45b..316655613b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -58,6 +58,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -239,9 +240,12 @@ public class CompactAction extends TableActionBase {
whereSql == null,
"Postpone bucket compaction currently does not support
predicates");
+ Options options = new Options(table.options());
+ int defaultBucketNum = options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
+
// change bucket to a positive value, so we can scan files from the
bucket = -2 directory
Map<String, String> bucketOptions = new HashMap<>(table.options());
- bucketOptions.put(CoreOptions.BUCKET.key(), "1");
+ bucketOptions.put(CoreOptions.BUCKET.key(), String.valueOf(defaultBucketNum));
FileStoreTable fileStoreTable =
table.copy(table.schema().copy(bucketOptions));
List<BinaryRow> partitions =
@@ -253,15 +257,16 @@ public class CompactAction extends TableActionBase {
return false;
}
- Options options = new Options(fileStoreTable.options());
InternalRowPartitionComputer partitionComputer =
new InternalRowPartitionComputer(
fileStoreTable.coreOptions().partitionDefaultName(),
fileStoreTable.rowType(),
fileStoreTable.partitionKeys().toArray(new String[0]),
fileStoreTable.coreOptions().legacyPartitionName());
+ String commitUser = CoreOptions.createCommitUser(options);
+ List<DataStream<Committable>> dataStreams = new ArrayList<>();
for (BinaryRow partition : partitions) {
- int bucketNum = options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
+ int bucketNum = defaultBucketNum;
Iterator<ManifestEntry> it =
table.newSnapshotReader()
@@ -295,8 +300,6 @@ public class CompactAction extends TableActionBase {
new RowDataChannelComputer(realTable.schema(),
false),
null);
FixedBucketSink sink = new FixedBucketSink(realTable, null, null);
- String commitUser =
- CoreOptions.createCommitUser(realTable.coreOptions().toConfiguration());
DataStream<Committable> written =
sink.doWrite(partitioned, commitUser,
partitioned.getParallelism())
.forward()
@@ -304,9 +307,16 @@ public class CompactAction extends TableActionBase {
"Rewrite compact committable",
new CommittableTypeInfo(),
new
RewritePostponeBucketCommittableOperator(realTable));
- sink.doCommit(written.union(sourcePair.getRight()), commitUser);
+ dataStreams.add(written);
+ dataStreams.add(sourcePair.getRight());
}
+ FixedBucketSink sink = new FixedBucketSink(fileStoreTable, null, null);
+ DataStream<Committable> dataStream = dataStreams.get(0);
+ for (int i = 1; i < dataStreams.size(); i++) {
+ dataStream = dataStream.union(dataStreams.get(i));
+ }
+ sink.doCommit(dataStream, commitUser);
return true;
}
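In short, the postpone-bucket branch of CompactAction now reads the default bucket number and the commit user once, keeps every per-partition committable stream instead of committing inside the loop, and issues a single commit over their union. A condensed sketch of that flow, where "written" and "sourcePair" stand for the per-partition streams built in the hunks above:

    // Sketch only: collect the streams, union them, commit once.
    List<DataStream<Committable>> dataStreams = new ArrayList<>();
    for (BinaryRow partition : partitions) {
        // ... build the rewrite pipeline for this partition ...
        dataStreams.add(written);
        dataStreams.add(sourcePair.getRight());
    }
    DataStream<Committable> all = dataStreams.get(0);
    for (int i = 1; i < dataStreams.size(); i++) {
        all = all.union(dataStreams.get(i));
    }
    new FixedBucketSink(fileStoreTable, null, null).doCommit(all, commitUser);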
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
index 8d014d66ee..996182e0b3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
@@ -174,6 +174,7 @@ public class RemoveUnexistingFilesAction extends
TableActionBase {
new CommitMessageImpl(
reuse,
entry.getKey(),
+ table.coreOptions().bucket(),
new DataIncrement(
Collections.emptyList(),
new
ArrayList<>(entry.getValue().values()),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java
index c52c0b2d85..a906f9b414 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java
@@ -61,6 +61,7 @@ public class
UnawareBucketNewFilesCompactionCoordinatorOperator
private final long targetFileSize;
private final long compactionFileSize;
+ private final int totalBuckets;
private transient UnawareBucketNewFilesCompactionCoordinator coordinator;
private transient long checkpointId;
@@ -68,6 +69,7 @@ public class
UnawareBucketNewFilesCompactionCoordinatorOperator
public UnawareBucketNewFilesCompactionCoordinatorOperator(CoreOptions
options) {
this.targetFileSize = options.targetFileSize(false);
this.compactionFileSize = options.compactionFileSize(false);
+ this.totalBuckets = options.bucket();
}
@Override
@@ -124,6 +126,7 @@ public class
UnawareBucketNewFilesCompactionCoordinatorOperator
new CommitMessageImpl(
message.partition(),
message.bucket(),
+ message.totalBuckets(),
new DataIncrement(
skippedFiles,
message.newFilesIncrement().deletedFiles(),
@@ -163,6 +166,7 @@ public class
UnawareBucketNewFilesCompactionCoordinatorOperator
new CommitMessageImpl(
p.getKey(),
BucketMode.UNAWARE_BUCKET,
+ totalBuckets,
new DataIncrement(
Collections.singletonList(p.getValue().get(0)),
Collections.emptyList(),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java
index d634d22ec6..2ada98f6b9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java
@@ -102,6 +102,7 @@ public class UnawareBucketNewFilesCompactionWorkerOperator
return new CommitMessageImpl(
message.partition(),
message.bucket(),
+ message.totalBuckets(),
new DataIncrement(
message.compactIncrement().compactAfter(),
Collections.emptyList(),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
index d1f6a5abd2..9f6bd4431d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
@@ -107,6 +107,7 @@ public class ChangelogCompactCoordinateOperator
new CommitMessageImpl(
message.partition(),
message.bucket(),
+ message.totalBuckets(),
new DataIncrement(
message.newFilesIncrement().newFiles(),
message.newFilesIncrement().deletedFiles(),
@@ -137,6 +138,7 @@ public class ChangelogCompactCoordinateOperator
new ChangelogCompactTask(
checkpointId,
partition,
+ table.coreOptions().bucket(),
partitionChangelog.newFileChangelogFiles,
partitionChangelog.compactChangelogFiles))));
partitionChangelogs.remove(partition);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 7f3f730280..96fe15c344 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -49,18 +49,22 @@ import java.util.UUID;
* file, in order to reduce the number of small files.
*/
public class ChangelogCompactTask implements Serializable {
+
private final long checkpointId;
private final BinaryRow partition;
+ private final int totalBuckets;
private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles;
private final Map<Integer, List<DataFileMeta>> compactChangelogFiles;
public ChangelogCompactTask(
long checkpointId,
BinaryRow partition,
+ int totalBuckets,
Map<Integer, List<DataFileMeta>> newFileChangelogFiles,
Map<Integer, List<DataFileMeta>> compactChangelogFiles) {
this.checkpointId = checkpointId;
this.partition = partition;
+ this.totalBuckets = totalBuckets;
this.newFileChangelogFiles = newFileChangelogFiles;
this.compactChangelogFiles = compactChangelogFiles;
}
@@ -73,6 +77,10 @@ public class ChangelogCompactTask implements Serializable {
return partition;
}
+ public int totalBuckets() {
+ return totalBuckets;
+ }
+
public Map<Integer, List<DataFileMeta>> newFileChangelogFiles() {
return newFileChangelogFiles;
}
@@ -204,6 +212,7 @@ public class ChangelogCompactTask implements Serializable {
new CommitMessageImpl(
partition,
entry.getKey(),
+ totalBuckets,
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java
index e21220b26d..c7f56dc5de 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializer.java
@@ -39,7 +39,8 @@ import static
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
/** Serializer for {@link ChangelogCompactTask}. */
public class ChangelogCompactTaskSerializer
implements SimpleVersionedSerializer<ChangelogCompactTask> {
- private static final int CURRENT_VERSION = 1;
+
+ private static final int CURRENT_VERSION = 2;
private final DataFileMetaSerializer dataFileSerializer;
@@ -69,6 +70,7 @@ public class ChangelogCompactTaskSerializer
private void serialize(ChangelogCompactTask task, DataOutputView view)
throws IOException {
view.writeLong(task.checkpointId());
serializeBinaryRow(task.partition(), view);
+ view.writeInt(task.totalBuckets());
// serialize newFileChangelogFiles map
serializeMap(task.newFileChangelogFiles(), view);
serializeMap(task.compactChangelogFiles(), view);
@@ -82,6 +84,7 @@ public class ChangelogCompactTaskSerializer
return new ChangelogCompactTask(
view.readLong(),
deserializeBinaryRow(view),
+ view.readInt(),
deserializeMap(view),
deserializeMap(view));
}
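The version bump to 2 reflects a wire-format change: totalBuckets is written between the partition row and the two changelog file maps. A round trip under the new version, with placeholder task contents, looks like the serializer test updated later in this commit:

    // Sketch only: version 2 layout is checkpointId, partition, totalBuckets, then the two maps.
    ChangelogCompactTaskSerializer serializer = new ChangelogCompactTaskSerializer();
    ChangelogCompactTask task =
            new ChangelogCompactTask(
                    1L, partition, 2, newFileChangelogFiles, compactChangelogFiles);
    ChangelogCompactTask copy = serializer.deserialize(2, serializer.serialize(task));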
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index 0bfab0d196..3746f355c3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -108,6 +108,7 @@ public class PostponeBucketCompactSplitSource extends
AbstractNonCoordinatedSour
.withPartition(dataSplit.partition())
.withBucket(dataSplit.bucket())
.withBucketPath(dataSplit.bucketPath())
+ .withTotalBuckets(dataSplit.totalBuckets())
.withDataFiles(Collections.singletonList(meta))
.isStreaming(false)
.build();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
index 63b44d0f85..09d3f829bf 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RemovePostponeBucketFilesOperator.java
@@ -45,6 +45,7 @@ public class RemovePostponeBucketFilesOperator extends
BoundedOneInputOperator<S
new CommitMessageImpl(
dataSplit.partition(),
dataSplit.bucket(),
+ dataSplit.totalBuckets(),
DataIncrement.emptyIncrement(),
new CompactIncrement(
dataSplit.dataFiles(),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
index f387222e4f..8cb2540aa5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
@@ -33,6 +33,8 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -93,12 +95,14 @@ public class RewritePostponeBucketCommittableOperator
bucketFiles.entrySet()) {
for (Map.Entry<Integer, BucketFiles> bucketEntry :
partitionEntry.getValue().entrySet()) {
+ BucketFiles bucketFiles = bucketEntry.getValue();
CommitMessageImpl message =
new CommitMessageImpl(
partitionEntry.getKey(),
bucketEntry.getKey(),
+ bucketFiles.totalBuckets,
DataIncrement.emptyIncrement(),
- bucketEntry.getValue().makeIncrement());
+ bucketFiles.makeIncrement());
output.collect(
new StreamRecord<>(
new Committable(checkpointId,
Committable.Kind.FILE, message)));
@@ -112,6 +116,7 @@ public class RewritePostponeBucketCommittableOperator
private final DataFilePathFactory pathFactory;
private final FileIO fileIO;
+ private @Nullable Integer totalBuckets;
private final Map<String, DataFileMeta> newFiles;
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
@@ -128,6 +133,8 @@ public class RewritePostponeBucketCommittableOperator
}
private void update(CommitMessageImpl message) {
+ totalBuckets = message.totalBuckets();
+
for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
newFiles.put(file.fileName(), file);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
index bd7ae4a824..d377851d38 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java
@@ -127,16 +127,16 @@ public class RewriteFileIndexSink extends
FlinkWriteSink<ManifestEntry> {
ManifestEntry entry = element.getValue();
BinaryRow partition = entry.partition();
int bucket = entry.bucket();
- DataFileMeta file = entry.file();
- DataFileMeta indexedFile = fileIndexProcessor.process(partition, bucket, file);
+ DataFileMeta indexedFile = fileIndexProcessor.process(partition, bucket, entry);
CommitMessageImpl commitMessage =
new CommitMessageImpl(
partition,
bucket,
+ entry.totalBuckets(),
DataIncrement.emptyIncrement(),
new CompactIncrement(
- Collections.singletonList(file),
+ Collections.singletonList(entry.file()),
Collections.singletonList(indexedFile),
Collections.emptyList()));
@@ -176,8 +176,9 @@ public class RewriteFileIndexSink extends
FlinkWriteSink<ManifestEntry> {
this.sizeInMeta =
table.coreOptions().fileIndexInManifestThreshold();
}
- public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFileMeta)
+ public DataFileMeta process(BinaryRow partition, int bucket, ManifestEntry manifestEntry)
throws IOException {
+ DataFileMeta dataFileMeta = manifestEntry.file();
DataFilePathFactory dataFilePathFactory =
pathFactories.get(partition, bucket);
SchemaInfo schemaInfo =
schemaInfoCache.schemaInfo(dataFileMeta.schemaId());
List<String> extras = new ArrayList<>(dataFileMeta.extraFiles());
@@ -245,6 +246,7 @@ public class RewriteFileIndexSink extends
FlinkWriteSink<ManifestEntry> {
pathFactory
.bucketPath(partition, bucket)
.toString())
+ .withTotalBuckets(manifestEntry.totalBuckets())
.withDataFiles(
Collections.singletonList(dataFileMeta))
.rawConvertible(true)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index d274a02cd8..bd7c6f898f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -264,10 +264,9 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " 'postpone.default-bucket-num' = '2'\n"
+ ")");
- int numPartitions = 3;
int numKeys = 100;
List<String> values = new ArrayList<>();
- for (int i = 0; i < numPartitions; i++) {
+ for (int i = 0; i < 3; i++) {
for (int j = 0; j < numKeys; j++) {
values.add(String.format("(%d, %d, %d)", i, j, i * numKeys +
j));
}
@@ -276,7 +275,7 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
List<String> expectedBuckets = new ArrayList<>();
- for (int i = 0; i < numPartitions; i++) {
+ for (int i = 0; i < 3; i++) {
expectedBuckets.add(String.format("+I[{%d}, 2]", i));
}
String bucketSql =
@@ -284,7 +283,7 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
List<String> expectedData = new ArrayList<>();
- for (int i = 0; i < numPartitions; i++) {
+ for (int i = 0; i < 3; i++) {
expectedData.add(
String.format(
"+I[%d, %d]",
@@ -296,23 +295,20 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
// before rescaling, write some files in bucket = -2 directory,
// these files should not be touched by rescaling
values.clear();
- int changedPartition = 1;
for (int j = 0; j < numKeys; j++) {
- values.add(
- String.format(
- "(%d, %d, %d)",
- changedPartition, j, -(changedPartition * numKeys + j)));
+ values.add(String.format("(1, %d, 0)", j));
+ values.add(String.format("(2, %d, 1)", j));
}
tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ",
values)).await();
tEnv.executeSql(
- "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 4,
`partition` => 'pt="
- + changedPartition
- + "')");
+ "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 4,
`partition` => 'pt=1')");
+ tEnv.executeSql(
+ "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 8,
`partition` => 'pt=2')");
expectedBuckets.clear();
- for (int i = 0; i < numPartitions; i++) {
- expectedBuckets.add(String.format("+I[{%d}, %d]", i, i == changedPartition ? 4 : 2));
- }
+ expectedBuckets.add("+I[{0}, 2]");
+ expectedBuckets.add("+I[{1}, 4]");
+ expectedBuckets.add("+I[{2}, 8]");
assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expectedData);
@@ -321,13 +317,9 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
assertThat(collect(tEnv.executeSql(bucketSql))).hasSameElementsAs(expectedBuckets);
expectedData.clear();
- for (int i = 0; i < numPartitions; i++) {
- int val = (i * numKeys + i * numKeys + numKeys - 1) * numKeys / 2;
- if (i == changedPartition) {
- val *= -1;
- }
- expectedData.add(String.format("+I[%d, %d]", i, val));
- }
+ expectedData.add(String.format("+I[0, %d]", (numKeys - 1) * numKeys /
2));
+ expectedData.add("+I[1, 0]");
+ expectedData.add(String.format("+I[2, %d]", numKeys));
assertThat(collect(tEnv.executeSql(query))).hasSameElementsAs(expectedData);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
index 4f734ff6da..5273e0c7db 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
@@ -218,6 +218,7 @@ public class
UnawareBucketNewFilesCompactionCoordinatorOperatorTest {
new CommitMessageImpl(
partition,
BucketMode.UNAWARE_BUCKET,
+ -1,
new DataIncrement(
Arrays.stream(mbs)
.mapToObj(this::createDataFileMetaOfSize)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
index 906fac8509..7fcde6214f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
@@ -49,6 +49,7 @@ public class ChangelogCompactTaskSerializerTest {
new ChangelogCompactTask(
1L,
partition,
+ 2,
new HashMap<Integer, List<DataFileMeta>>() {
{
put(0, newFiles(20));
@@ -61,7 +62,7 @@ public class ChangelogCompactTaskSerializerTest {
put(1, newFiles(10));
}
});
- ChangelogCompactTask serializeTask = serializer.deserialize(1, serializer.serialize(task));
+ ChangelogCompactTask serializeTask = serializer.deserialize(2, serializer.serialize(task));
assertThat(task).isEqualTo(serializeTask);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommittableSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommittableSerializerTest.java
index 47f9ce570e..1cda7ddc18 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommittableSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommittableSerializerTest.java
@@ -45,7 +45,7 @@ public class CommittableSerializerTest {
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage committable =
- new CommitMessageImpl(row(0), 1, dataIncrement, compactIncrement);
+ new CommitMessageImpl(row(0), 1, 2, dataIncrement, compactIncrement);
CommitMessage newCommittable =
(CommitMessage)
serializer
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 9d3bc135e2..41ccfbf79e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -336,6 +336,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
+ 1,
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
@@ -353,6 +354,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
+ 1,
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
@@ -369,6 +371,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
+ 1,
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
@@ -404,6 +407,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
+ 1,
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
@@ -421,6 +425,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
+ 1,
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
@@ -437,6 +442,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
0,
+ 1,
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
index 16c1a7d044..549595ec7b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/MultiTableCommittableSerializerTest.java
@@ -49,7 +49,7 @@ class MultiTableCommittableSerializerTest {
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
- new CommitMessageImpl(row(0), 1, dataIncrement, compactIncrement);
+ new CommitMessageImpl(row(0), 1, 2, dataIncrement, compactIncrement);
Committable committable = new Committable(9, Committable.Kind.FILE,
commitMessage);
Arrays.asList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database",
"table"))
@@ -82,7 +82,7 @@ class MultiTableCommittableSerializerTest {
DataIncrement newFilesIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
- new CommitMessageImpl(row(0), 1, newFilesIncrement, compactIncrement);
+ new CommitMessageImpl(row(0), 1, 2, newFilesIncrement, compactIncrement);
Committable committable = new Committable(9, Committable.Kind.FILE,
commitMessage);
Arrays.asList(Tuple2.of("测试数据库", "用户信息表"), Tuple2.of("database",
"table"))
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
index b0aa76f157..f82bfc4278 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
@@ -76,22 +76,23 @@ class WrappedManifestCommittableSerializerTest {
rnd.nextBoolean()
? new ManifestCommittable(rnd.nextLong(),
rnd.nextLong())
: new ManifestCommittable(rnd.nextLong(), null);
- addFileCommittables(committable, row(0), 0);
- addFileCommittables(committable, row(0), 1);
- addFileCommittables(committable, row(1), 0);
- addFileCommittables(committable, row(1), 1);
+ addFileCommittables(committable, row(0), 0, 2);
+ addFileCommittables(committable, row(0), 1, 2);
+ addFileCommittables(committable, row(1), 0, 2);
+ addFileCommittables(committable, row(1), 1, 2);
return committable;
}
public static void addFileCommittables(
- ManifestCommittable committable, BinaryRow partition, int bucket) {
+ ManifestCommittable committable, BinaryRow partition, int bucket, int totalBuckets) {
List<CommitMessage> commitMessages = new ArrayList<>();
int length = ThreadLocalRandom.current().nextInt(10) + 1;
for (int i = 0; i < length; i++) {
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
CommitMessage commitMessage =
- new CommitMessageImpl(partition, bucket, dataIncrement, compactIncrement);
+ new CommitMessageImpl(
+ partition, bucket, totalBuckets, dataIncrement, compactIncrement);
commitMessages.add(commitMessage);
committable.addFileCommittable(commitMessage);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index f737a19fa9..a220d566ad 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -112,6 +112,7 @@ class PartitionMarkDoneTest extends TableTestBase {
new CommitMessageImpl(
BinaryRow.singleColumn(0),
0,
+ 1,
new DataIncrement(emptyList(), emptyList(),
emptyList()),
new CompactIncrement(singletonList(file),
emptyList(), emptyList()),
new IndexIncrement(emptyList()));
@@ -120,6 +121,7 @@ class PartitionMarkDoneTest extends TableTestBase {
new CommitMessageImpl(
BinaryRow.singleColumn(0),
0,
+ 1,
new DataIncrement(singletonList(file),
emptyList(), emptyList()),
new CompactIncrement(emptyList(), emptyList(),
emptyList()),
new IndexIncrement(emptyList()));
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
index 88fe069d46..f044596708 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java
@@ -45,6 +45,8 @@ import org.apache.orc.Reader;
import org.apache.orc.StringColumnStatistics;
import org.apache.orc.TimestampColumnStatistics;
import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Date;
@@ -54,6 +56,8 @@ import java.util.stream.IntStream;
/** {@link SimpleStatsExtractor} for orc files. */
public class OrcSimpleStatsExtractor implements SimpleStatsExtractor {
+ private static final Logger LOG = LoggerFactory.getLogger(OrcSimpleStatsExtractor.class);
+
private final RowType rowType;
private final SimpleColStatsCollector.Factory[] statsCollectors;
private final boolean legacyTimestampLtzType;
@@ -131,135 +135,109 @@ public class OrcSimpleStatsExtractor implements
SimpleStatsExtractor {
+ "!");
SimpleColStats fieldStats;
+ try {
+ fieldStats = toFieldStats(field, stats, nullCount);
+ } catch (Exception e) {
+ LOG.warn("Failed to extract field stats for field {}", field, e);
+ fieldStats = new SimpleColStats(null, null, nullCount);
+ }
+
+ return collector.convert(fieldStats);
+ }
+
+ private SimpleColStats toFieldStats(DataField field, ColumnStatistics stats, long nullCount) {
switch (field.type().getTypeRoot()) {
case CHAR:
case VARCHAR:
assertStatsClass(field, stats, StringColumnStatistics.class);
StringColumnStatistics stringStats = (StringColumnStatistics)
stats;
- fieldStats =
- new SimpleColStats(
- BinaryString.fromString(stringStats.getMinimum()),
- BinaryString.fromString(stringStats.getMaximum()),
- nullCount);
- break;
+ return new SimpleColStats(
+ BinaryString.fromString(stringStats.getMinimum()),
+ BinaryString.fromString(stringStats.getMaximum()),
+ nullCount);
case BOOLEAN:
assertStatsClass(field, stats, BooleanColumnStatistics.class);
BooleanColumnStatistics boolStats = (BooleanColumnStatistics)
stats;
- fieldStats =
- new SimpleColStats(
- boolStats.getFalseCount() == 0,
- boolStats.getTrueCount() != 0,
- nullCount);
- break;
+ return new SimpleColStats(
+ boolStats.getFalseCount() == 0, boolStats.getTrueCount() != 0, nullCount);
case DECIMAL:
assertStatsClass(field, stats, DecimalColumnStatistics.class);
DecimalColumnStatistics decimalStats =
(DecimalColumnStatistics) stats;
DecimalType decimalType = (DecimalType) (field.type());
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
- fieldStats =
- new SimpleColStats(
- Decimal.fromBigDecimal(
- decimalStats.getMinimum().bigDecimalValue(),
- precision,
- scale),
- Decimal.fromBigDecimal(
- decimalStats.getMaximum().bigDecimalValue(),
- precision,
- scale),
- nullCount);
- break;
+ return new SimpleColStats(
+ Decimal.fromBigDecimal(
+ decimalStats.getMinimum().bigDecimalValue(), precision, scale),
+ Decimal.fromBigDecimal(
+ decimalStats.getMaximum().bigDecimalValue(), precision, scale),
+ nullCount);
case TINYINT:
assertStatsClass(field, stats, IntegerColumnStatistics.class);
IntegerColumnStatistics byteStats = (IntegerColumnStatistics)
stats;
- fieldStats =
- new SimpleColStats(
- (byte) byteStats.getMinimum(),
- (byte) byteStats.getMaximum(),
- nullCount);
- break;
+ return new SimpleColStats(
+ (byte) byteStats.getMinimum(), (byte) byteStats.getMaximum(), nullCount);
case SMALLINT:
assertStatsClass(field, stats, IntegerColumnStatistics.class);
IntegerColumnStatistics shortStats = (IntegerColumnStatistics)
stats;
- fieldStats =
- new SimpleColStats(
- (short) shortStats.getMinimum(),
- (short) shortStats.getMaximum(),
- nullCount);
- break;
+ return new SimpleColStats(
+ (short) shortStats.getMinimum(),
+ (short) shortStats.getMaximum(),
+ nullCount);
case INTEGER:
case TIME_WITHOUT_TIME_ZONE:
assertStatsClass(field, stats, IntegerColumnStatistics.class);
IntegerColumnStatistics intStats = (IntegerColumnStatistics)
stats;
- fieldStats =
- new SimpleColStats(
- Long.valueOf(intStats.getMinimum()).intValue(),
- Long.valueOf(intStats.getMaximum()).intValue(),
- nullCount);
- break;
+ return new SimpleColStats(
+ Long.valueOf(intStats.getMinimum()).intValue(),
+ Long.valueOf(intStats.getMaximum()).intValue(),
+ nullCount);
case BIGINT:
assertStatsClass(field, stats, IntegerColumnStatistics.class);
IntegerColumnStatistics longStats = (IntegerColumnStatistics)
stats;
- fieldStats =
- new SimpleColStats(
- longStats.getMinimum(), longStats.getMaximum(), nullCount);
- break;
+ return new SimpleColStats(
+ longStats.getMinimum(), longStats.getMaximum(), nullCount);
case FLOAT:
assertStatsClass(field, stats, DoubleColumnStatistics.class);
DoubleColumnStatistics floatStats = (DoubleColumnStatistics)
stats;
- fieldStats =
- new SimpleColStats(
- (float) floatStats.getMinimum(),
- (float) floatStats.getMaximum(),
- nullCount);
- break;
+ return new SimpleColStats(
+ (float) floatStats.getMinimum(),
+ (float) floatStats.getMaximum(),
+ nullCount);
case DOUBLE:
assertStatsClass(field, stats, DoubleColumnStatistics.class);
DoubleColumnStatistics doubleStats = (DoubleColumnStatistics)
stats;
- fieldStats =
- new SimpleColStats(
- doubleStats.getMinimum(), doubleStats.getMaximum(), nullCount);
- break;
+ return new SimpleColStats(
+ doubleStats.getMinimum(), doubleStats.getMaximum(), nullCount);
case DATE:
assertStatsClass(field, stats, DateColumnStatistics.class);
DateColumnStatistics dateStats = (DateColumnStatistics) stats;
- fieldStats =
- new SimpleColStats(
- DateTimeUtils.toInternal(
- new Date(dateStats.getMinimum().getTime())),
- DateTimeUtils.toInternal(
- new Date(dateStats.getMaximum().getTime())),
- nullCount);
- break;
+ return new SimpleColStats(
+ DateTimeUtils.toInternal(new Date(dateStats.getMinimum().getTime())),
+ DateTimeUtils.toInternal(new Date(dateStats.getMaximum().getTime())),
+ nullCount);
case TIMESTAMP_WITHOUT_TIME_ZONE:
assertStatsClass(field, stats,
TimestampColumnStatistics.class);
TimestampColumnStatistics timestampStats =
(TimestampColumnStatistics) stats;
- fieldStats =
- new SimpleColStats(
- Timestamp.fromSQLTimestamp(timestampStats.getMinimum()),
- Timestamp.fromSQLTimestamp(timestampStats.getMaximum()),
- nullCount);
- break;
+ return new SimpleColStats(
+ Timestamp.fromSQLTimestamp(timestampStats.getMinimum()),
+ Timestamp.fromSQLTimestamp(timestampStats.getMaximum()),
+ nullCount);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
assertStatsClass(field, stats,
TimestampColumnStatistics.class);
TimestampColumnStatistics timestampLtzStats =
(TimestampColumnStatistics) stats;
- fieldStats =
- legacyTimestampLtzType
- ? new SimpleColStats(
- Timestamp.fromSQLTimestamp(timestampLtzStats.getMinimum()),
- Timestamp.fromSQLTimestamp(timestampLtzStats.getMaximum()),
- nullCount)
- : new SimpleColStats(
- Timestamp.fromInstant(
- timestampLtzStats.getMinimum().toInstant()),
- Timestamp.fromInstant(
- timestampLtzStats.getMaximum().toInstant()),
- nullCount);
- break;
+ return legacyTimestampLtzType
+ ? new SimpleColStats(
+ Timestamp.fromSQLTimestamp(timestampLtzStats.getMinimum()),
+ Timestamp.fromSQLTimestamp(timestampLtzStats.getMaximum()),
+ nullCount)
+ : new SimpleColStats(
+ Timestamp.fromInstant(timestampLtzStats.getMinimum().toInstant()),
+ Timestamp.fromInstant(timestampLtzStats.getMaximum().toInstant()),
+ nullCount);
default:
- fieldStats = new SimpleColStats(null, null, nullCount);
+ return new SimpleColStats(null, null, nullCount);
}
- return collector.convert(fieldStats);
}
private void assertStatsClass(
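Besides converting each switch arm from assignment-plus-break to a direct return, the extraction is now wrapped so that a failure on one ORC column no longer aborts stats collection: the field falls back to a stat carrying only the null count, and the failure is logged. The fallback added above has this shape:

    // Fallback added by this patch: keep nullCount, drop min/max when extraction fails.
    SimpleColStats fieldStats;
    try {
        fieldStats = toFieldStats(field, stats, nullCount);
    } catch (Exception e) {
        LOG.warn("Failed to extract field stats for field {}", field, e);
        fieldStats = new SimpleColStats(null, null, nullCount);
    }
    return collector.convert(fieldStats);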
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index d1478830ac..74f82cbbbe 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -426,7 +426,8 @@ public class HiveMigrator implements Migrator {
HIDDEN_PATH_FILTER,
newDir,
rollback);
- return FileMetaUtils.commitFile(partitionRow, fileMetas);
+ return FileMetaUtils.commitFile(
+ partitionRow, paimonTable.coreOptions().bucket(), fileMetas);
}
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index b5b56ba1d5..e68407903d 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -137,6 +137,7 @@ trait ScanHelper extends Logging {
.withSnapshot(split.snapshotId())
.withPartition(split.partition())
.withBucket(split.bucket())
+ .withTotalBuckets(split.totalBuckets())
.withDataFiles(dataFiles.toList.asJava)
.rawConvertible(split.rawConvertible)
.withBucketPath(split.bucketPath)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index b83831ba21..fea0fd006d 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -271,6 +271,7 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper with SQLCon
new CommitMessageImpl(
partition,
bucket,
+ files.head.totalBuckets,
new DataIncrement(
Collections.emptyList[DataFileMeta],
deletedDataFileMetas,
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index e974ad98b4..b8a3cccf08 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -311,6 +311,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
val commitMessage = new CommitMessageImpl(
dvIndexFileMaintainer.getPartition,
dvIndexFileMaintainer.getBucket,
+ null,
DataIncrement.emptyIncrement(),
CompactIncrement.emptyIncrement(),
new IndexIncrement(added.map(_.indexFile).asJava,
deleted.map(_.indexFile).asJava)
@@ -333,6 +334,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
new CommitMessageImpl(
partition,
bucket,
+ null,
DataIncrement.emptyIncrement(),
CompactIncrement.emptyIncrement(),
new IndexIncrement(added.map(_.indexFile()).asJava,
removed.map(_.indexFile()).asJava))
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
index 1f908aeb90..5a76030fd4 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
@@ -49,19 +49,20 @@ object SparkDeletionVectors {
root: Path,
pathFactory: FileStorePathFactory,
dataFilePathToMeta: Map[String, SparkDataFileMeta]): DataSplit = {
- val (dataFiles, deletionFiles) = sparkDeletionVectors
+ val (dataFiles, deletionFiles, totalBuckets) = sparkDeletionVectors
.relativePaths(pathFactory)
.map {
dataFile =>
val meta = dataFilePathToMeta(dataFile)
- (meta.dataFileMeta, meta.deletionFile.orNull)
+ (meta.dataFileMeta, meta.deletionFile.orNull, meta.totalBuckets)
}
- .unzip
+ .unzip3
DataSplit
.builder()
.withBucketPath(root + "/" + sparkDeletionVectors.partitionAndBucket)
.withPartition(SerializationUtils.deserializeBinaryRow(sparkDeletionVectors.partition))
.withBucket(sparkDeletionVectors.bucket)
+ .withTotalBuckets(totalBuckets.head)
.withDataFiles(dataFiles.toList.asJava)
.withDataDeletionFiles(deletionFiles.toList.asJava)
.rawConvertible(true)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
index d370993882..f28a12824e 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
@@ -66,6 +66,7 @@ case class SparkRemoveUnexistingFiles(
val message = new CommitMessageImpl(
reuse,
bucket,
+ table.coreOptions().bucket(),
new DataIncrement(
Collections.emptyList(),
new util.ArrayList[DataFileMeta](metaMap.values()),