This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 63bcf6e874a Subscription: revert ScheduledExecutorService to ExecutorService to avoid task queue oom & fix subscription task count (#14186)
63bcf6e874a is described below

commit 63bcf6e874aa8fd74b8c2279d92dbb82fd6a5e4c
Author: V_Galaxy <[email protected]>
AuthorDate: Sat Nov 23 01:06:01 2024 +0800

    Subscription: revert ScheduledExecutorService to ExecutorService to avoid task queue oom & fix subscription task count (#14186)
---
 .../subtask/connector/PipeConnectorSubtaskLifeCycle.java     |  4 ++--
 .../agent/task/subtask/processor/PipeProcessorSubtask.java   |  4 ++--
 .../task/subtask/SubscriptionConnectorSubtask.java           | 12 ++----------
 .../task/subtask/SubscriptionConnectorSubtaskLifeCycle.java  |  8 +-------
 .../pipe/agent/task/execution/PipeSubtaskExecutor.java       |  6 +++---
 .../agent/task/subtask/PipeAbstractConnectorSubtask.java     |  6 +++---
 .../iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java   |  4 ++--
 7 files changed, 15 insertions(+), 29 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index 14d4f604c22..ecbbc641e4b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -34,8 +34,8 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
   protected final PipeConnectorSubtask subtask;
   private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
-  private int runningTaskCount;
-  private int registeredTaskCount;
+  protected int runningTaskCount;
+  protected int registeredTaskCount;
 
   public PipeConnectorSubtaskLifeCycle(
       PipeConnectorSubtaskExecutor executor,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index e5383bfaae4..7f29b3c55f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -47,7 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeProcessorSubtask extends PipeReportableSubtask {
@@ -96,7 +96,7 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
   @Override
   public void bindExecutors(
       final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
-      final ScheduledExecutorService ignored,
+      final ExecutorService ignored,
       final PipeSubtaskScheduler subtaskScheduler) {
     this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
     this.subtaskScheduler = subtaskScheduler;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
index c5b841a8c66..9b7587381d9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.subscription.task.subtask;
 
 import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
-import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -31,8 +30,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
-
 public class SubscriptionConnectorSubtask extends PipeConnectorSubtask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConnectorSubtask.class);
@@ -76,13 +73,8 @@ public class SubscriptionConnectorSubtask extends PipeConnectorSubtask {
 
   @Override
   protected void registerCallbackHookAfterSubmit(final ListenableFuture<Boolean> future) {
-    final ListenableFuture<Boolean> nextFuture =
-        Futures.withTimeout(
-            future,
-            Duration.ofSeconds(
-                SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs()),
-            subtaskCallbackListeningExecutor);
-    Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor);
+    // TODO: Futures.withTimeout
+    Futures.addCallback(future, this, subtaskCallbackListeningExecutor);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index 57fb2c7004d..359690fa272 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -34,17 +34,11 @@ public class SubscriptionConnectorSubtaskLifeCycle extends PipeConnectorSubtaskL
   private static final Logger LOGGER =
       LoggerFactory.getLogger(SubscriptionConnectorSubtaskLifeCycle.class);
 
-  private int runningTaskCount;
-  private int registeredTaskCount;
-
   public SubscriptionConnectorSubtaskLifeCycle(
       final PipeConnectorSubtaskExecutor executor, // SubscriptionSubtaskExecutor
       final PipeConnectorSubtask subtask, // SubscriptionConnectorSubtask
       final UnboundedBlockingPendingQueue<Event> pendingQueue) {
     super(executor, subtask, pendingQueue);
-
-    runningTaskCount = 0;
-    registeredTaskCount = 0;
   }
 
   @Override
@@ -69,7 +63,7 @@ public class SubscriptionConnectorSubtaskLifeCycle extends PipeConnectorSubtaskL
   }
 
   @Override
-  public synchronized boolean deregister(final String ignored, int regionId) {
+  public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index c4c96dad3e7..4ea7714962b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -32,14 +32,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 
 public abstract class PipeSubtaskExecutor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutor.class);
 
-  private static final ScheduledExecutorService subtaskCallbackListeningExecutor =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+  private static final ExecutorService subtaskCallbackListeningExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
           ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName());
 
   protected final WrappedThreadPoolExecutor underlyingThreadPool;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
index 58cc142713d..cfd987758e4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
@@ -33,7 +33,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 
 public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask {
 
@@ -43,7 +43,7 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask
   protected PipeConnector outputPipeConnector;
 
   // For thread pool to execute callbacks
-  protected ScheduledExecutorService subtaskCallbackListeningExecutor;
+  protected ExecutorService subtaskCallbackListeningExecutor;
 
   // For controlling subtask submitting, making sure that
   // a subtask is submitted to only one thread at a time
@@ -62,7 +62,7 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask
   @Override
   public void bindExecutors(
       final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
-      final ScheduledExecutorService subtaskCallbackListeningExecutor,
+      final ExecutorService subtaskCallbackListeningExecutor,
       final PipeSubtaskScheduler subtaskScheduler) {
     this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
     this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
index 1169711b46d..2da797c2b3b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
@@ -29,7 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -65,7 +65,7 @@ public abstract class PipeSubtask
 
   public abstract void bindExecutors(
       ListeningExecutorService subtaskWorkerThreadPoolExecutor,
-      ScheduledExecutorService subtaskCallbackListeningExecutor,
+      ExecutorService subtaskCallbackListeningExecutor,
       PipeSubtaskScheduler subtaskScheduler);
 
   @Override

Reply via email to