This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new eb30920396 [core] Add more logs to BucketedAppendCompactManager for easier debugging (#5663)
eb30920396 is described below
commit eb30920396dfaa505e649561351b3d7e6b0aac68
Author: tsreaper <[email protected]>
AuthorDate: Mon May 26 10:58:07 2025 +0800
    [core] Add more logs to BucketedAppendCompactManager for easier debugging (#5663)
---
.../apache/paimon/append/BucketedAppendCompactManager.java | 8 ++++++++
.../org/apache/paimon/flink/sink/StoreCompactOperator.java | 13 +++++++++++++
2 files changed, 21 insertions(+)
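
Both files apply the same pattern: an SLF4J logger plus an isDebugEnabled() guard around a parameterized debug call. A minimal, self-contained sketch of that pattern follows; the class GuardedDebugLogExample and the file names are illustrative stand-ins, not part of the Paimon code base.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Arrays;
    import java.util.List;

    /** Illustrative stand-in class, not part of Paimon. */
    public class GuardedDebugLogExample {

        private static final Logger LOG = LoggerFactory.getLogger(GuardedDebugLogExample.class);

        public static void main(String[] args) {
            // Hypothetical file names standing in for DataFileMeta entries.
            List<String> toCompact = Arrays.asList("data-0.orc", "data-1.orc");

            // Guarding with isDebugEnabled() skips the call (and its varargs
            // allocation) when DEBUG is off; the {} placeholder defers
            // toCompact.toString() until the message is actually written.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Submit full compaction with these files {}", toCompact);
            }
        }
    }
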
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index f5752f1987..d5c47c3b88 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -104,6 +104,10 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
             return;
         }
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Submit full compaction with these files {}", toCompact);
+        }
+
         taskFuture =
                 executor.submit(
                         new FullCompactTask(
@@ -130,6 +134,10 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
         Optional<List<DataFileMeta>> picked = pickCompactBefore();
         if (picked.isPresent()) {
             compacting = picked.get();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Submit normal compaction with these files {}", compacting);
+            }
+
             taskFuture =
                     executor.submit(
                             new AutoCompactTask(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 6c0fe5c13a..2074bc8d8a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -37,6 +37,8 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.LinkedHashSet;
@@ -54,6 +56,8 @@ import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
  */
 public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committable> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(StoreCompactOperator.class);
+
     private final FileStoreTable table;
     private final StoreSinkWrite.Provider storeSinkWriteProvider;
     private final String initialCommitUser;
@@ -130,6 +134,15 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
         byte[] serializedFiles = record.getBinary(3);
         List<DataFileMeta> files = dataFileMetaSerializer.deserializeList(serializedFiles);
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Store compact operator received record, snapshotId {}, partition {}, bucket {}, files {}",
+                    snapshotId,
+                    partition,
+                    bucket,
+                    files);
+        }
+
         if (write.streamingMode()) {
             write.notifyNewFiles(snapshotId, partition, bucket, files);
         } else {
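
These messages are emitted at DEBUG level only, so the logging backend must enable that level for the two classes before anything shows up. A sketch for a Flink deployment using the stock Log4j 2 conf/log4j.properties is below; the logger ids (paimonCompactMgr, paimonCompactOp) are arbitrary labels chosen for this example, and the exact file and format depend on your setup.

    # Hypothetical additions to conf/log4j.properties (Log4j 2 properties format).
    # Enable DEBUG for the two classes touched by this commit.
    logger.paimonCompactMgr.name = org.apache.paimon.append.BucketedAppendCompactManager
    logger.paimonCompactMgr.level = DEBUG
    logger.paimonCompactOp.name = org.apache.paimon.flink.sink.StoreCompactOperator
    logger.paimonCompactOp.level = DEBUG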