This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 63bcf6e874a Subscription: revert ScheduledExecutorService to
ExecutorService to avoid task queue oom & fix subscription task count (#14186)
63bcf6e874a is described below
commit 63bcf6e874aa8fd74b8c2279d92dbb82fd6a5e4c
Author: V_Galaxy <[email protected]>
AuthorDate: Sat Nov 23 01:06:01 2024 +0800
Subscription: revert ScheduledExecutorService to ExecutorService to avoid
task queue oom & fix subscription task count (#14186)
---
.../subtask/connector/PipeConnectorSubtaskLifeCycle.java | 4 ++--
.../agent/task/subtask/processor/PipeProcessorSubtask.java | 4 ++--
.../task/subtask/SubscriptionConnectorSubtask.java | 12 ++----------
.../task/subtask/SubscriptionConnectorSubtaskLifeCycle.java | 8 +-------
.../pipe/agent/task/execution/PipeSubtaskExecutor.java | 6 +++---
.../agent/task/subtask/PipeAbstractConnectorSubtask.java | 6 +++---
.../iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java | 4 ++--
7 files changed, 15 insertions(+), 29 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index 14d4f604c22..ecbbc641e4b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -34,8 +34,8 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
protected final PipeConnectorSubtask subtask;
private final UnboundedBlockingPendingQueue<Event> pendingQueue;
- private int runningTaskCount;
- private int registeredTaskCount;
+ protected int runningTaskCount;
+ protected int registeredTaskCount;
public PipeConnectorSubtaskLifeCycle(
PipeConnectorSubtaskExecutor executor,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index e5383bfaae4..7f29b3c55f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -47,7 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
public class PipeProcessorSubtask extends PipeReportableSubtask {
@@ -96,7 +96,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
@Override
public void bindExecutors(
final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
- final ScheduledExecutorService ignored,
+ final ExecutorService ignored,
final PipeSubtaskScheduler subtaskScheduler) {
this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
this.subtaskScheduler = subtaskScheduler;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
index c5b841a8c66..9b7587381d9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.subscription.task.subtask;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
-import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -31,8 +30,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.Duration;
-
public class SubscriptionConnectorSubtask extends PipeConnectorSubtask {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionConnectorSubtask.class);
@@ -76,13 +73,8 @@ public class SubscriptionConnectorSubtask extends
PipeConnectorSubtask {
@Override
protected void registerCallbackHookAfterSubmit(final
ListenableFuture<Boolean> future) {
- final ListenableFuture<Boolean> nextFuture =
- Futures.withTimeout(
- future,
- Duration.ofSeconds(
-
SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs()),
- subtaskCallbackListeningExecutor);
- Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor);
+ // TODO: Futures.withTimeout
+ Futures.addCallback(future, this, subtaskCallbackListeningExecutor);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index 57fb2c7004d..359690fa272 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -34,17 +34,11 @@ public class SubscriptionConnectorSubtaskLifeCycle extends
PipeConnectorSubtaskL
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionConnectorSubtaskLifeCycle.class);
- private int runningTaskCount;
- private int registeredTaskCount;
-
public SubscriptionConnectorSubtaskLifeCycle(
final PipeConnectorSubtaskExecutor executor, //
SubscriptionSubtaskExecutor
final PipeConnectorSubtask subtask, // SubscriptionConnectorSubtask
final UnboundedBlockingPendingQueue<Event> pendingQueue) {
super(executor, subtask, pendingQueue);
-
- runningTaskCount = 0;
- registeredTaskCount = 0;
}
@Override
@@ -69,7 +63,7 @@ public class SubscriptionConnectorSubtaskLifeCycle extends
PipeConnectorSubtaskL
}
@Override
- public synchronized boolean deregister(final String ignored, int regionId) {
+ public synchronized boolean deregister(final String pipeNameToDeregister,
int regionId) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index c4c96dad3e7..4ea7714962b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -32,14 +32,14 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
public abstract class PipeSubtaskExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeSubtaskExecutor.class);
- private static final ScheduledExecutorService
subtaskCallbackListeningExecutor =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ private static final ExecutorService subtaskCallbackListeningExecutor =
+ IoTDBThreadPoolFactory.newSingleThreadExecutor(
ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName());
protected final WrappedThreadPoolExecutor underlyingThreadPool;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
index 58cc142713d..cfd987758e4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
@@ -33,7 +33,7 @@ import
com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask {
@@ -43,7 +43,7 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
protected PipeConnector outputPipeConnector;
// For thread pool to execute callbacks
- protected ScheduledExecutorService subtaskCallbackListeningExecutor;
+ protected ExecutorService subtaskCallbackListeningExecutor;
// For controlling subtask submitting, making sure that
// a subtask is submitted to only one thread at a time
@@ -62,7 +62,7 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
@Override
public void bindExecutors(
final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
- final ScheduledExecutorService subtaskCallbackListeningExecutor,
+ final ExecutorService subtaskCallbackListeningExecutor,
final PipeSubtaskScheduler subtaskScheduler) {
this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
index 1169711b46d..2da797c2b3b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
@@ -29,7 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -65,7 +65,7 @@ public abstract class PipeSubtask
public abstract void bindExecutors(
ListeningExecutorService subtaskWorkerThreadPoolExecutor,
- ScheduledExecutorService subtaskCallbackListeningExecutor,
+ ExecutorService subtaskCallbackListeningExecutor,
PipeSubtaskScheduler subtaskScheduler);
@Override