This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch reduce-submit-self
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6714acb68aed5e37e09ba165a4722145e1af40bf
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed May 31 15:19:58 2023 +0800

    remove listenable logic
---
 .../PipeRealtimeDataRegionHybridCollector.java     |  13 ++-
 .../PipeRealtimeDataRegionLogCollector.java        |   4 +-
 .../PipeRealtimeDataRegionTsFileCollector.java     |   4 +-
 .../manager/PipeConnectorSubtaskLifeCycle.java     |   7 --
 .../event/view/collector/PipeEventCollector.java   |  11 --
 .../task/queue/ListenableBlockingPendingQueue.java | 116 ++-------------------
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  41 +-------
 7 files changed, 19 insertions(+), 177 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 872b9c56f07..fbed165f7a4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -39,7 +39,6 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridCollector.class);
 
-  // TODO: memory control
   // This queue is used to store pending events collected by the method 
collect(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
@@ -86,7 +85,14 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
     }
 
     if 
(!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) 
{
-      pendingQueue.offer(event);
+      if (!pendingQueue.offer(event)) {
+        LOGGER.warn(
+            String.format(
+                "collectTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard tablet 
event %s, current state %s",
+                this, event, event.getTsFileEpoch().getState(this)));
+        // this would not happen, but just in case.
+        // ListenableUnboundedBlockingPendingQueue is unbounded, so it should never 
reach capacity.
+      }
     }
   }
 
@@ -101,11 +107,10 @@ public class PipeRealtimeDataRegionHybridCollector 
extends PipeRealtimeDataRegio
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
           String.format(
-              "Pending Queue of Hybrid Realtime Collector %s has reached 
capacity, discard TsFile Event %s, current state %s",
+              "collectTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard TsFile 
event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never 
reach capacity.
-      // TODO: memory control when elements in queue are too many.
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index c7c96b650e4..99432deca13 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -36,7 +36,6 @@ public class PipeRealtimeDataRegionLogCollector extends 
PipeRealtimeDataRegionCo
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionLogCollector.class);
 
-  // TODO: memory control
   // This queue is used to store pending events collected by the method 
collect(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
@@ -58,11 +57,10 @@ public class PipeRealtimeDataRegionLogCollector extends 
PipeRealtimeDataRegionCo
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
           String.format(
-              "Pending Queue of Log Realtime Collector %s has reached 
capacity, discard Tablet Event %s, current state %s",
+              "collect: pending queue of PipeRealtimeDataRegionLogCollector %s 
has reached capacity, discard tablet event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never 
reach capacity.
-      // TODO: memory control when elements in queue are too many.
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index 42bec421eed..214c616441c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -36,7 +36,6 @@ public class PipeRealtimeDataRegionTsFileCollector extends 
PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileCollector.class);
 
-  // TODO: memory control
   // This queue is used to store pending events collected by the method 
collect(). The method
   // supply() will poll events from this queue and send them to the next pipe 
plugin.
   private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
@@ -58,11 +57,10 @@ public class PipeRealtimeDataRegionTsFileCollector extends 
PipeRealtimeDataRegio
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
           String.format(
-              "Pending Queue of TsFile Realtime Collector %s has reached 
capacity, discard TsFile Event %s, current state %s",
+              "collect: pending queue of PipeRealtimeDataRegionTsFileCollector 
%s has reached capacity, discard TsFile event %s, current state %s",
               this, event, event.getTsFileEpoch().getState(this)));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never 
reach capacity.
-      // TODO: memory control when elements in queue are too many.
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
index bec125eae9e..ce930159c52 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
@@ -103,13 +103,6 @@ public class PipeConnectorSubtaskLifeCycle implements 
AutoCloseable {
 
   @Override
   public synchronized void close() {
-    pendingQueue.removeEmptyToNotEmptyListener(subtask.getTaskID());
-    pendingQueue.removeNotEmptyToEmptyListener(subtask.getTaskID());
-
     executor.deregister(subtask.getTaskID());
   }
-
-  private synchronized boolean hasRunningTasks() {
-    return runningTaskCount > 0;
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 6f2a7d82591..e39c16077a5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -64,15 +64,4 @@ public class PipeEventCollector implements EventCollector {
       bufferQueue.offer(event);
     }
   }
-
-  public synchronized void tryCollectBufferedEvents() {
-    while (!bufferQueue.isEmpty()) {
-      final Event bufferedEvent = bufferQueue.peek();
-      if (pendingQueue.offer(bufferedEvent)) {
-        bufferQueue.poll();
-      } else {
-        return;
-      }
-    }
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index 8751ffc3f52..04a89de9302 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -25,11 +25,8 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class ListenableBlockingPendingQueue<E extends Event> {
 
@@ -41,109 +38,22 @@ public abstract class ListenableBlockingPendingQueue<E 
extends Event> {
 
   private final BlockingQueue<E> pendingQueue;
 
-  private final Map<String, PendingQueueEmptyToNotEmptyListener> 
emptyToNotEmptyListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueNotEmptyToEmptyListener> 
notEmptyToEmptyListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueFullToNotFullListener> 
fullToNotFullListeners =
-      new ConcurrentHashMap<>();
-  private final Map<String, PendingQueueNotFullToFullListener> 
notFullToFullListeners =
-      new ConcurrentHashMap<>();
-
-  private final AtomicBoolean isFull = new AtomicBoolean(false);
-
   protected ListenableBlockingPendingQueue(BlockingQueue<E> pendingQueue) {
     this.pendingQueue = pendingQueue;
   }
 
-  public ListenableBlockingPendingQueue<E> registerEmptyToNotEmptyListener(
-      String id, PendingQueueEmptyToNotEmptyListener listener) {
-    emptyToNotEmptyListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeEmptyToNotEmptyListener(String id) {
-    emptyToNotEmptyListeners.remove(id);
-  }
-
-  public void notifyEmptyToNotEmptyListeners() {
-    emptyToNotEmptyListeners
-        .values()
-        
.forEach(PendingQueueEmptyToNotEmptyListener::onPendingQueueEmptyToNotEmpty);
-  }
-
-  public ListenableBlockingPendingQueue<E> registerNotEmptyToEmptyListener(
-      String id, PendingQueueNotEmptyToEmptyListener listener) {
-    notEmptyToEmptyListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeNotEmptyToEmptyListener(String id) {
-    notEmptyToEmptyListeners.remove(id);
-  }
-
-  public void notifyNotEmptyToEmptyListeners() {
-    notEmptyToEmptyListeners
-        .values()
-        
.forEach(PendingQueueNotEmptyToEmptyListener::onPendingQueueNotEmptyToEmpty);
-  }
-
-  public ListenableBlockingPendingQueue<E> registerFullToNotFullListener(
-      String id, PendingQueueFullToNotFullListener listener) {
-    fullToNotFullListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeFullToNotFullListener(String id) {
-    fullToNotFullListeners.remove(id);
-  }
-
-  public void notifyFullToNotFullListeners() {
-    fullToNotFullListeners
-        .values()
-        
.forEach(PendingQueueFullToNotFullListener::onPendingQueueFullToNotFull);
-    LOGGER.info("notifyFullToNotFullListeners");
-  }
-
-  public ListenableBlockingPendingQueue<E> registerNotFullToFullListener(
-      String id, PendingQueueNotFullToFullListener listener) {
-    notFullToFullListeners.put(id, listener);
-    return this;
-  }
-
-  public void removeNotFullToFullListener(String id) {
-    notFullToFullListeners.remove(id);
-  }
-
-  public void notifyNotFullToFullListeners() {
-    notFullToFullListeners
-        .values()
-        
.forEach(PendingQueueNotFullToFullListener::onPendingQueueNotFullToFull);
-    LOGGER.info("notifyNotFullToFullListeners");
-  }
-
   public boolean offer(E event) {
-    final boolean isEmpty = pendingQueue.isEmpty();
-    final boolean isAdded = pendingQueue.offer(event);
-
-    if (isAdded) {
-      // we don't use size() == 1 to check whether the listener should be 
called,
-      // because offer() and size() are not atomic, and we don't want to use 
lock
-      // to make them atomic.
-      if (isEmpty) {
-        notifyEmptyToNotEmptyListeners();
-      }
-    } else {
-      if (isFull.compareAndSet(false, true)) {
-        notifyNotFullToFullListeners();
-      }
+    boolean isAdded = false;
+    try {
+      isAdded = pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, 
TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.info("pending queue offer is interrupted.", e);
+      Thread.currentThread().interrupt();
     }
-
     return isAdded;
   }
 
   public E poll() {
-    final boolean isEmpty = pendingQueue.isEmpty();
     E event = null;
     try {
       event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
@@ -151,20 +61,6 @@ public abstract class ListenableBlockingPendingQueue<E 
extends Event> {
       LOGGER.info("pending queue poll is interrupted.", e);
       Thread.currentThread().interrupt();
     }
-
-    if (event == null) {
-      // we don't use size() == 0 to check whether the listener should be 
called,
-      // because poll() and size() are not atomic, and we don't want to use 
lock
-      // to make them atomic.
-      if (!isEmpty) {
-        notifyNotEmptyToEmptyListeners();
-      }
-    } else {
-      if (isFull.compareAndSet(true, false)) {
-        notifyFullToNotFullListeners();
-      }
-    }
-
     return event;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 2e06808a8d3..04eedc94d4a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.task.stage;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
@@ -37,15 +36,10 @@ import 
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
 public class PipeTaskProcessorStage extends PipeTaskStage {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskProcessorStage.class);
-
   protected final PipeProcessorSubtaskExecutor executor =
       PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
 
@@ -89,29 +83,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             pipeProcessor,
             pipeConnectorOutputEventCollector);
 
-    final PipeTaskStage pipeTaskStage = this;
     this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue;
-    this.pipeConnectorOutputPendingQueue =
-        pipeConnectorOutputPendingQueue
-            .registerNotFullToFullListener(
-                taskId,
-                () -> {
-                  executor.stop(pipeProcessorSubtask.getTaskID());
-                  LOGGER.warn("NotFullToFullListener", new Exception());
-                })
-            .registerFullToNotFullListener(
-                taskId,
-                () -> {
-                  // status can be changed by other threads calling 
pipeTaskStage's methods
-                  synchronized (pipeTaskStage) {
-                    // only start when the pipe is running
-                    if (status == PipeStatus.RUNNING) {
-                      
pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
-                      executor.start(pipeProcessorSubtask.getTaskID());
-                      LOGGER.warn("FullToNotFullListener", new Exception());
-                    }
-                  }
-                });
+    this.pipeConnectorOutputPendingQueue = pipeConnectorOutputPendingQueue;
   }
 
   @Override
@@ -144,16 +117,6 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
 
   @Override
   public void dropSubtask() throws PipeException {
-    final String taskId = pipeProcessorSubtask.getTaskID();
-
-    if (pipeCollectorInputPendingQueue != null) {
-      pipeCollectorInputPendingQueue.removeEmptyToNotEmptyListener(taskId);
-      pipeCollectorInputPendingQueue.removeNotEmptyToEmptyListener(taskId);
-    }
-
-    pipeConnectorOutputPendingQueue.removeNotFullToFullListener(taskId);
-    pipeConnectorOutputPendingQueue.removeFullToNotFullListener(taskId);
-
-    executor.deregister(taskId);
+    executor.deregister(pipeProcessorSubtask.getTaskID());
   }
 }

Reply via email to