This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new df0a8c8d60f [To dev/1.3] Pipe: Fixed the interval manager in general 
model (#16268)
df0a8c8d60f is described below

commit df0a8c8d60f2c25db56f65f326fe9d9081b19237
Author: Caideyipi <[email protected]>
AuthorDate: Wed Aug 27 16:23:54 2025 +0800

    [To dev/1.3] Pipe: Fixed the interval manager in general model (#16268)
    
    * fix
    
    * fix
---
 .../storageengine/dataregion/compaction/tool/Interval.java  |  2 +-
 .../agent/task/progress/interval/PipeCommitInterval.java    |  6 +++---
 .../iotdb/commons/pipe/datastructure/interval/Interval.java |  6 +++---
 .../pipe/datastructure/interval/IntervalManager.java        | 13 +++++++++----
 4 files changed, 16 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java
index 8f2a2da301d..539a1593af7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java
@@ -23,7 +23,7 @@ public class Interval {
   private long start;
   private long end;
 
-  public Interval(long start, long end) {
+  public Interval(final long start, final long end) {
     this.start = start;
     this.end = end;
     if (end < start) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
index 7ee8008c0e0..46a3d3e2a86 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
@@ -33,12 +33,12 @@ public class PipeCommitInterval extends 
Interval<PipeCommitInterval> {
   private final PipeTaskMeta pipeTaskMeta;
 
   public PipeCommitInterval(
-      final long s,
-      final long e,
+      final long start,
+      final long end,
       final ProgressIndex currentIndex,
       final List<Runnable> onCommittedHooks,
       final PipeTaskMeta pipeTaskMeta) {
-    super(s, e);
+    super(start, end);
     this.pipeTaskMeta = pipeTaskMeta;
     this.currentIndex = currentIndex;
     this.onCommittedHooks = onCommittedHooks;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
index 67ecaa66d6e..45b51e3ee29 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
@@ -23,9 +23,9 @@ public class Interval<T extends Interval<T>> implements 
Comparable<Interval<?>>
   public long start;
   public long end;
 
-  public Interval(final long s, final long e) {
-    start = s;
-    end = e;
+  public Interval(final long start, final long end) {
+    this.start = start;
+    this.end = end;
   }
 
   public void onMerged(final T another) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
index 179a379b7c3..cecdef312de 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
@@ -19,33 +19,38 @@
 
 package org.apache.iotdb.commons.pipe.datastructure.interval;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.util.TreeSet;
 
+@NotThreadSafe
 public class IntervalManager<T extends Interval<T>> {
   private final TreeSet<T> intervals = new TreeSet<>();
 
   // insert into new interval and merge
   public void addInterval(final T newInterval) {
     // Left closest
-    final T left = intervals.floor(newInterval);
+    T left = intervals.floor(newInterval);
 
     // Right closest
-    final T right = intervals.ceiling(newInterval);
+    T right = intervals.ceiling(newInterval);
 
     // Merge left ([0,1] + [2,3] → [0,3])
-    if (left != null && left.end >= newInterval.start - 1) {
+    while (left != null && left.end >= newInterval.start - 1) {
       newInterval.start = Math.min(left.start, newInterval.start);
       newInterval.end = Math.max(left.end, newInterval.end);
       newInterval.onMerged(left);
       intervals.remove(left);
+      left = intervals.floor(newInterval);
     }
 
     // Merge right ([2,3] + [3,4] → [2,4])
-    if (right != null && newInterval.end >= right.start - 1) {
+    while (right != null && newInterval.end >= right.start - 1) {
       newInterval.start = Math.min(newInterval.start, right.start);
       newInterval.end = Math.max(newInterval.end, right.end);
       newInterval.onMerged(right);
       intervals.remove(right);
+      right = intervals.ceiling(newInterval);
     }
 
     intervals.add(newInterval);

Reply via email to