This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new eb30920396 [core] Add more logs to BucketedAppendCompactManager for easier debugging (#5663)
eb30920396 is described below

commit eb30920396dfaa505e649561351b3d7e6b0aac68
Author: tsreaper <[email protected]>
AuthorDate: Mon May 26 10:58:07 2025 +0800

    [core] Add more logs to BucketedAppendCompactManager for easier debugging (#5663)
---
 .../apache/paimon/append/BucketedAppendCompactManager.java  |  8 ++++++++
 .../org/apache/paimon/flink/sink/StoreCompactOperator.java  | 13 +++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index f5752f1987..d5c47c3b88 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -104,6 +104,10 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
             return;
         }
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Submit full compaction with these files {}", toCompact);
+        }
+
         taskFuture =
                 executor.submit(
                         new FullCompactTask(
@@ -130,6 +134,10 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
         Optional<List<DataFileMeta>> picked = pickCompactBefore();
         if (picked.isPresent()) {
             compacting = picked.get();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Submit normal compaction with these files {}", 
compacting);
+            }
+
             taskFuture =
                     executor.submit(
                             new AutoCompactTask(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 6c0fe5c13a..2074bc8d8a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -37,6 +37,8 @@ import 
org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.LinkedHashSet;
@@ -54,6 +56,8 @@ import static 
org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
  */
 public class StoreCompactOperator extends PrepareCommitOperator<RowData, 
Committable> {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(StoreCompactOperator.class);
+
     private final FileStoreTable table;
     private final StoreSinkWrite.Provider storeSinkWriteProvider;
     private final String initialCommitUser;
@@ -130,6 +134,15 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
         byte[] serializedFiles = record.getBinary(3);
         List<DataFileMeta> files = 
dataFileMetaSerializer.deserializeList(serializedFiles);
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Store compact operator received record, snapshotId {}, 
partition {}, bucket {}, files {}",
+                    snapshotId,
+                    partition,
+                    bucket,
+                    files);
+        }
+
         if (write.streamingMode()) {
             write.notifyNewFiles(snapshotId, partition, bucket, files);
         } else {

Reply via email to