This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch reduce-submit-self in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6714acb68aed5e37e09ba165a4722145e1af40bf Author: Steve Yurong Su <[email protected]> AuthorDate: Wed May 31 15:19:58 2023 +0800 remove listenable logic --- .../PipeRealtimeDataRegionHybridCollector.java | 13 ++- .../PipeRealtimeDataRegionLogCollector.java | 4 +- .../PipeRealtimeDataRegionTsFileCollector.java | 4 +- .../manager/PipeConnectorSubtaskLifeCycle.java | 7 -- .../event/view/collector/PipeEventCollector.java | 11 -- .../task/queue/ListenableBlockingPendingQueue.java | 116 ++------------------- .../db/pipe/task/stage/PipeTaskProcessorStage.java | 41 +------- 7 files changed, 19 insertions(+), 177 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java index 872b9c56f07..fbed165f7a4 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java @@ -39,7 +39,6 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionHybridCollector.class); - // TODO: memory control // This queue is used to store pending events collected by the method collect(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue; @@ -86,7 +85,14 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio } if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) { - pendingQueue.offer(event); + if (!pendingQueue.offer(event)) { + LOGGER.warn( + String.format( + "collectTabletInsertion: pending queue of PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard tablet event %s, current state %s", + this, event, event.getTsFileEpoch().getState(this))); + // this would not happen, but just in case. + // UnboundedBlockingPendingQueue is unbounded, so it should never reach capacity. + } } } @@ -101,11 +107,10 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio if (!pendingQueue.offer(event)) { LOGGER.warn( String.format( - "Pending Queue of Hybrid Realtime Collector %s has reached capacity, discard TsFile Event %s, current state %s", + "collectTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard TsFile event %s, current state %s", this, event, event.getTsFileEpoch().getState(this))); // this would not happen, but just in case. // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity. - // TODO: memory control when elements in queue are too many. } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java index c7c96b650e4..99432deca13 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java @@ -36,7 +36,6 @@ public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCo private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionLogCollector.class); - // TODO: memory control // This queue is used to store pending events collected by the method collect(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue; @@ -58,11 +57,10 @@ public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCo if (!pendingQueue.offer(event)) { LOGGER.warn( String.format( - "Pending Queue of Log Realtime Collector %s has reached capacity, discard Tablet Event %s, current state %s", + "collect: pending queue of PipeRealtimeDataRegionLogCollector %s has reached capacity, discard tablet event %s, current state %s", this, event, event.getTsFileEpoch().getState(this))); // this would not happen, but just in case. // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity. - // TODO: memory control when elements in queue are too many. } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java index 42bec421eed..214c616441c 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java @@ -36,7 +36,6 @@ public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegio private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileCollector.class); - // TODO: memory control // This queue is used to store pending events collected by the method collect(). The method // supply() will poll events from this queue and send them to the next pipe plugin. private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue; @@ -58,11 +57,10 @@ public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegio if (!pendingQueue.offer(event)) { LOGGER.warn( String.format( - "Pending Queue of TsFile Realtime Collector %s has reached capacity, discard TsFile Event %s, current state %s", + "collect: pending queue of PipeRealtimeDataRegionTsFileCollector %s has reached capacity, discard TsFile event %s, current state %s", this, event, event.getTsFileEpoch().getState(this))); // this would not happen, but just in case. // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity. - // TODO: memory control when elements in queue are too many. } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java index bec125eae9e..ce930159c52 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java @@ -103,13 +103,6 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable { @Override public synchronized void close() { - pendingQueue.removeEmptyToNotEmptyListener(subtask.getTaskID()); - pendingQueue.removeNotEmptyToEmptyListener(subtask.getTaskID()); - executor.deregister(subtask.getTaskID()); } - - private synchronized boolean hasRunningTasks() { - return runningTaskCount > 0; - } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java index 6f2a7d82591..e39c16077a5 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java @@ -64,15 +64,4 @@ public class PipeEventCollector implements EventCollector { bufferQueue.offer(event); } } - - public synchronized void tryCollectBufferedEvents() { - while (!bufferQueue.isEmpty()) { - final Event bufferedEvent = bufferQueue.peek(); - if (pendingQueue.offer(bufferedEvent)) { - bufferQueue.poll(); - } else { - return; - } - } - } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java index 8751ffc3f52..04a89de9302 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java @@ -25,11 +25,8 @@ import org.apache.iotdb.pipe.api.event.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; public abstract class ListenableBlockingPendingQueue<E extends Event> { @@ -41,109 +38,22 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> { private final BlockingQueue<E> pendingQueue; - private final Map<String, PendingQueueEmptyToNotEmptyListener> emptyToNotEmptyListeners = - new ConcurrentHashMap<>(); - private final Map<String, PendingQueueNotEmptyToEmptyListener> notEmptyToEmptyListeners = - new ConcurrentHashMap<>(); - private final Map<String, PendingQueueFullToNotFullListener> fullToNotFullListeners = - new ConcurrentHashMap<>(); - private final Map<String, PendingQueueNotFullToFullListener> notFullToFullListeners = - new ConcurrentHashMap<>(); - - private final AtomicBoolean isFull = new AtomicBoolean(false); - protected ListenableBlockingPendingQueue(BlockingQueue<E> pendingQueue) { this.pendingQueue = pendingQueue; } - public ListenableBlockingPendingQueue<E> registerEmptyToNotEmptyListener( - String id, PendingQueueEmptyToNotEmptyListener listener) { - emptyToNotEmptyListeners.put(id, listener); - return this; - } - - public void removeEmptyToNotEmptyListener(String id) { - emptyToNotEmptyListeners.remove(id); - } - - public void notifyEmptyToNotEmptyListeners() { - emptyToNotEmptyListeners - .values() - .forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty); - } - - public ListenableBlockingPendingQueue<E> registerNotEmptyToEmptyListener( - String id, PendingQueueNotEmptyToEmptyListener listener) { - notEmptyToEmptyListeners.put(id, listener); - return this; - } - - public void removeNotEmptyToEmptyListener(String id) { - notEmptyToEmptyListeners.remove(id); - } - - public void notifyNotEmptyToEmptyListeners() { - notEmptyToEmptyListeners - .values() - .forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty); - } - - public ListenableBlockingPendingQueue<E> registerFullToNotFullListener( - String id, PendingQueueFullToNotFullListener listener) { - fullToNotFullListeners.put(id, listener); - return this; - } - - public void removeFullToNotFullListener(String id) { - fullToNotFullListeners.remove(id); - } - - public void notifyFullToNotFullListeners() { - fullToNotFullListeners - .values() - .forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull); - LOGGER.info("notifyFullToNotFullListeners"); - } - - public ListenableBlockingPendingQueue<E> registerNotFullToFullListener( - String id, PendingQueueNotFullToFullListener listener) { - notFullToFullListeners.put(id, listener); - return this; - } - - public void removeNotFullToFullListener(String id) { - notFullToFullListeners.remove(id); - } - - public void notifyNotFullToFullListeners() { - notFullToFullListeners - .values() - .forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull); - LOGGER.info("notifyNotFullToFullListeners"); - } - public boolean offer(E event) { - final boolean isEmpty = pendingQueue.isEmpty(); - final boolean isAdded = pendingQueue.offer(event); - - if (isAdded) { - // we don't use size() == 1 to check whether the listener should be called, - // because offer() and size() are not atomic, and we don't want to use lock - // to make them atomic. - if (isEmpty) { - notifyEmptyToNotEmptyListeners(); - } - } else { - if (isFull.compareAndSet(false, true)) { - notifyNotFullToFullListeners(); - } + boolean isAdded = false; + try { + isAdded = pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.info("pending queue offer is interrupted.", e); + Thread.currentThread().interrupt(); } - return isAdded; } public E poll() { - final boolean isEmpty = pendingQueue.isEmpty(); E event = null; try { event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS); @@ -151,20 +61,6 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> { LOGGER.info("pending queue poll is interrupted.", e); Thread.currentThread().interrupt(); } - - if (event == null) { - // we don't use size() == 0 to check whether the listener should be called, - // because poll() and size() are not atomic, and we don't want to use lock - // to make them atomic. - if (!isEmpty) { - notifyNotEmptyToEmptyListeners(); - } - } else { - if (isFull.compareAndSet(true, false)) { - notifyFullToNotFullListeners(); - } - } - return event; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java index 2e06808a8d3..04eedc94d4a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.pipe.task.stage; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.commons.pipe.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector; @@ -37,15 +36,10 @@ import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; public class PipeTaskProcessorStage extends PipeTaskStage { - private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskProcessorStage.class); - protected final PipeProcessorSubtaskExecutor executor = PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor(); @@ -89,29 +83,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage { pipeProcessor, pipeConnectorOutputEventCollector); - final PipeTaskStage pipeTaskStage = this; this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue; - this.pipeConnectorOutputPendingQueue = - pipeConnectorOutputPendingQueue - .registerNotFullToFullListener( - taskId, - () -> { - executor.stop(pipeProcessorSubtask.getTaskID()); - LOGGER.warn("NotFullToFullListener", new Exception()); - }) - .registerFullToNotFullListener( - taskId, - () -> { - // status can be changed by other threads calling pipeTaskStage's methods - synchronized (pipeTaskStage) { - // only start when the pipe is running - if (status == PipeStatus.RUNNING) { - pipeConnectorOutputEventCollector.tryCollectBufferedEvents(); - executor.start(pipeProcessorSubtask.getTaskID()); - LOGGER.warn("FullToNotFullListener", new Exception()); - } - } - }); + this.pipeConnectorOutputPendingQueue = pipeConnectorOutputPendingQueue; } @Override @@ -144,16 +117,6 @@ public class PipeTaskProcessorStage extends PipeTaskStage { @Override public void dropSubtask() throws PipeException { - final String taskId = pipeProcessorSubtask.getTaskID(); - - if (pipeCollectorInputPendingQueue != null) { - pipeCollectorInputPendingQueue.removeEmptyToNotEmptyListener(taskId); - pipeCollectorInputPendingQueue.removeNotEmptyToEmptyListener(taskId); - } - - pipeConnectorOutputPendingQueue.removeNotFullToFullListener(taskId); - pipeConnectorOutputPendingQueue.removeFullToNotFullListener(taskId); - - executor.deregister(taskId); + executor.deregister(pipeProcessorSubtask.getTaskID()); } }
