This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b1deb6c7cf9 Pipe: Optimized the commit queue to avoid OOM problem (#16145)
b1deb6c7cf9 is described below
commit b1deb6c7cf9e6baf2d4bed0663fb5570bf9d9789
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 12 19:14:20 2025 +0800
Pipe: Optimized the commit queue to avoid OOM problem (#16145)
* first
* trial
* mayfix
* may_final
* may_last
* may_fix
* partial
* completion
* enhance-test
* fix
* bug-fix-n
---
.../common/deletion/PipeDeleteDataNodeEvent.java | 5 +-
.../common/tablet/PipeRawTabletInsertionEvent.java | 11 +-
.../event/common/terminate/PipeTerminateEvent.java | 5 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 13 +-
.../broker/SubscriptionPrefetchingQueue.java | 12 +-
.../agent/task/progress/PipeEventCommitter.java | 70 +---------
.../task/progress/interval/PipeCommitInterval.java | 79 +++++++++++
.../task/progress/interval/PipeCommitQueue.java | 60 +++++++++
.../pipe/datastructure/interval/Interval.java | 48 +++++++
.../datastructure/interval/IntervalManager.java | 74 +++++++++++
.../iotdb/commons/pipe/event/EnrichedEvent.java | 40 +++---
.../pipe/datastructure/PipeCommitQueueTest.java | 148 +++++++++++++++++++++
12 files changed, 449 insertions(+), 116 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
index 692144edfb0..1225d59cf68 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
@@ -80,6 +80,7 @@ public class PipeDeleteDataNodeEvent extends EnrichedEvent
implements Serializab
this.deleteDataNode = deleteDataNode;
Optional.ofNullable(deleteDataNode)
.ifPresent(node -> this.progressIndex =
deleteDataNode.getProgressIndex());
+ addOnCommittedHook(this::decreaseDeletionReference);
}
public AbstractDeleteDataNode getDeleteDataNode() {
@@ -104,9 +105,7 @@ public class PipeDeleteDataNodeEvent extends EnrichedEvent
implements Serializab
return true;
}
- @Override
- public void onCommitted() {
- super.onCommitted();
+ public void decreaseDeletionReference() {
if (deletionResource != null) {
deletionResource.decreaseReference();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 349bfc2f6b0..322b5ea2435 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -108,6 +108,13 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
// Allocate empty memory block, will be resized later.
this.allocatedMemoryBlock =
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+
+ addOnCommittedHook(
+ () -> {
+ if (shouldReportOnCommit) {
+ eliminateProgressIndex();
+ }
+ });
}
public PipeRawTabletInsertionEvent(
@@ -259,10 +266,8 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
return true;
}
- @Override
- protected void reportProgress() {
+ protected void eliminateProgressIndex() {
if (needToReport) {
- super.reportProgress();
if (sourceEvent instanceof PipeTsFileInsertionEvent) {
((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index bf7b5a6f527..6e00fc8513f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -79,6 +79,8 @@ public class PipeTerminateEvent extends EnrichedEvent {
Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
this.shouldMark = shouldMark;
+
+ addOnCommittedHook(this::markCompleted);
}
@Override
@@ -127,8 +129,7 @@ public class PipeTerminateEvent extends EnrichedEvent {
return true;
}
- @Override
- public void reportProgress() {
+ public void markCompleted() {
// To avoid deadlock
if (shouldMark) {
terminateExecutor.submit(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 5db40e82786..454c9cf0063 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -207,6 +207,13 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
isClosed.set(resource.isClosed());
this.eventParser = new AtomicReference<>(null);
+
+ addOnCommittedHook(
+ () -> {
+ if (shouldReportOnCommit) {
+ eliminateProgressIndex();
+ }
+ });
}
/**
@@ -371,12 +378,6 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
return resource.getMaxProgressIndex();
}
- @Override
- protected void reportProgress() {
- super.reportProgress();
- this.eliminateProgressIndex();
- }
-
public void eliminateProgressIndex() {
if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
PipeTsFileEpochProgressIndexKeeper.getInstance()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 3dd9e0d54fb..903f59e4225 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -499,11 +499,7 @@ public abstract class SubscriptionPrefetchingQueue {
if (event instanceof PipeTerminateEvent) {
final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
// add mark completed hook
- terminateEvent.addOnCommittedHook(
- () -> {
- markCompleted();
- return null;
- });
+ terminateEvent.addOnCommittedHook(this::markCompleted);
// commit directly
((PipeTerminateEvent) event)
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
@@ -619,11 +615,7 @@ public abstract class SubscriptionPrefetchingQueue {
if (event instanceof PipeTerminateEvent) {
final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
// add mark completed hook
- terminateEvent.addOnCommittedHook(
- () -> {
- markCompleted();
- return null;
- });
+ terminateEvent.addOnCommittedHook(this::markCompleted);
// commit directly
((PipeTerminateEvent) event)
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
index 61f23907819..42429fca253 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
@@ -19,33 +19,20 @@
package org.apache.iotdb.commons.pipe.agent.task.progress;
+import
org.apache.iotdb.commons.pipe.agent.task.progress.interval.PipeCommitQueue;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.event.Event;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Comparator;
-import java.util.Objects;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
/** Used to queue {@link Event}s for one pipe of one region to commit in
order. */
public class PipeEventCommitter {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeEventCommitter.class);
-
private final CommitterKey committerKey;
private final AtomicLong commitIdGenerator = new AtomicLong(0);
- private final AtomicLong lastCommitId = new AtomicLong(0);
- private final PriorityBlockingQueue<EnrichedEvent> commitQueue =
- new PriorityBlockingQueue<>(
- 11,
- Comparator.comparing(
- event ->
- Objects.requireNonNull(event, "committable event cannot be
null").getCommitId()));
+ private final PipeCommitQueue commitQueue = new PipeCommitQueue();
PipeEventCommitter(final CommitterKey committerKey) {
// make it package-private
@@ -64,55 +51,6 @@ public class PipeEventCommitter {
}
}
commitQueue.offer(event);
-
- final int commitQueueSizeBeforeCommit = commitQueue.size();
- if (LOGGER.isDebugEnabled()) {
- if (commitQueueSizeBeforeCommit != 0 && commitQueueSizeBeforeCommit %
100 == 0) {
- LOGGER.info(
- "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last
commit id {}, commit queue size {}",
- committerKey,
- event.getCommitId(),
- lastCommitId.get(),
- commitQueueSizeBeforeCommit);
- } else {
- LOGGER.debug(
- "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last
commit id {}, commit queue size {}",
- committerKey,
- event.getCommitId(),
- lastCommitId.get(),
- commitQueueSizeBeforeCommit);
- }
- }
-
- while (!commitQueue.isEmpty()) {
- final EnrichedEvent e = commitQueue.peek();
-
- if (e.getCommitId() <= lastCommitId.get()) {
- LOGGER.info(
- "commit id is not monotonically increasing, current commit id: {},
last commit id: {}, event: {}, may be because the tsFile has been compacted",
- e.getCommitId(),
- lastCommitId.get(),
- e.coreReportMessage());
- commitQueue.poll();
- continue;
- }
-
- if (e.getCommitId() != lastCommitId.get() + 1) {
- break;
- }
-
- e.onCommitted();
- lastCommitId.incrementAndGet();
- commitQueue.poll();
-
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "COMMIT QUEUE POLL: committer key {}, last commit id {}, commit
queue size after commit {}",
- committerKey,
- lastCommitId.get(),
- commitQueue.size());
- }
- }
}
//////////////////////////// APIs provided for metric framework
////////////////////////////
@@ -133,8 +71,4 @@ public class PipeEventCommitter {
public long commitQueueSize() {
return commitQueue.size();
}
-
- public long getCurrentCommitId() {
- return commitIdGenerator.get();
- }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
new file mode 100644
index 00000000000..7ee8008c0e0
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.task.progress.interval;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.datastructure.interval.Interval;
+
+import java.util.List;
+import java.util.Objects;
+
+public class PipeCommitInterval extends Interval<PipeCommitInterval> {
+
+ private ProgressIndex currentIndex;
+ private List<Runnable> onCommittedHooks;
+ private final PipeTaskMeta pipeTaskMeta;
+
+ public PipeCommitInterval(
+ final long s,
+ final long e,
+ final ProgressIndex currentIndex,
+ final List<Runnable> onCommittedHooks,
+ final PipeTaskMeta pipeTaskMeta) {
+ super(s, e);
+ this.pipeTaskMeta = pipeTaskMeta;
+ this.currentIndex = currentIndex;
+ this.onCommittedHooks = onCommittedHooks;
+ }
+
+ @Override
+ public void onMerged(final PipeCommitInterval another) {
+ currentIndex =
currentIndex.updateToMinimumEqualOrIsAfterProgressIndex(another.currentIndex);
+
+ // Keep in order
+ if (this.start <= another.start) {
+ onCommittedHooks.addAll(another.onCommittedHooks);
+ } else {
+ // Note that if merged, another interval is not supposed to be reused
+ // thus we can arbitrarily alter its hooks
+ another.onCommittedHooks.addAll(this.onCommittedHooks);
+ this.onCommittedHooks = another.onCommittedHooks;
+ }
+ }
+
+ @Override
+ public void onRemoved() {
+ if (Objects.nonNull(pipeTaskMeta)) {
+ pipeTaskMeta.updateProgressIndex(currentIndex);
+ }
+ onCommittedHooks.forEach(Runnable::run);
+ }
+
+ @Override
+ public String toString() {
+ return "PipeCommitInterval{"
+ + "progressIndex='"
+ + currentIndex
+ + "', range="
+ + super.toString()
+ + "}";
+ }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitQueue.java
new file mode 100644
index 00000000000..7b48aa39c9a
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitQueue.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.task.progress.interval;
+
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.datastructure.interval.IntervalManager;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+
+public class PipeCommitQueue {
+ private final IntervalManager<PipeCommitInterval> intervalManager = new
IntervalManager<>();
+ private long lastCommitted = 0;
+
+ public void offer(final EnrichedEvent event) {
+ final PipeCommitInterval interval =
+ new PipeCommitInterval(
+ event.getCommitId(),
+ event.getCommitId(),
+ event.isShouldReportOnCommit()
+ ? event.getProgressIndex()
+ : MinimumProgressIndex.INSTANCE,
+ event.getOnCommittedHooks(),
+ event.getPipeTaskMeta());
+ intervalManager.addInterval(interval);
+ if (interval.start == lastCommitted + 1) {
+ intervalManager.remove(interval);
+ lastCommitted = interval.end;
+ }
+ }
+
+ public int size() {
+ return intervalManager.size();
+ }
+
+ @Override
+ public String toString() {
+ return "PipeCommitQueue{"
+ + "lastCommitted='"
+ + lastCommitted
+ + "', IntervalManager="
+ + intervalManager
+ + "}";
+ }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
new file mode 100644
index 00000000000..67ecaa66d6e
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/Interval.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.interval;
+
+public class Interval<T extends Interval<T>> implements
Comparable<Interval<?>> {
+ public long start;
+ public long end;
+
+ public Interval(final long s, final long e) {
+ start = s;
+ end = e;
+ }
+
+ public void onMerged(final T another) {
+ // Do nothing by default
+ }
+
+ public void onRemoved() {
+ // Do nothing by default
+ }
+
+ @Override
+ public int compareTo(final Interval other) {
+ return Long.compare(this.start, other.start);
+ }
+
+ @Override
+ public String toString() {
+ return "[" + start + ", " + end + "]";
+ }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
new file mode 100644
index 00000000000..179a379b7c3
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/interval/IntervalManager.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.interval;
+
+import java.util.TreeSet;
+
+public class IntervalManager<T extends Interval<T>> {
+ private final TreeSet<T> intervals = new TreeSet<>();
+
+ // insert into new interval and merge
+ public void addInterval(final T newInterval) {
+ // Left closest
+ final T left = intervals.floor(newInterval);
+
+ // Right closest
+ final T right = intervals.ceiling(newInterval);
+
+ // Merge left ([0,1] + [2,3] → [0,3])
+ if (left != null && left.end >= newInterval.start - 1) {
+ newInterval.start = Math.min(left.start, newInterval.start);
+ newInterval.end = Math.max(left.end, newInterval.end);
+ newInterval.onMerged(left);
+ intervals.remove(left);
+ }
+
+ // Merge right ([2,3] + [3,4] → [2,4])
+ if (right != null && newInterval.end >= right.start - 1) {
+ newInterval.start = Math.min(newInterval.start, right.start);
+ newInterval.end = Math.max(newInterval.end, right.end);
+ newInterval.onMerged(right);
+ intervals.remove(right);
+ }
+
+ intervals.add(newInterval);
+ }
+
+ public T peek() {
+ return intervals.first();
+ }
+
+ public boolean remove(final T interval) {
+ if (intervals.remove(interval)) {
+ interval.onRemoved();
+ return true;
+ }
+ return false;
+ }
+
+ public int size() {
+ return intervals.size();
+ }
+
+ @Override
+ public String toString() {
+ return "IntervalManager{" + "intervals=" + intervals + '}';
+ }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index c7e0345cc59..1f1686f1eb5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -20,13 +20,13 @@
package org.apache.iotdb.commons.pipe.event;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
@@ -38,7 +38,6 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
/**
* {@link EnrichedEvent} is an {@link Event} that can be enriched with
additional runtime
@@ -78,7 +77,7 @@ public abstract class EnrichedEvent implements Event {
protected boolean isTimeParsed;
protected volatile boolean shouldReportOnCommit = true;
- protected List<Supplier<Void>> onCommittedHooks = new ArrayList<>();
+ protected List<Runnable> onCommittedHooks = new ArrayList<>();
protected String userName;
protected boolean skipIfNoPrivileges;
@@ -110,14 +109,6 @@ public abstract class EnrichedEvent implements Event {
&& (tablePattern == null
||
!tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern());
isTimeParsed = Long.MIN_VALUE == startTime && Long.MAX_VALUE == endTime;
-
- addOnCommittedHook(
- () -> {
- if (shouldReportOnCommit) {
- reportProgress();
- }
- return null;
- });
}
protected void trackResource() {
@@ -283,14 +274,6 @@ public abstract class EnrichedEvent implements Event {
*/
public abstract boolean internallyDecreaseResourceReferenceCount(final
String holderMessage);
- protected void reportProgress() {
- if (pipeTaskMeta != null) {
- final ProgressIndex progressIndex = getProgressIndex();
- pipeTaskMeta.updateProgressIndex(
- progressIndex == null ? MinimumProgressIndex.INSTANCE :
progressIndex);
- }
- }
-
/**
* Externally skip the report of the processing {@link ProgressIndex} of
this {@link
* EnrichedEvent} when committed. Report by generated events are still
allowed.
@@ -400,6 +383,19 @@ public abstract class EnrichedEvent implements Event {
final long startTime,
final long endTime);
+ @TestOnly
+ public void setShouldReportOnCommit(final boolean shouldReportOnCommit) {
+ this.shouldReportOnCommit = shouldReportOnCommit;
+ }
+
+ public boolean isShouldReportOnCommit() {
+ return shouldReportOnCommit;
+ }
+
+ public List<Runnable> getOnCommittedHooks() {
+ return onCommittedHooks;
+ }
+
public PipeTaskMeta getPipeTaskMeta() {
return pipeTaskMeta;
}
@@ -468,11 +464,7 @@ public abstract class EnrichedEvent implements Event {
this.replicateIndexForIoTV2 = replicateIndexForIoTV2;
}
- public void onCommitted() {
- onCommittedHooks.forEach(Supplier::get);
- }
-
- public void addOnCommittedHook(final Supplier<Void> hook) {
+ public void addOnCommittedHook(final Runnable hook) {
onCommittedHooks.add(hook);
}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java
new file mode 100644
index 00000000000..651042f9605
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import
org.apache.iotdb.commons.pipe.agent.task.progress.interval.PipeCommitQueue;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class PipeCommitQueueTest {
+
+ private final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
+ private final Set<Integer> commitHookTestSet = new HashSet<>();
+
+ @Test
+ public void testCommitQueue() {
+ final PipeCommitQueue pipeCommitQueue = new PipeCommitQueue();
+ pipeCommitQueue.offer(new TestEnrichedEvent(1, new IoTProgressIndex(0,
1L)));
+ pipeCommitQueue.offer(new TestEnrichedEvent(3, new IoTProgressIndex(0,
3L)));
+ Assert.assertEquals(1, pipeCommitQueue.size());
+ Assert.assertEquals(new IoTProgressIndex(0, 1L),
pipeTaskMeta.getProgressIndex());
+ TestEnrichedEvent nextEvent = new TestEnrichedEvent(5, new
IoTProgressIndex(0, 5L));
+ nextEvent.setShouldReportOnCommit(false);
+ pipeCommitQueue.offer(nextEvent);
+ pipeCommitQueue.offer(new TestEnrichedEvent(4, new IoTProgressIndex(0,
4L)));
+ Assert.assertEquals(1, pipeCommitQueue.size());
+ nextEvent = new TestEnrichedEvent(2, new IoTProgressIndex(0, 2L));
+ nextEvent.addOnCommittedHook(() -> commitHookTestSet.add(1));
+ pipeCommitQueue.offer(nextEvent);
+ Assert.assertEquals(0, pipeCommitQueue.size());
+ Assert.assertEquals(new IoTProgressIndex(0, 4L),
pipeTaskMeta.getProgressIndex());
+ Assert.assertEquals(1, commitHookTestSet.size());
+ }
+
+ private class TestEnrichedEvent extends EnrichedEvent {
+ private ProgressIndex progressIndex;
+
+ private TestEnrichedEvent(final long commitId, final ProgressIndex
progressIndex) {
+ this(
+ null,
+ 0,
+ PipeCommitQueueTest.this.pipeTaskMeta,
+ null,
+ null,
+ null,
+ false,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
+ this.commitId = commitId;
+ this.progressIndex = progressIndex;
+ }
+
+ @Override
+ public ProgressIndex getProgressIndex() {
+ return progressIndex;
+ }
+
+ /////////////////////////////// Useless logic
///////////////////////////////
+
+ protected TestEnrichedEvent(
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final TreePattern treePattern,
+ final TablePattern tablePattern,
+ final String userName,
+ final boolean skipIfNoPrivileges,
+ final long startTime,
+ final long endTime) {
+ super(
+ pipeName,
+ creationTime,
+ pipeTaskMeta,
+ treePattern,
+ tablePattern,
+ userName,
+ skipIfNoPrivileges,
+ startTime,
+ endTime);
+ }
+
+ @Override
+ public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
+ return false;
+ }
+
+ @Override
+ public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
+ return false;
+ }
+
+ @Override
+ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final TreePattern treePattern,
+ final TablePattern tablePattern,
+ final String userName,
+ final boolean skipIfNoPrivileges,
+ final long startTime,
+ final long endTime) {
+ return null;
+ }
+
+ @Override
+ public boolean isGeneratedByPipe() {
+ return false;
+ }
+
+ @Override
+ public boolean mayEventTimeOverlappedWithTimeRange() {
+ return false;
+ }
+
+ @Override
+ public boolean mayEventPathsOverlappedWithPattern() {
+ return false;
+ }
+ }
+}