This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e905a167b1 [core] Let short streaming job execute partition expire (#6642)
e905a167b1 is described below

commit e905a167b1748aab92b7e00d4b4abfcddcb9efe9
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Nov 20 18:22:48 2025 +0800

    [core] Let short streaming job execute partition expire (#6642)
---
 .../java/org/apache/paimon/operation/PartitionExpire.java  | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 36d6976152..f1e4ae096e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 /** Expire partitions. */
@@ -56,8 +57,8 @@ public class PartitionExpire {
     private LocalDateTime lastCheck;
     private final PartitionExpireStrategy strategy;
     private final boolean endInputCheckPartitionExpire;
-    private int maxExpireNum;
-    private int expireBatchSize;
+    private final int maxExpireNum;
+    private final int expireBatchSize;
 
     public PartitionExpire(
             Duration expirationTime,
@@ -75,7 +76,14 @@ public class PartitionExpire {
         this.scan = scan;
         this.commit = commit;
         this.partitionHandler = partitionHandler;
-        this.lastCheck = LocalDateTime.now();
+        // Avoid the execution time of stream jobs from being too short and preventing partition
+        // expiration
+        long rndSeconds = 0;
+        long checkIntervalSeconds = checkInterval.toMillis() / 1000;
+        if (checkIntervalSeconds > 0) {
+            rndSeconds = ThreadLocalRandom.current().nextLong(checkIntervalSeconds);
+        }
+        this.lastCheck = LocalDateTime.now().minusSeconds(rndSeconds);
         this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
         this.maxExpireNum = maxExpireNum;
         this.expireBatchSize = expireBatchSize;

Reply via email to