This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 344779856 [FLINK-37577][pipeline-connector/paimon] Enable
waitCompaction feature in PaimonWriter only when the
deletion-vectors.enabled option is enabled
344779856 is described below
commit 344779856a9c37e2fbdb018c5cb1d85597a3d240
Author: Kunni <[email protected]>
AuthorDate: Thu Apr 3 15:21:48 2025 +0800
[FLINK-37577][pipeline-connector/paimon] Enable waitCompaction feature in
PaimonWriter only when the deletion-vectors.enabled option is enabled
This closes #3971.
---
.../flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index 72cc8a6a5..bf660454e 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
@@ -107,7 +108,7 @@ public class PaimonWriter<InputT>
// avoid prepareCommit the same
checkpointId with the first
// round.
return entry.getValue()
- .prepareCommit(true,
lastCheckpointId + 1).stream()
+ .prepareCommit(false,
lastCheckpointId + 1).stream()
.map(
committable ->
MultiTableCommittable
@@ -155,13 +156,22 @@ public class PaimonWriter<InputT>
writes.computeIfAbsent(
tableId,
id -> {
+ boolean waitCompaction =
+ Boolean.parseBoolean(
+ table.options()
+ .getOrDefault(
+
CoreOptions.DELETION_VECTORS_ENABLED
+ .key(),
+
CoreOptions.DELETION_VECTORS_ENABLED
+
.defaultValue()
+
.toString()));
StoreSinkWriteImpl storeSinkWrite =
new StoreSinkWriteImpl(
table,
commitUser,
ioManager,
false,
- false,
+ waitCompaction,
true,
memoryPoolFactory,
metricGroup);