This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e905a167b1 [core] Let short streaming job execute partition expire
(#6642)
e905a167b1 is described below
commit e905a167b1748aab92b7e00d4b4abfcddcb9efe9
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Nov 20 18:22:48 2025 +0800
[core] Let short streaming job execute partition expire (#6642)
---
.../java/org/apache/paimon/operation/PartitionExpire.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 36d6976152..f1e4ae096e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
/** Expire partitions. */
@@ -56,8 +57,8 @@ public class PartitionExpire {
private LocalDateTime lastCheck;
private final PartitionExpireStrategy strategy;
private final boolean endInputCheckPartitionExpire;
- private int maxExpireNum;
- private int expireBatchSize;
+ private final int maxExpireNum;
+ private final int expireBatchSize;
public PartitionExpire(
Duration expirationTime,
@@ -75,7 +76,14 @@ public class PartitionExpire {
this.scan = scan;
this.commit = commit;
this.partitionHandler = partitionHandler;
- this.lastCheck = LocalDateTime.now();
+ // Backdate lastCheck by a random offset within checkInterval, so that streaming jobs
+ // running for less than checkInterval still get a chance to expire partitions.
+ long rndSeconds = 0;
+ long checkIntervalSeconds = checkInterval.toMillis() / 1000;
+ if (checkIntervalSeconds > 0) {
+ rndSeconds =
ThreadLocalRandom.current().nextLong(checkIntervalSeconds);
+ }
+ this.lastCheck = LocalDateTime.now().minusSeconds(rndSeconds);
this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
this.maxExpireNum = maxExpireNum;
this.expireBatchSize = expireBatchSize;