This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1798e7a3e1 [flink] Add log for partition mark done trigger (#6146)
1798e7a3e1 is described below
commit 1798e7a3e1c596f8e5bd6da13c88b817f2873898
Author: YeJunHao <[email protected]>
AuthorDate: Wed Aug 27 10:19:36 2025 +0800
[flink] Add log for partition mark done trigger (#6146)
---
.../sink/listener/PartitionMarkDoneTrigger.java | 29 ++++++++++++++++++++--
1 file changed, 27 insertions(+), 2 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
index 439c4db3fd..0462582784 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
@@ -132,19 +132,26 @@ public class PartitionMarkDoneTrigger {
if (timeInterval == null || idleTime == null) {
return Collections.emptyList();
}
+ LOG.debug(
+ "End input is true and markDoneWhenEndInput is enabled, mark
all pending partitions done: {}",
+ String.join(",", pendingPartitions.keySet()));
List<String> needDone = new ArrayList<>();
Iterator<Map.Entry<String, Long>> iter =
pendingPartitions.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Long> entry = iter.next();
String partition = entry.getKey();
-
long lastUpdateTime = entry.getValue();
- long partitionStartTime;
+ LOG.debug(
+ "Partition {} is in progress, last update time: {}",
+ partition,
+ entry.getValue());
+ long partitionStartTime;
Optional<LocalDateTime> partitionLocalDateTimeOpt =
extractDateTime(partition);
// skip illegal partition
if (!partitionLocalDateTimeOpt.isPresent()) {
+ LOG.debug("Partition {} is illegal, skip it", partition);
iter.remove();
continue;
}
@@ -167,12 +174,30 @@ public class PartitionMarkDoneTrigger {
}
long partitionEndTime = partitionStartTime + timeInterval;
lastUpdateTime = Math.max(lastUpdateTime, partitionEndTime);
+ LOG.debug(
+ "Partition {} start time: {}, end time: {}, last update
time after compare: {}",
+ partition,
+ partitionStartTime,
+ partitionEndTime,
+ lastUpdateTime);
if (currentTimeMillis - lastUpdateTime > idleTime) {
+ LOG.debug(
+ "Partition {} is idle for {} greater than idleTime {},
mark it done",
+ partition,
+ currentTimeMillis - lastUpdateTime,
+ idleTime);
needDone.add(partition);
iter.remove();
+ } else {
+ LOG.debug(
+ "Partition {} is idle for {} less than idleTime {}, no
not mark it done",
+ partition,
+ currentTimeMillis - lastUpdateTime,
+ idleTime);
}
}
+ LOG.debug("Need done partitions: {}", String.join(",", needDone));
return needDone;
}