This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8dee2a013a [cdc] Support postpone bucket tables in Paimon CDC (#5627)
8dee2a013a is described below
commit 8dee2a013a9d253a934f44111e302fb1d795db45
Author: tsreaper <[email protected]>
AuthorDate: Tue May 20 21:34:22 2025 +0800
[cdc] Support postpone bucket tables in Paimon CDC (#5627)
---
.../org/apache/paimon/AppendOnlyFileStore.java | 8 +++-
.../src/main/java/org/apache/paimon/FileStore.java | 5 +-
.../java/org/apache/paimon/KeyValueFileStore.java | 11 +++--
.../postpone/PostponeBucketFileStoreWrite.java | 25 ++++++++--
.../paimon/postpone/PostponeBucketWriter.java | 28 +++++++++---
.../paimon/privilege/PrivilegedFileStore.java | 7 ++-
.../paimon/privilege/PrivilegedFileStoreTable.java | 9 +++-
.../paimon/table/AppendOnlyFileStoreTable.java | 10 ++--
.../paimon/table/DelegatedFileStoreTable.java | 9 +++-
.../org/apache/paimon/table/FileStoreTable.java | 7 ++-
.../paimon/table/PrimaryKeyFileStoreTable.java | 8 ++--
.../apache/paimon/table/object/ObjectTable.java | 7 ++-
paimon-flink/paimon-flink-cdc/pom.xml | 7 +--
.../sink/cdc/CdcPostponeBucketChannelComputer.java | 53 ++++++++++++++++++++++
.../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 7 ++-
.../paimon/flink/sink/cdc/CdcSinkBuilder.java | 12 +++++
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 10 ++++
.../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java | 27 +++++++++++
.../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 20 ++++++++
.../sink/MultiTablesStoreCompactOperator.java | 1 +
.../paimon/flink/sink/NoopStoreSinkWriteState.java | 9 +++-
.../paimon/flink/sink/RowAppendTableSink.java | 10 ++--
.../paimon/flink/sink/StoreCompactOperator.java | 1 +
.../paimon/flink/sink/StoreSinkWriteImpl.java | 3 +-
.../paimon/flink/sink/StoreSinkWriteState.java | 2 +
.../paimon/flink/sink/StoreSinkWriteStateImpl.java | 9 ++++
.../paimon/flink/sink/TableWriteOperator.java | 11 +++--
27 files changed, 266 insertions(+), 50 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index b901681050..05faeb5fc9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -36,6 +36,8 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
+import javax.annotation.Nullable;
+
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@@ -89,12 +91,14 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
@Override
public BaseAppendFileStoreWrite newWrite(String commitUser) {
- return newWrite(commitUser, null);
+ return newWrite(commitUser, null, null);
}
@Override
public BaseAppendFileStoreWrite newWrite(
- String commitUser, ManifestCacheFilter manifestFilter) {
+ String commitUser,
+ @Nullable ManifestCacheFilter manifestFilter,
+ @Nullable Integer writeId) {
DeletionVectorsMaintainer.Factory dvMaintainerFactory =
options.deletionVectorsEnabled()
? DeletionVectorsMaintainer.factory(newIndexFileHandler())
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 34e04cfabb..12fc60a383 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -88,7 +88,10 @@ public interface FileStore<T> {
FileStoreWrite<T> newWrite(String commitUser);
- FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter manifestFilter);
+ FileStoreWrite<T> newWrite(
+ String commitUser,
+ @Nullable ManifestCacheFilter manifestFilter,
+ @Nullable Integer writeId);
FileStoreCommit newCommit(String commitUser, FileStoreTable table);
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 57f8db5f2d..81039da369 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -46,6 +46,8 @@ import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import org.apache.paimon.utils.ValueEqualiserSupplier;
+import javax.annotation.Nullable;
+
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@@ -154,12 +156,14 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
@Override
public AbstractFileStoreWrite<KeyValue> newWrite(String commitUser) {
- return newWrite(commitUser, null);
+ return newWrite(commitUser, null, null);
}
@Override
public AbstractFileStoreWrite<KeyValue> newWrite(
- String commitUser, ManifestCacheFilter manifestFilter) {
+ String commitUser,
+ @Nullable ManifestCacheFilter manifestFilter,
+ @Nullable Integer writeId) {
IndexMaintainer.Factory<KeyValue> indexFactory = null;
if (bucketMode() == BucketMode.HASH_DYNAMIC) {
indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
@@ -182,7 +186,8 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
snapshotManager(),
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
options,
- tableName);
+ tableName,
+ writeId);
} else {
return new KeyValueFileStoreWrite(
fileIO,
diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
index b52bd0b812..0bc6c48470 100644
--- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -66,22 +66,28 @@ public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValu
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
- String tableName) {
+ String tableName,
+ @Nullable Integer writeId) {
super(snapshotManager, scan, null, null, tableName, options, partitionType);
Options newOptions = new Options(options.toMap());
// use avro for postpone bucket
newOptions.set(CoreOptions.FILE_FORMAT, "avro");
newOptions.set(CoreOptions.METADATA_STATS_MODE, "none");
- // each writer should have its unique prefix, so files from the same writer can be consumed
- // by the same compaction reader to keep the input order
+ // Each writer should have its unique prefix, so files from the same writer can be consumed
+ // by the same compaction reader to keep the input order.
+ // Also note that, for Paimon CDC, this object might be created multiple times in the same
+ // job, however the object will always stay in the same thread, so we use hash of thread
+ // name as the identifier.
newOptions.set(
CoreOptions.DATA_FILE_PREFIX,
String.format(
"%s-u-%s-s-%d-w-",
options.dataFilePrefix(),
commitUser,
- ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)));
+ writeId == null
+ ? ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)
+ : writeId));
this.options = new CoreOptions(newOptions);
FileFormat fileFormat = fileFormat(this.options);
@@ -107,6 +113,12 @@ public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValu
withIgnorePreviousFiles(true);
}
+ @Override
+ public void withIgnorePreviousFiles(boolean ignorePrevious) {
+ // see comments in constructor
+ super.withIgnorePreviousFiles(true);
+ }
+
@Override
protected PostponeBucketWriter createWriter(
BinaryRow partition,
@@ -117,9 +129,12 @@ public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValu
ExecutorService compactExecutor,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
Preconditions.checkArgument(bucket == BucketMode.POSTPONE_BUCKET);
+ Preconditions.checkArgument(
+ restoreFiles.isEmpty(),
+ "Postpone bucket writers should not restore previous files.
This is unexpected.");
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options);
- return new PostponeBucketWriter(writerFactory);
+ return new PostponeBucketWriter(writerFactory, restoreIncrement);
}
@Override
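A note on the hunk above: the data file prefix now prefers the caller-supplied writeId over a random number, so a restarted subtask keeps a stable prefix. A minimal standalone sketch of the naming scheme (the class and method names here are hypothetical, not part of this commit):

    import java.util.concurrent.ThreadLocalRandom;

    class PrefixSketch {
        // Mirrors the String.format call in PostponeBucketFileStoreWrite above.
        static String dataFilePrefix(String basePrefix, String commitUser, Integer writeId) {
            int id = writeId != null
                    ? writeId
                    : ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
            // e.g. basePrefix "data", commitUser "u1", writeId 3 -> "data-u-u1-s-3-w-"
            return String.format("%s-u-%s-s-%d-w-", basePrefix, commitUser, id);
        }
    }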
diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
index 7bc7cafbf5..4eee650425 100644
--- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
@@ -28,6 +28,9 @@ import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.RecordWriter;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -36,10 +39,18 @@ import java.util.List;
public class PostponeBucketWriter implements RecordWriter<KeyValue> {
private final KeyValueFileWriterFactory writerFactory;
+ private final List<DataFileMeta> files;
+
private RollingFileWriter<KeyValue, DataFileMeta> writer;
- public PostponeBucketWriter(KeyValueFileWriterFactory writerFactory) {
+ public PostponeBucketWriter(
+ KeyValueFileWriterFactory writerFactory, @Nullable CommitIncrement restoreIncrement) {
this.writerFactory = writerFactory;
+ this.files = new ArrayList<>();
+ if (restoreIncrement != null) {
+ files.addAll(restoreIncrement.newFilesIncrement().newFiles());
+ }
+
this.writer = null;
}
@@ -55,12 +66,13 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue> {
public void compact(boolean fullCompaction) throws Exception {}
@Override
- public void addNewFiles(List<DataFileMeta> files) {}
+ public void addNewFiles(List<DataFileMeta> files) {
+ this.files.addAll(files);
+ }
@Override
public Collection<DataFileMeta> dataFiles() {
- // this method is only for checkpointing, while this writer does not need any checkpoint
- return Collections.emptyList();
+ return new ArrayList<>(files);
}
@Override
@@ -71,14 +83,16 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue> {
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
- List<DataFileMeta> newFiles = Collections.emptyList();
if (writer != null) {
writer.close();
- newFiles = writer.result();
+ files.addAll(writer.result());
writer = null;
}
+
+ List<DataFileMeta> result = new ArrayList<>(files);
+ files.clear();
return new CommitIncrement(
- new DataIncrement(newFiles, Collections.emptyList(), Collections.emptyList()),
+ new DataIncrement(result, Collections.emptyList(), Collections.emptyList()),
CompactIncrement.emptyIncrement(),
null);
}
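The net effect of this hunk: PostponeBucketWriter now buffers every produced file (restored from a previous increment, handed over via addNewFiles, or rolled by its own writer) until prepareCommit drains them, instead of discarding state between checkpoints. A simplified lifecycle sketch (the generic type F stands in for DataFileMeta; class and method names are illustrative only):

    import java.util.ArrayList;
    import java.util.List;

    class FileBufferSketch<F> {
        private final List<F> files = new ArrayList<>();

        void restore(List<F> restored) { files.addAll(restored); }     // from restoreIncrement
        void addNewFiles(List<F> newFiles) { files.addAll(newFiles); }
        List<F> dataFiles() { return new ArrayList<>(files); }         // snapshot for checkpoints

        List<F> prepareCommit(List<F> rolled) {                        // drained at commit time
            files.addAll(rolled);                                      // writer.result()
            List<F> result = new ArrayList<>(files);
            files.clear();
            return result;
        }
    }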
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index a6084350ba..c30b4a94be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -147,9 +147,12 @@ public class PrivilegedFileStore<T> implements FileStore<T> {
}
@Override
- public FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
+ public FileStoreWrite<T> newWrite(
+ String commitUser,
+ @Nullable ManifestCacheFilter manifestFilter,
+ @Nullable Integer writeId) {
privilegeChecker.assertCanInsert(identifier);
- return wrapped.newWrite(commitUser, manifestFilter);
+ return wrapped.newWrite(commitUser, manifestFilter, writeId);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 716c243289..26e9ec1ea1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -41,6 +41,8 @@ import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
@@ -228,9 +230,12 @@ public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {
}
@Override
- public TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
+ public TableWriteImpl<?> newWrite(
+ String commitUser,
+ @Nullable ManifestCacheFilter manifestFilter,
+ @Nullable Integer writeId) {
privilegeChecker.assertCanInsert(identifier);
- return wrapped.newWrite(commitUser, manifestFilter);
+ return wrapped.newWrite(commitUser, manifestFilter, writeId);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 69a024db9c..240163cf3d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -42,6 +42,8 @@ import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.function.BiConsumer;
@@ -125,13 +127,15 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
@Override
public TableWriteImpl<InternalRow> newWrite(String commitUser) {
- return newWrite(commitUser, null);
+ return newWrite(commitUser, null, null);
}
@Override
public TableWriteImpl<InternalRow> newWrite(
- String commitUser, ManifestCacheFilter manifestFilter) {
- BaseAppendFileStoreWrite writer = store().newWrite(commitUser, manifestFilter);
+ String commitUser,
+ @Nullable ManifestCacheFilter manifestFilter,
+ @Nullable Integer writeId) {
+ BaseAppendFileStoreWrite writer = store().newWrite(commitUser, manifestFilter, writeId);
return new TableWriteImpl<>(
rowType(),
writer,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index dc4aeec073..112bb21dc2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -49,6 +49,8 @@ import org.apache.paimon.utils.TagManager;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
@@ -292,8 +294,11 @@ public abstract class DelegatedFileStoreTable implements FileStoreTable {
}
@Override
- public TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
- return wrapped.newWrite(commitUser, manifestFilter);
+ public TableWriteImpl<?> newWrite(
+ String commitUser,
+ @Nullable ManifestCacheFilter manifestFilter,
+ @Nullable Integer writeId) {
+ return wrapped.newWrite(commitUser, manifestFilter, writeId);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 92b0fc7b6a..f3af21f266 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -41,6 +41,8 @@ import org.apache.paimon.utils.TagManager;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -111,7 +113,10 @@ public interface FileStoreTable extends DataTable {
@Override
TableWriteImpl<?> newWrite(String commitUser);
- TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter manifestFilter);
+ TableWriteImpl<?> newWrite(
+ String commitUser,
+ @Nullable ManifestCacheFilter manifestFilter,
+ @Nullable Integer writeId);
@Override
TableCommitImpl newCommit(String commitUser);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index c375bb2900..1a22822779 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -152,16 +152,18 @@ public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
@Override
public TableWriteImpl<KeyValue> newWrite(String commitUser) {
- return newWrite(commitUser, null);
+ return newWrite(commitUser, null, null);
}
@Override
public TableWriteImpl<KeyValue> newWrite(
- String commitUser, ManifestCacheFilter manifestFilter) {
+ String commitUser,
+ @Nullable ManifestCacheFilter manifestFilter,
+ @Nullable Integer writeId) {
KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
rowType(),
- store().newWrite(commitUser, manifestFilter),
+ store().newWrite(commitUser, manifestFilter, writeId),
createRowKeyExtractor(),
(record, rowKind) ->
kv.replace(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
index 4013fede49..d63d547656 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
@@ -30,6 +30,8 @@ import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import javax.annotation.Nullable;
+
import java.util.HashSet;
import java.util.Map;
@@ -157,7 +159,10 @@ public interface ObjectTable extends FileStoreTable {
}
@Override
- public TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
+ public TableWriteImpl<?> newWrite(
+ String commitUser,
+ @Nullable ManifestCacheFilter manifestFilter,
+ @Nullable Integer writeId) {
throw new UnsupportedOperationException("Object table does not support Write.");
}
diff --git a/paimon-flink/paimon-flink-cdc/pom.xml b/paimon-flink/paimon-flink-cdc/pom.xml
index 182637513e..b489aaf615 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -59,12 +59,7 @@ under the License.
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-common</artifactId>
<version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
+ <scope>provided</scope>
</dependency>
<!-- Flink dependencies -->
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcPostponeBucketChannelComputer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcPostponeBucketChannelComputer.java
new file mode 100644
index 0000000000..5dacef7cad
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcPostponeBucketChannelComputer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+
+/**
+ * {@link ChannelComputer} for writing {@link CdcRecord}s into postpone bucket tables. Records with
+ * the same primary keys are distributed to the same subtask.
+ */
+public class CdcPostponeBucketChannelComputer implements ChannelComputer<CdcRecord> {
+
+ private final TableSchema schema;
+
+ private transient int numChannels;
+ private transient KeyAndBucketExtractor<CdcRecord> extractor;
+
+ public CdcPostponeBucketChannelComputer(TableSchema schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
+ }
+
+ @Override
+ public int channel(CdcRecord record) {
+ extractor.setRecord(record);
+ return Math.abs(
+ (extractor.partition().hashCode() + extractor.trimmedPrimaryKey().hashCode())
+ % numChannels);
+ }
+}
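A short worked example of the channel math in this new class (the hash inputs are illustrative values, not from the commit): because the remainder's magnitude is always smaller than numChannels, Math.abs keeps the result in [0, numChannels).

    int numChannels = 4;
    int partitionHash = "2024-01-01".hashCode();   // extractor.partition().hashCode()
    int primaryKeyHash = "pk-42".hashCode();       // extractor.trimmedPrimaryKey().hashCode()
    // Records with equal partition and primary key always get the same channel.
    int channel = Math.abs((partitionHash + primaryKeyHash) % numChannels);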
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index fb59878b8b..ea6008d872 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -30,6 +30,7 @@ import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
@@ -107,7 +108,11 @@ public class CdcRecordStoreMultiWriteOperator
context, "commit_user_state", String.class,
initialCommitUser);
// TODO: should use CdcRecordMultiChannelComputer to filter
- state = new StoreSinkWriteStateImpl(context, (tableName, partition, bucket) -> true);
+ state =
+ new StoreSinkWriteStateImpl(
+ RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
+ context,
+ (tableName, partition, bucket) -> true);
tables = new HashMap<>();
writes = new HashMap<>();
compactExecutor =
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 9297670d3a..e4518d3a26 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -132,6 +132,8 @@ public class CdcSinkBuilder<T> {
case HASH_DYNAMIC:
return new CdcDynamicBucketSink((FileStoreTable) table)
.build(converted, parallelism);
+ case POSTPONE_MODE:
+ return buildForPostponeBucket(converted);
case BUCKET_UNAWARE:
return buildForUnawareBucket(converted);
default:
@@ -146,6 +148,16 @@ public class CdcSinkBuilder<T> {
return new CdcFixedBucketSink(dataTable).sinkFrom(partitioned);
}
+ private DataStreamSink<?> buildForPostponeBucket(DataStream<CdcRecord> parsed) {
+ FileStoreTable dataTable = (FileStoreTable) table;
+ DataStream<CdcRecord> partitioned =
+ partition(
+ parsed,
+ new CdcPostponeBucketChannelComputer(dataTable.schema()),
+ parallelism);
+ return new CdcFixedBucketSink(dataTable).sinkFrom(partitioned);
+ }
+
private DataStreamSink<?> buildForUnawareBucket(DataStream<CdcRecord> parsed) {
FileStoreTable dataTable = (FileStoreTable) table;
// rebalance it to make sure schema change work to avoid infinite loop
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index a6b272eaf8..b53ec9c80f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -204,6 +204,13 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
new CdcFixedBucketSink(table).sinkFrom(partitioned);
}
+ private void buildForPostponeBucket(FileStoreTable table, DataStream<CdcRecord> parsed) {
+ DataStream<CdcRecord> partitioned =
+ partition(
+ parsed, new CdcPostponeBucketChannelComputer(table.schema()), parallelism);
+ new CdcFixedBucketSink(table).sinkFrom(partitioned);
+ }
+
private void buildForUnawareBucket(FileStoreTable table, DataStream<CdcRecord> parsed) {
// rebalance it to make sure schema change work to avoid infinite loop
new CdcAppendTableSink(table, parallelism).sinkFrom(parsed.rebalance());
@@ -251,6 +258,9 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
case HASH_DYNAMIC:
new CdcDynamicBucketSink(table).build(converted, parallelism);
break;
+ case POSTPONE_MODE:
+ buildForPostponeBucket(table, converted);
+ break;
case BUCKET_UNAWARE:
buildForUnawareBucket(table, converted);
break;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 484670e23c..4db53f7f80 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
@@ -35,6 +36,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.ReadBuilder;
@@ -53,6 +55,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -80,6 +83,12 @@ public class FlinkCdcSyncDatabaseSinkITCase extends AbstractTestBase {
innerTestRandomCdcEvents(() -> -1, false);
}
+ @Test
+ @Timeout(120)
+ public void testRandomCdcEventsPostponeBucket() throws Exception {
+ innerTestRandomCdcEvents(() -> BucketMode.POSTPONE_BUCKET, false);
+ }
+
@Test
@Timeout(120)
public void testRandomCdcEventsUnawareBucket() throws Exception {
@@ -183,6 +192,24 @@ public class FlinkCdcSyncDatabaseSinkITCase extends AbstractTestBase {
// no failure when checking results
FailingFileIO.reset(failingName, 0, 1);
+
+ for (int i = 0; i < numTables; i++) {
+ FileStoreTable table = fileStoreTables.get(i).copyWithLatestSchema();
+ if (table.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
+ // postpone bucket tables must be compacted, so data can be consumed
+ StreamExecutionEnvironment compactEnv =
+ streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
+ CompactAction compactAction =
+ new CompactAction(
+ DATABASE_NAME,
+ table.name(),
+ catalogOptions.toMap(),
+ new HashMap<>());
+ compactAction.withStreamExecutionEnvironment(compactEnv);
+ compactAction.run();
+ }
+ }
+
for (int i = 0; i < numTables; i++) {
FileStoreTable table = fileStoreTables.get(i).copyWithLatestSchema();
SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
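This ITCase (and the table sink ITCase below) adds the same verification step: rows written to a postpone bucket table are not visible to readers until a compaction rewrites them into real buckets, hence the batch CompactAction before checking results. A condensed sketch of that step, with hypothetical database/table names and assuming the catalog options map is at hand:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.paimon.flink.action.CompactAction;

    class CompactPostponeTableSketch {
        static void compact(Map<String, String> catalogOptions) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH); // the tests build a batch environment
            CompactAction action =
                    new CompactAction("my_db", "my_tbl", catalogOptions, new HashMap<>());
            action.withStreamExecutionEnvironment(env);
            action.run();
        }
    }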
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 9fccaac992..ea02652971 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -35,6 +36,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.ReadBuilder;
@@ -53,6 +55,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -77,6 +80,12 @@ public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
innerTestRandomCdcEvents(-1, false, false);
}
+ @Test
+ @Timeout(120)
+ public void testRandomCdcEventsPostponeBucket() throws Exception {
+ innerTestRandomCdcEvents(BucketMode.POSTPONE_BUCKET, false, false);
+ }
+
@Disabled
@Test
@Timeout(120)
@@ -177,6 +186,17 @@ public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
// no failure when checking results
FailingFileIO.reset(failingName, 0, 1);
+ if (numBucket == BucketMode.POSTPONE_BUCKET) {
+ // postpone bucket tables must be compacted, so data can be consumed
+ StreamExecutionEnvironment compactEnv =
+ streamExecutionEnvironmentBuilder().batchMode().parallelism(3).build();
+ CompactAction compactAction =
+ new CompactAction(
+ DATABASE_NAME, TABLE_NAME, catalogOptions.toMap(), new HashMap<>());
+ compactAction.withStreamExecutionEnvironment(compactEnv);
+ compactAction.run();
+ }
+
table = table.copyWithLatestSchema();
SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
TableSchema schema = schemaManager.latest().get();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 71c41d2f6b..e1eb9ac7d6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -113,6 +113,7 @@ public class MultiTablesStoreCompactOperator
state =
new StoreSinkWriteStateImpl(
+ RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
context,
(tableName, partition, bucket) ->
ChannelComputer.select(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
index f1974da321..7e994e2015 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
@@ -28,9 +28,11 @@ import java.util.List;
*/
public class NoopStoreSinkWriteState implements StoreSinkWriteState {
+ private final int subtaskId;
private final StateValueFilter stateValueFilter;
- public NoopStoreSinkWriteState(StateValueFilter stateValueFilter) {
+ public NoopStoreSinkWriteState(int subtaskId, StateValueFilter stateValueFilter) {
+ this.subtaskId = subtaskId;
this.stateValueFilter = stateValueFilter;
}
@@ -49,4 +51,9 @@ public class NoopStoreSinkWriteState implements StoreSinkWriteState {
@Override
public void snapshotState() throws Exception {}
+
+ @Override
+ public int getSubtaskId() {
+ return subtaskId;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
index b6f2381c9f..69a339a411 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
@@ -45,27 +45,27 @@ public class RowAppendTableSink extends AppendTableSink<InternalRow> {
return new RowDataStoreWriteOperator.Factory(
table, logSinkFunction, writeProvider, commitUser) {
@Override
+ @SuppressWarnings("unchecked, rawtypes")
public StreamOperator createStreamOperator(StreamOperatorParameters parameters) {
return new RowDataStoreWriteOperator(
parameters, table, logSinkFunction, writeProvider, commitUser) {
@Override
protected StoreSinkWriteState createState(
+ int subtaskId,
StateInitializationContext context,
StoreSinkWriteState.StateValueFilter stateFilter)
throws Exception {
- // No conflicts will occur in append only unaware bucket writer, so no state
- // is
- // needed.
- return new NoopStoreSinkWriteState(stateFilter);
+ // is needed.
+ return new NoopStoreSinkWriteState(subtaskId, stateFilter);
}
@Override
protected String getCommitUser(StateInitializationContext context)
throws Exception {
// No conflicts will occur in append only unaware bucket writer, so
- // commitUser does
- // not matter.
+ // commitUser does not matter.
return commitUser;
}
};
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 99ad6c0c00..6c0fe5c13a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -93,6 +93,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
state =
new StoreSinkWriteStateImpl(
+ RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
context,
(tableName, partition, bucket) ->
ChannelComputer.select(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index ef80428209..96ebf1b662 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -145,7 +145,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
table.newWrite(
commitUser,
(part, bucket) ->
- state.stateValueFilter().filter(table.name(), part, bucket))
+ state.stateValueFilter().filter(table.name(), part, bucket),
+ state.getSubtaskId())
.withIOManager(paimonIOManager)
.withIgnorePreviousFiles(ignorePreviousFiles)
.withExecutionMode(isStreamingMode)
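This hunk is where the new writeId parameter gets its value: the subtask index recorded in the write state. A condensed fragment of the call chain above, with `table`, `state`, `commitUser` as in StoreSinkWriteImpl (a restatement for readability, not additional commit code):

    TableWriteImpl<?> write =
            table.newWrite(
                    commitUser,
                    (part, bucket) -> state.stateValueFilter().filter(table.name(), part, bucket),
                    state.getSubtaskId()); // writeId: stable per subtask, unlike a random suffix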
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
index 8626a01a66..e7b3bcf254 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
@@ -39,6 +39,8 @@ public interface StoreSinkWriteState {
void snapshotState() throws Exception;
+ int getSubtaskId();
+
/**
* A state value for {@link StoreSinkWrite}. All state values should be given a partition and a
* bucket so that they can be redistributed once the sink parallelism is changed.
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
index a01cbcb68d..7f4379c241 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
@@ -46,6 +46,7 @@ import java.util.Map;
*/
public class StoreSinkWriteStateImpl implements StoreSinkWriteState {
+ private final int subtaskId;
private final StoreSinkWriteState.StateValueFilter stateValueFilter;
private final ListState<Tuple5<String, String, byte[], Integer, byte[]>> listState;
@@ -53,10 +54,13 @@ public class StoreSinkWriteStateImpl implements StoreSinkWriteState {
@SuppressWarnings("unchecked")
public StoreSinkWriteStateImpl(
+ int subtaskId,
StateInitializationContext context,
StoreSinkWriteState.StateValueFilter stateValueFilter)
throws Exception {
+ this.subtaskId = subtaskId;
this.stateValueFilter = stateValueFilter;
+
TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>> listStateSerializer =
new TupleSerializer<>(
(Class<Tuple5<String, String, byte[], Integer, byte[]>>)
@@ -120,4 +124,9 @@ public class StoreSinkWriteStateImpl implements StoreSinkWriteState {
}
listState.update(list);
}
+
+ @Override
+ public int getSubtaskId() {
+ return subtaskId;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index ec69ce59f6..127acc97c8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -64,16 +64,17 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
boolean containLogSystem = containLogSystem();
int numTasks = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+ int subtaskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
StateValueFilter stateFilter =
(tableName, partition, bucket) -> {
int task =
containLogSystem
? ChannelComputer.select(bucket, numTasks)
: ChannelComputer.select(partition, bucket, numTasks);
- return task == RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+ return task == subtaskId;
};
- state = createState(context, stateFilter);
+ state = createState(subtaskId, context, stateFilter);
write =
storeSinkWriteProvider.provide(
table,
@@ -85,9 +86,11 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
}
protected StoreSinkWriteState createState(
- StateInitializationContext context, StoreSinkWriteState.StateValueFilter stateFilter)
+ int subtaskId,
+ StateInitializationContext context,
+ StoreSinkWriteState.StateValueFilter stateFilter)
throws Exception {
- return new StoreSinkWriteStateImpl(context, stateFilter);
+ return new StoreSinkWriteStateImpl(subtaskId, context, stateFilter);
}
protected String getCommitUser(StateInitializationContext context) throws Exception {
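For clarity, the resulting filter logic in setup after this change, as a single fragment (names as in the hunks above; the subtask index is now captured once instead of being re-read from the runtime context on every filter call):

    int numTasks = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
    int subtaskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
    StateValueFilter stateFilter =
            (tableName, partition, bucket) -> {
                // keep a restored state entry only if it routes to this subtask
                int task =
                        containLogSystem
                                ? ChannelComputer.select(bucket, numTasks)
                                : ChannelComputer.select(partition, bucket, numTasks);
                return task == subtaskId;
            };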