This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new df0a8c8d60f [To dev/1.3] Pipe: Fixed the interval manager in general
model (#16268)
df0a8c8d60f is described below
commit df0a8c8d60f2c25db56f65f326fe9d9081b19237
Author: Caideyipi <[email protected]>
AuthorDate: Wed Aug 27 16:23:54 2025 +0800
[To dev/1.3] Pipe: Fixed the interval manager in general model (#16268)
* fix
* fix
---
.../storageengine/dataregion/compaction/tool/Interval.java | 2 +-
.../agent/task/progress/interval/PipeCommitInterval.java | 6 +++---
.../iotdb/commons/pipe/datastructure/interval/Interval.java | 6 +++---
.../pipe/datastructure/interval/IntervalManager.java | 13 +++++++++----
4 files changed, 16 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java
index 8f2a2da301d..539a1593af7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tool/Interval.java
@@ -23,7 +23,7 @@ public class Interval {
private long start;
private long end;
- public Interval(long start, long end) {
+ public Interval(final long start, final long end) {
this.start = start;
this.end = end;
if (end < start) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
index 7ee8008c0e0..46a3d3e2a86 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
@@ -33,12 +33,12 @@ public class PipeCommitInterval extends
Interval<PipeCommitInterval> {
private final PipeTaskMeta pipeTaskMeta;
public PipeCommitInterval(
- final long s,
- final long e,
+ final long start,
+ final long end,
final ProgressIndex currentIndex,
final List<Runnable> onCommittedHooks,
final PipeTaskMeta pipeTaskMeta) {
- super(s, e);
+ super(start, end);
this.pipeTaskMeta = pipeTaskMeta;
this.currentIndex = currentIndex;
this.onCommittedHooks = onCommittedHooks;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
index 67ecaa66d6e..45b51e3ee29 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
@@ -23,9 +23,9 @@ public class Interval<T extends Interval<T>> implements
Comparable<Interval<?>>
public long start;
public long end;
- public Interval(final long s, final long e) {
- start = s;
- end = e;
+ public Interval(final long start, final long end) {
+ this.start = start;
+ this.end = end;
}
public void onMerged(final T another) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
index 179a379b7c3..cecdef312de 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
@@ -19,33 +19,38 @@
package org.apache.iotdb.commons.pipe.datastructure.interval;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.util.TreeSet;
+@NotThreadSafe
public class IntervalManager<T extends Interval<T>> {
private final TreeSet<T> intervals = new TreeSet<>();
// insert into new interval and merge
public void addInterval(final T newInterval) {
// Left closest
- final T left = intervals.floor(newInterval);
+ T left = intervals.floor(newInterval);
// Right closest
- final T right = intervals.ceiling(newInterval);
+ T right = intervals.ceiling(newInterval);
// Merge left ([0,1] + [2,3] → [0,3])
- if (left != null && left.end >= newInterval.start - 1) {
+ while (left != null && left.end >= newInterval.start - 1) {
newInterval.start = Math.min(left.start, newInterval.start);
newInterval.end = Math.max(left.end, newInterval.end);
newInterval.onMerged(left);
intervals.remove(left);
+ left = intervals.floor(newInterval);
}
// Merge right ([2,3] + [3,4] → [2,4])
- if (right != null && newInterval.end >= right.start - 1) {
+ while (right != null && newInterval.end >= right.start - 1) {
newInterval.start = Math.min(newInterval.start, right.start);
newInterval.end = Math.max(newInterval.end, right.end);
newInterval.onMerged(right);
intervals.remove(right);
+ right = intervals.ceiling(newInterval);
}
intervals.add(newInterval);