This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch reduce-submit-self in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 71e71bc8f99822f6a3c1d39142bd59e9acc4102f Author: Steve Yurong Su <[email protected]> AuthorDate: Wed May 31 12:34:01 2023 +0800 remove empty listeners --- .../manager/PipeConnectorSubtaskLifeCycle.java | 10 -------- .../task/queue/ListenableBlockingPendingQueue.java | 2 ++ .../db/pipe/task/stage/PipeTaskProcessorStage.java | 29 +++++++++------------- 3 files changed, 14 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java index 8827bbb8d59..bec125eae9e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java @@ -41,16 +41,6 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable { this.subtask = subtask; this.pendingQueue = pendingQueue; - pendingQueue.registerEmptyToNotEmptyListener( - subtask.getTaskID(), - () -> { - if (hasRunningTasks()) { - executor.start(subtask.getTaskID()); - } - }); - this.pendingQueue.registerNotEmptyToEmptyListener( - subtask.getTaskID(), () -> executor.stop(subtask.getTaskID())); - runningTaskCount = 0; aliveTaskCount = 0; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java index 265bdf09234..8751ffc3f52 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java @@ -102,6 +102,7 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> { fullToNotFullListeners .values() .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull); + 
LOGGER.info("notifyFullToNotFullListeners"); } public ListenableBlockingPendingQueue<E> registerNotFullToFullListener( @@ -118,6 +119,7 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> { notFullToFullListeners .values() .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull); + LOGGER.info("notifyNotFullToFullListeners"); } public boolean offer(E event) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java index 40a1b7d7be0..2e06808a8d3 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java @@ -37,10 +37,15 @@ import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; public class PipeTaskProcessorStage extends PipeTaskStage { + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskProcessorStage.class); + protected final PipeProcessorSubtaskExecutor executor = PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor(); @@ -85,26 +90,15 @@ public class PipeTaskProcessorStage extends PipeTaskStage { pipeConnectorOutputEventCollector); final PipeTaskStage pipeTaskStage = this; - this.pipeCollectorInputPendingQueue = - pipeCollectorInputPendingQueue != null - ? 
pipeCollectorInputPendingQueue - .registerEmptyToNotEmptyListener( - taskId, - () -> { - // status can be changed by other threads calling pipeTaskStage's methods - synchronized (pipeTaskStage) { - if (status == PipeStatus.RUNNING) { - executor.start(pipeProcessorSubtask.getTaskID()); - } - } - }) - .registerNotEmptyToEmptyListener( - taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID())) - : null; + this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue; this.pipeConnectorOutputPendingQueue = pipeConnectorOutputPendingQueue .registerNotFullToFullListener( - taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID())) + taskId, + () -> { + executor.stop(pipeProcessorSubtask.getTaskID()); + LOGGER.warn("NotFullToFullListener", new Exception()); + }) .registerFullToNotFullListener( taskId, () -> { @@ -114,6 +108,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage { if (status == PipeStatus.RUNNING) { pipeConnectorOutputEventCollector.tryCollectBufferedEvents(); executor.start(pipeProcessorSubtask.getTaskID()); + LOGGER.warn("FullToNotFullListener", new Exception()); } } });
