This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new e102e339944 [To dev/1.3] Pipe: Optimized the commit queue to avoid OOM
problem (#16145) (#16157)
e102e339944 is described below
commit e102e3399444e78696d5894d9448848b274c8381
Author: Caideyipi <[email protected]>
AuthorDate: Wed Aug 13 12:19:28 2025 +0800
[To dev/1.3] Pipe: Optimized the commit queue to avoid OOM problem (#16145)
(#16157)
* Pipe: Optimized the commit queue to avoid OOM problem (#16145)
* first
* trial
* mayfix
* may_final
* may_last
* may_fix
* partial
* completion
* enhance-test
* fix
* bug-fix-n
* test
* test
---
.../common/tablet/PipeRawTabletInsertionEvent.java | 11 +-
.../event/common/terminate/PipeTerminateEvent.java | 5 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 13 +--
.../broker/SubscriptionPrefetchingQueue.java | 12 +--
.../agent/task/progress/PipeEventCommitter.java | 66 +-----------
.../task/progress/interval/PipeCommitInterval.java | 79 ++++++++++++++
.../task/progress/interval/PipeCommitQueue.java | 60 +++++++++++
.../pipe/datastructure/interval/Interval.java | 48 +++++++++
.../datastructure/interval/IntervalManager.java | 74 ++++++++++++++
.../iotdb/commons/pipe/event/EnrichedEvent.java | 39 +++----
.../pipe/datastructure/PipeCommitQueueTest.java | 113 +++++++++++++++++++++
11 files changed, 412 insertions(+), 108 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 9f3ad7a1bdf..5ae27d5eb3c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -83,6 +83,13 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent
// Allocate empty memory block, will be resized later.
this.allocatedMemoryBlock =
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+
+ addOnCommittedHook(
+ () -> {
+ if (shouldReportOnCommit) {
+ eliminateProgressIndex();
+ }
+ });
}
public PipeRawTabletInsertionEvent(
@@ -154,10 +161,8 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent
return true;
}
- @Override
- protected void reportProgress() {
+ protected void eliminateProgressIndex() {
if (needToReport) {
- super.reportProgress();
if (sourceEvent instanceof PipeTsFileInsertionEvent) {
((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index 3e0475a3d60..a8bf19ebf3b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -69,6 +69,8 @@ public class PipeTerminateEvent extends EnrichedEvent {
super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE,
Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
this.shouldMark = shouldMark;
+
+ addOnCommittedHook(this::markCompleted);
}
@Override
@@ -114,8 +116,7 @@ public class PipeTerminateEvent extends EnrichedEvent {
return true;
}
- @Override
- public void reportProgress() {
+ public void markCompleted() {
// To avoid deadlock
if (shouldMark) {
terminateExecutor.submit(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 920c5eca561..baa6ae9a3ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -160,6 +160,13 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
// If the status is "closed", then the resource status is "closed", the
tsFile won't be altered
// and can be sent.
isClosed.set(resource.isClosed());
+
+ addOnCommittedHook(
+ () -> {
+ if (shouldReportOnCommit) {
+ eliminateProgressIndex();
+ }
+ });
}
/**
@@ -320,12 +327,6 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
return resource.getMaxProgressIndex();
}
- @Override
- protected void reportProgress() {
- super.reportProgress();
- this.eliminateProgressIndex();
- }
-
public void eliminateProgressIndex() {
if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
PipeTsFileEpochProgressIndexKeeper.getInstance()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 3dd9e0d54fb..903f59e4225 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -499,11 +499,7 @@ public abstract class SubscriptionPrefetchingQueue {
if (event instanceof PipeTerminateEvent) {
final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
// add mark completed hook
- terminateEvent.addOnCommittedHook(
- () -> {
- markCompleted();
- return null;
- });
+ terminateEvent.addOnCommittedHook(this::markCompleted);
// commit directly
((PipeTerminateEvent) event)
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
@@ -619,11 +615,7 @@ public abstract class SubscriptionPrefetchingQueue {
if (event instanceof PipeTerminateEvent) {
final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
// add mark completed hook
- terminateEvent.addOnCommittedHook(
- () -> {
- markCompleted();
- return null;
- });
+ terminateEvent.addOnCommittedHook(this::markCompleted);
// commit directly
((PipeTerminateEvent) event)
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
index 61f23907819..5ec7e00665b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
@@ -19,33 +19,20 @@
package org.apache.iotdb.commons.pipe.agent.task.progress;
+import
org.apache.iotdb.commons.pipe.agent.task.progress.interval.PipeCommitQueue;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.event.Event;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Comparator;
-import java.util.Objects;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
/** Used to queue {@link Event}s for one pipe of one region to commit in
order. */
public class PipeEventCommitter {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeEventCommitter.class);
-
private final CommitterKey committerKey;
private final AtomicLong commitIdGenerator = new AtomicLong(0);
- private final AtomicLong lastCommitId = new AtomicLong(0);
- private final PriorityBlockingQueue<EnrichedEvent> commitQueue =
- new PriorityBlockingQueue<>(
- 11,
- Comparator.comparing(
- event ->
- Objects.requireNonNull(event, "committable event cannot be
null").getCommitId()));
+ private final PipeCommitQueue commitQueue = new PipeCommitQueue();
PipeEventCommitter(final CommitterKey committerKey) {
// make it package-private
@@ -64,55 +51,6 @@ public class PipeEventCommitter {
}
}
commitQueue.offer(event);
-
- final int commitQueueSizeBeforeCommit = commitQueue.size();
- if (LOGGER.isDebugEnabled()) {
- if (commitQueueSizeBeforeCommit != 0 && commitQueueSizeBeforeCommit %
100 == 0) {
- LOGGER.info(
- "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last
commit id {}, commit queue size {}",
- committerKey,
- event.getCommitId(),
- lastCommitId.get(),
- commitQueueSizeBeforeCommit);
- } else {
- LOGGER.debug(
- "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last
commit id {}, commit queue size {}",
- committerKey,
- event.getCommitId(),
- lastCommitId.get(),
- commitQueueSizeBeforeCommit);
- }
- }
-
- while (!commitQueue.isEmpty()) {
- final EnrichedEvent e = commitQueue.peek();
-
- if (e.getCommitId() <= lastCommitId.get()) {
- LOGGER.info(
- "commit id is not monotonically increasing, current commit id: {},
last commit id: {}, event: {}, may be because the tsFile has been compacted",
- e.getCommitId(),
- lastCommitId.get(),
- e.coreReportMessage());
- commitQueue.poll();
- continue;
- }
-
- if (e.getCommitId() != lastCommitId.get() + 1) {
- break;
- }
-
- e.onCommitted();
- lastCommitId.incrementAndGet();
- commitQueue.poll();
-
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "COMMIT QUEUE POLL: committer key {}, last commit id {}, commit
queue size after commit {}",
- committerKey,
- lastCommitId.get(),
- commitQueue.size());
- }
- }
}
//////////////////////////// APIs provided for metric framework
////////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
new file mode 100644
index 00000000000..7ee8008c0e0
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.task.progress.interval;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.interval.Interval;
+
+import java.util.List;
+import java.util.Objects;
+
+public class PipeCommitInterval extends Interval<PipeCommitInterval> {
+
+  private ProgressIndex currentIndex;
+  private List<Runnable> onCommittedHooks;
+  private final PipeTaskMeta pipeTaskMeta;
+
+  /**
+   * @param s lower bound of the interval (inclusive)
+   * @param e upper bound of the interval (inclusive)
+   * @param currentIndex progress index handed to {@code pipeTaskMeta} when the interval is removed
+   * @param onCommittedHooks hooks run when the interval is removed; the list is kept by reference
+   * @param pipeTaskMeta receiver of the progress index, may be {@code null}
+   */
+  public PipeCommitInterval(
+      final long s,
+      final long e,
+      final ProgressIndex currentIndex,
+      final List<Runnable> onCommittedHooks,
+      final PipeTaskMeta pipeTaskMeta) {
+    super(s, e);
+    this.currentIndex = currentIndex;
+    this.onCommittedHooks = onCommittedHooks;
+    this.pipeTaskMeta = pipeTaskMeta;
+  }
+
+  /**
+   * Folds {@code another} into this interval: the progress indexes are combined and the two hook
+   * lists are concatenated, with the hooks of the interval that starts first placed in front.
+   */
+  @Override
+  public void onMerged(final PipeCommitInterval another) {
+    currentIndex = currentIndex.updateToMinimumEqualOrIsAfterProgressIndex(another.currentIndex);
+
+    if (another.start < this.start) {
+      // A merged interval is not supposed to be reused, so its list can serve as the head
+      final List<Runnable> earlierHooks = another.onCommittedHooks;
+      earlierHooks.addAll(this.onCommittedHooks);
+      this.onCommittedHooks = earlierHooks;
+      return;
+    }
+
+    // This interval starts first (or at the same position): append the other hooks
+    this.onCommittedHooks.addAll(another.onCommittedHooks);
+  }
+
+  /** Hands the progress index to the task meta (when present), then runs the hooks in order. */
+  @Override
+  public void onRemoved() {
+    if (Objects.nonNull(pipeTaskMeta)) {
+      pipeTaskMeta.updateProgressIndex(currentIndex);
+    }
+    onCommittedHooks.forEach(Runnable::run);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeCommitInterval{progressIndex='" + currentIndex + "', range="
+        + super.toString() + "}";
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitQueue.java
new file mode 100644
index 00000000000..7b48aa39c9a
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitQueue.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.task.progress.interval;
+
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.datastructure.interval.IntervalManager;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+
+public class PipeCommitQueue {
+  private final IntervalManager<PipeCommitInterval> intervalManager = new IntervalManager<>();
+  private long lastCommitted = 0;
+
+  /**
+   * Queues the commit id of the event and commits it, together with every id already merged into
+   * the same interval, as soon as that interval directly follows the last committed id.
+   *
+   * @param event the event to commit; ids not greater than the last committed id are dropped
+   */
+  public void offer(final EnrichedEvent event) {
+    final long commitId = event.getCommitId();
+    if (commitId <= lastCommitted) {
+      // Such an interval can never start at lastCommitted + 1, so it would stay queued forever.
+      // The former priority-queue based committer discarded these ids as well.
+      return;
+    }
+
+    final PipeCommitInterval interval =
+        new PipeCommitInterval(
+            commitId,
+            commitId,
+            event.isShouldReportOnCommit()
+                // A null index counts as the minimum, as the former reportProgress() did
+                ? java.util.Optional.ofNullable(event.getProgressIndex())
+                    .orElse(MinimumProgressIndex.INSTANCE)
+                : MinimumProgressIndex.INSTANCE,
+            event.getOnCommittedHooks(),
+            event.getPipeTaskMeta());
+    intervalManager.addInterval(interval);
+    if (interval.start == lastCommitted + 1) {
+      // Advance the watermark before the hooks run: if a hook throws, the interval is already
+      // gone from the manager and a stale watermark would block every later commit
+      lastCommitted = interval.end;
+      intervalManager.remove(interval);
+    }
+  }
+
+  /** Returns the number of intervals that are still waiting for a preceding commit id. */
+  public int size() {
+    return intervalManager.size();
+  }
+
+  @Override
+  public String toString() {
+    return "PipeCommitQueue{"
+        + "lastCommitted='"
+        + lastCommitted
+        + "', IntervalManager="
+        + intervalManager
+        + "}";
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
new file mode 100644
index 00000000000..67ecaa66d6e
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.interval;
+
+/** A closed range [start, end]; the bounds are mutable because IntervalManager widens them. */
+public class Interval<T extends Interval<T>> implements Comparable<Interval<?>> {
+  public long start;
+  public long end;
+
+  public Interval(final long s, final long e) {
+    start = s;
+    end = e;
+  }
+
+  /** Callback invoked when {@code another} is merged into this interval; no-op by default. */
+  public void onMerged(final T another) {}
+
+  /** Callback invoked when this interval is removed from its manager; no-op by default. */
+  public void onRemoved() {}
+
+  /** Orders intervals by {@code start} only; {@code end} does not take part in the comparison. */
+  @Override
+  public int compareTo(final Interval<?> other) {
+    return Long.compare(this.start, other.start);
+  }
+
+  @Override
+  public String toString() {
+    return "[" + start + ", " + end + "]";
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
new file mode 100644
index 00000000000..179a379b7c3
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.interval;
+
+import java.util.TreeSet;
+
+public class IntervalManager<T extends Interval<T>> {
+  private final TreeSet<T> intervals = new TreeSet<>();
+
+  /**
+   * Adds newInterval after absorbing the stored neighbours that overlap or touch it. onMerged is
+   * invoked before the bounds are widened, so the callback still sees the interval's own start.
+   */
+  public void addInterval(final T newInterval) {
+    final T left = intervals.floor(newInterval);
+    final T right = intervals.ceiling(newInterval);
+    // Merge left ([0,1] + [2,3] → [0,3])
+    if (left != null && left.end >= newInterval.start - 1) {
+      newInterval.onMerged(left);
+      newInterval.start = Math.min(left.start, newInterval.start);
+      newInterval.end = Math.max(left.end, newInterval.end);
+      intervals.remove(left);
+    }
+    // Merge right ([2,3] + [3,4] → [2,4]); floor and ceiling coincide on equal starts
+    if (right != null && right != left && newInterval.end >= right.start - 1) {
+      newInterval.onMerged(right);
+      newInterval.start = Math.min(newInterval.start, right.start);
+      newInterval.end = Math.max(newInterval.end, right.end);
+      intervals.remove(right);
+    }
+    intervals.add(newInterval);
+  }
+
+  /** Returns the interval with the smallest start; throws NoSuchElementException when empty. */
+  public T peek() {
+    return intervals.first();
+  }
+
+  /** Removes the interval and then runs its onRemoved callback; false if it was not stored. */
+  public boolean remove(final T interval) {
+    if (intervals.remove(interval)) {
+      interval.onRemoved();
+      return true;
+    }
+    return false;
+  }
+
+  /** Returns the number of stored intervals. */
+  public int size() {
+    return intervals.size();
+  }
+
+  @Override
+  public String toString() {
+    return "IntervalManager{" + "intervals=" + intervals + '}';
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 90536fbc4a5..476d71747d1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -20,12 +20,12 @@
package org.apache.iotdb.commons.pipe.event;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
@@ -37,7 +37,6 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
/**
* {@link EnrichedEvent} is an {@link Event} that can be enriched with
additional runtime
@@ -72,7 +71,7 @@ public abstract class EnrichedEvent implements Event {
protected boolean isTimeParsed;
protected volatile boolean shouldReportOnCommit = true;
- protected List<Supplier<Void>> onCommittedHooks = new ArrayList<>();
+ protected List<Runnable> onCommittedHooks = new ArrayList<>();
protected EnrichedEvent(
final String pipeName,
@@ -91,13 +90,6 @@ public abstract class EnrichedEvent implements Event {
this.endTime = endTime;
isPatternParsed = this.pipePattern == null || this.pipePattern.isRoot();
isTimeParsed = Long.MIN_VALUE == startTime && Long.MAX_VALUE == endTime;
- addOnCommittedHook(
- () -> {
- if (shouldReportOnCommit) {
- reportProgress();
- }
- return null;
- });
}
protected void trackResource() {
@@ -263,14 +255,6 @@ public abstract class EnrichedEvent implements Event {
*/
public abstract boolean internallyDecreaseResourceReferenceCount(final
String holderMessage);
- protected void reportProgress() {
- if (pipeTaskMeta != null) {
- final ProgressIndex progressIndex = getProgressIndex();
- pipeTaskMeta.updateProgressIndex(
- progressIndex == null ? MinimumProgressIndex.INSTANCE :
progressIndex);
- }
- }
-
/**
* Externally skip the report of the processing {@link ProgressIndex} of
this {@link
* EnrichedEvent} when committed. Report by generated events are still
allowed.
@@ -364,6 +348,19 @@ public abstract class EnrichedEvent implements Event {
final long startTime,
final long endTime);
+ @TestOnly
+ public void setShouldReportOnCommit(final boolean shouldReportOnCommit) {
+ this.shouldReportOnCommit = shouldReportOnCommit;
+ }
+
+ public boolean isShouldReportOnCommit() {
+ return shouldReportOnCommit;
+ }
+
+ public List<Runnable> getOnCommittedHooks() {
+ return onCommittedHooks;
+ }
+
public PipeTaskMeta getPipeTaskMeta() {
return pipeTaskMeta;
}
@@ -412,11 +409,7 @@ public abstract class EnrichedEvent implements Event {
return Collections.singletonList(commitId);
}
- public void onCommitted() {
- onCommittedHooks.forEach(Supplier::get);
- }
-
- public void addOnCommittedHook(final Supplier<Void> hook) {
+ public void addOnCommittedHook(final Runnable hook) {
onCommittedHooks.add(hook);
}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java
new file mode 100644
index 00000000000..51f6316c6fd
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import
org.apache.iotdb.commons.pipe.agent.task.progress.interval.PipeCommitQueue;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class PipeCommitQueueTest { // Offers commit ids out of order and checks in-order committing
+
+ private final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); // shared meta that receives the reported progress
+ private final Set<Integer> commitHookTestSet = new HashSet<>(); // filled by the on-committed hook
+
+ @Test
+ public void testCommitQueue() {
+ final PipeCommitQueue pipeCommitQueue = new PipeCommitQueue();
+ pipeCommitQueue.offer(new TestEnrichedEvent(1, new IoTProgressIndex(0,
1L))); // id 1 directly follows the initial watermark 0 and is committed at once
+ pipeCommitQueue.offer(new TestEnrichedEvent(3, new IoTProgressIndex(0,
3L))); // id 3 has to wait for id 2
+ Assert.assertEquals(1, pipeCommitQueue.size());
+ Assert.assertEquals(new IoTProgressIndex(0, 1L),
pipeTaskMeta.getProgressIndex());
+ TestEnrichedEvent nextEvent = new TestEnrichedEvent(5, new
IoTProgressIndex(0, 5L));
+ nextEvent.setShouldReportOnCommit(false); // the queue substitutes the minimum index for id 5
+ pipeCommitQueue.offer(nextEvent);
+ pipeCommitQueue.offer(new TestEnrichedEvent(4, new IoTProgressIndex(0,
4L))); // id 4 touches [3,3] and [5,5], leaving one merged interval
+ Assert.assertEquals(1, pipeCommitQueue.size());
+ nextEvent = new TestEnrichedEvent(2, new IoTProgressIndex(0, 2L));
+ nextEvent.addOnCommittedHook(() -> commitHookTestSet.add(1));
+ pipeCommitQueue.offer(nextEvent); // closes the gap, so ids 2 to 5 are committed together
+ Assert.assertEquals(0, pipeCommitQueue.size());
+ Assert.assertEquals(new IoTProgressIndex(0, 4L),
pipeTaskMeta.getProgressIndex()); // stops at 4 because id 5 was set not to report
+ Assert.assertEquals(1, commitHookTestSet.size());
+ }
+
+ private class TestEnrichedEvent extends EnrichedEvent { // stub: only commit id and index are set
+ private final ProgressIndex progressIndex;
+
+ private TestEnrichedEvent(final long commitId, final ProgressIndex
progressIndex) {
+ super(null, 0, PipeCommitQueueTest.this.pipeTaskMeta, null,
Long.MIN_VALUE, Long.MAX_VALUE);
+ this.commitId = commitId;
+ this.progressIndex = progressIndex;
+ }
+
+ @Override
+ public ProgressIndex getProgressIndex() {
+ return progressIndex;
+ }
+
+ /////////////////////////////// Useless logic
///////////////////////////////
+
+ @Override
+ public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
+ return false;
+ }
+
+ @Override
+ public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
+ return false;
+ }
+
+ @Override
+ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ String pipeName,
+ long creationTime,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime) {
+ return null;
+ }
+
+ @Override
+ public boolean isGeneratedByPipe() {
+ return false;
+ }
+
+ @Override
+ public boolean mayEventTimeOverlappedWithTimeRange() {
+ return false;
+ }
+
+ @Override
+ public boolean mayEventPathsOverlappedWithPattern() {
+ return false;
+ }
+ }
+}