This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a7e5fce9f [flink] Fix Add table type judgment for precommit-compact. 
(#5216)
7a7e5fce9f is described below

commit 7a7e5fce9f99340ed4852c6e898f8981ee40f36e
Author: HunterXHunter <[email protected]>
AuthorDate: Wed Mar 5 17:58:56 2025 +0800

    [flink] Fix Add table type judgment for precommit-compact. (#5216)
---
 .../src/main/java/org/apache/paimon/flink/sink/FlinkSink.java           | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index dd5f697264..9c5bf11221 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -242,7 +242,7 @@ public abstract class FlinkSink<T> implements Serializable {
             declareManagedMemory(written, 
options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
         }
 
-        if (options.get(PRECOMMIT_COMPACT)) {
+        if (!table.primaryKeys().isEmpty() && options.get(PRECOMMIT_COMPACT)) {
             SingleOutputStreamOperator<Committable> newWritten =
                     written.transform(
                                     "Changelog Compact Coordinator",

Reply via email to