This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0a9b05b211f Subscription: shared thread pool between consumers in one process (#12606)
0a9b05b211f is described below
commit 0a9b05b211f57f53c3128b687ccd83be81e7618f
Author: V_Galaxy <[email protected]>
AuthorDate: Thu May 30 21:16:08 2024 +0800
Subscription: shared thread pool between consumers in one process (#12606)
---
.../apache/iotdb/SubscriptionSessionExample.java | 2 +-
.../it/dual/IoTDBSubscriptionConsumerGroupIT.java | 6 +-
.../it/dual/IoTDBSubscriptionTopicIT.java | 3 +-
.../it/local/IoTDBSubscriptionBasicIT.java | 11 +-
.../it/local/IoTDBSubscriptionIdempotentIT.java | 2 +-
.../it/local/IoTDBSubscriptionRestartIT.java | 2 +-
.../rpc/subscription/config/ConsumerConstant.java | 32 +-
.../subscription/{ => consumer}/AckStrategy.java | 2 +-
.../{ => consumer}/AsyncCommitCallback.java | 2 +-
.../{ => consumer}/ConsumeListener.java | 2 +-
.../subscription/{ => consumer}/ConsumeResult.java | 2 +-
.../{ => consumer}/SubscriptionConsumer.java | 376 +++++++++------------
.../SubscriptionExecutorServiceManager.java | 281 +++++++++++++++
.../{ => consumer}/SubscriptionProvider.java | 4 +-
.../{ => consumer}/SubscriptionProviders.java | 21 +-
.../{ => consumer}/SubscriptionPullConsumer.java | 160 ++++-----
.../{ => consumer}/SubscriptionPushConsumer.java | 249 ++++++++------
.../event/SubscriptionEventBinaryCache.java | 2 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 2 +-
.../meta/consumer/ConsumerGroupMetaKeeper.java | 2 +-
.../subscription/meta/consumer/ConsumerMeta.java | 2 +-
21 files changed, 732 insertions(+), 433 deletions(-)
diff --git
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
index 383b16a345a..62c869e02bf 100644
---
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
+++
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 7eccef3f76a..07d5b4ed791 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -29,8 +29,8 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
@@ -900,7 +900,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
}
private SubscriptionPullConsumer createConsumerAndSubscribeTopics(
- final SubscriptionInfo subscriptionInfo) throws Exception {
+ final SubscriptionInfo subscriptionInfo) {
final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(senderEnv.getIP())
@@ -908,6 +908,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
.consumerId(subscriptionInfo.consumerId)
.consumerGroupId(subscriptionInfo.consumerGroupId)
.autoCommit(false)
+ .fileSaveDir(System.getProperty("java.io.tmpdir")) // hack for license check
.buildPullConsumer();
consumer.open();
consumer.subscribe(subscriptionInfo.topicNames);
@@ -924,7 +925,6 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < consumers.size(); ++i) {
final int index = i;
- final String consumerId = consumers.get(index).getConsumerId();
final String consumerGroupId = consumers.get(index).getConsumerGroupId();
final Thread t =
new Thread(
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index f2d4e2191bf..7091c93b4db 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import
org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
@@ -578,6 +578,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
.consumerId("c1")
.consumerGroupId("cg1")
.autoCommit(false)
+ .fileSaveDir(System.getProperty("java.io.tmpdir")) // hack for license check
.buildPullConsumer()) {
consumer.open();
consumer.subscribe(topicName);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 88a8c15dbd1..46b697e44db 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -24,12 +24,12 @@ import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
-import org.apache.iotdb.session.subscription.AckStrategy;
-import org.apache.iotdb.session.subscription.AsyncCommitCallback;
-import org.apache.iotdb.session.subscription.ConsumeResult;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
-import org.apache.iotdb.session.subscription.SubscriptionPushConsumer;
import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.AckStrategy;
+import org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
+import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionFileHandler;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
@@ -210,6 +210,7 @@ public class IoTDBSubscriptionBasicIT {
.consumerId("c1")
.consumerGroupId("cg1")
.autoCommit(false)
+ .fileSaveDir(System.getProperty("java.io.tmpdir")) // hack for license check
.buildPullConsumer()) {
consumer.open();
consumer.subscribe(topicName);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
index f97d642b384..b671acd6709 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.subscription.it.local;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.junit.After;
import org.junit.Assert;
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
index 2b13acdc6d0..231845921e1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
@@ -32,8 +32,8 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index efb0e53a649..d449b2f9741 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -35,36 +35,42 @@ public class ConsumerConstant {
public static final String CONSUMER_ID_KEY = "consumer-id";
public static final String CONSUMER_GROUP_ID_KEY = "group-id";
- public static final String HEARTBEAT_INTERVAL_MS_KEY =
"heartbeat-interval-ms"; // unit: ms
- public static final long HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE = 30_000;
- public static final long HEARTBEAT_INTERVAL_MS_MIN_VALUE = 1_000;
+ public static final String HEARTBEAT_INTERVAL_MS_KEY =
"heartbeat-interval-ms";
+ public static final long HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE = 30_000L;
+ public static final long HEARTBEAT_INTERVAL_MS_MIN_VALUE = 1_000L;
- public static final String ENDPOINTS_SYNC_INTERVAL_MS_KEY =
- "endpoints-sync-interval-ms"; // unit: ms
- public static final long ENDPOINTS_SYNC_INTERVAL_MS_DEFAULT_VALUE = 120_000;
- public static final long ENDPOINTS_SYNC_INTERVAL_MS_MIN_VALUE = 5_000;
+ public static final String ENDPOINTS_SYNC_INTERVAL_MS_KEY =
"endpoints-sync-interval-ms";
+ public static final long ENDPOINTS_SYNC_INTERVAL_MS_DEFAULT_VALUE = 120_000L;
+ public static final long ENDPOINTS_SYNC_INTERVAL_MS_MIN_VALUE = 5_000L;
public static final String FILE_SAVE_DIR_KEY = "file-save-dir";
public static final String FILE_SAVE_DIR_DEFAULT_VALUE =
Paths.get(System.getProperty("user.dir"),
"iotdb-subscription").toString();
+ public static final String FILE_SAVE_FSYNC_KEY = "file-save-fsync";
+ public static final boolean FILE_SAVE_FSYNC_DEFAULT_VALUE = false;
+
/////////////////////////////// pull consumer ///////////////////////////////
public static final String AUTO_COMMIT_KEY = "auto-commit";
public static final boolean AUTO_COMMIT_DEFAULT_VALUE = true;
- public static final String AUTO_COMMIT_INTERVAL_MS_KEY =
"auto-commit-interval-ms"; // unit: ms
- public static final long AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE = 5_000;
- public static final long AUTO_COMMIT_INTERVAL_MS_MIN_VALUE = 500;
+ public static final String AUTO_COMMIT_INTERVAL_MS_KEY =
"auto-commit-interval-ms";
+ public static final long AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE = 5_000L;
+ public static final long AUTO_COMMIT_INTERVAL_MS_MIN_VALUE = 500L;
/////////////////////////////// push consumer ///////////////////////////////
public static final String ACK_STRATEGY_KEY = "ack-strategy";
public static final String CONSUME_LISTENER_KEY = "consume-listener";
- // TODO: configure those parameters
- public static final int PUSH_CONSUMER_AUTO_POLL_INTERVAL_MS = 5_000;
- public static final int PUSH_CONSUMER_AUTO_POLL_TIME_OUT_MS = 10_000;
+ public static final String AUTO_POLL_INTERVAL_MS_KEY =
"auto-poll-interval-ms";
+ public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 5_000L;
+ public static final long AUTO_POLL_INTERVAL_MS_MIN_VALUE = 500L;
+
+ public static final String AUTO_POLL_TIMEOUT_MS_KEY = "auto-poll-timeout-ms";
+ public static final long AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE = 10_000L;
+ public static final long AUTO_POLL_TIMEOUT_MS_MIN_VALUE = 1_000L;
private ConsumerConstant() {
throw new IllegalStateException("Utility class");
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AckStrategy.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AckStrategy.java
similarity index 94%
rename from
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AckStrategy.java
rename to
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AckStrategy.java
index 217d50ffd02..81f29e2497f 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AckStrategy.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AckStrategy.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
public enum AckStrategy {
BEFORE_CONSUME,
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AsyncCommitCallback.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AsyncCommitCallback.java
similarity index 94%
rename from
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AsyncCommitCallback.java
rename to
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AsyncCommitCallback.java
index 52e0eb7e091..3fd7d91cba9 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AsyncCommitCallback.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AsyncCommitCallback.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
public interface AsyncCommitCallback {
default void onComplete() {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeListener.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeListener.java
similarity index 94%
rename from
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeListener.java
rename to
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeListener.java
index e26d7986ae7..39488e0222e 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeListener.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeListener.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeResult.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeResult.java
similarity index 93%
rename from
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeResult.java
rename to
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeResult.java
index 63bf701a02d..c674c6ce615 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeResult.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeResult.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
public enum ConsumeResult {
SUCCESS,
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
similarity index 83%
rename from
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
rename to
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 58655aa2f09..2bbd4a129a9 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.SessionConfig;
@@ -66,14 +66,11 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
-public abstract class SubscriptionConsumer implements AutoCloseable {
+abstract class SubscriptionConsumer implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionConsumer.class);
@@ -88,42 +85,12 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
private final long heartbeatIntervalMs;
private final long endpointsSyncIntervalMs;
- private final SubscriptionProviders subscriptionProviders;
-
- private ScheduledExecutorService heartbeatWorkerExecutor;
- private ScheduledExecutorService endpointsSyncerExecutor;
-
- private ExecutorService asyncCommitExecutor;
+ private final SubscriptionProviders providers;
private final AtomicBoolean isClosed = new AtomicBoolean(true);
private final String fileSaveDir;
-
- private Path getFileDir(final String topicName) throws IOException {
- final Path dirPath =
-
Paths.get(fileSaveDir).resolve(consumerGroupId).resolve(consumerId).resolve(topicName);
- Files.createDirectories(dirPath);
- return dirPath;
- }
-
- private Path getFilePath(final String topicName, String fileName) throws
SubscriptionException {
- Path filePath;
- try {
- filePath = getFileDir(topicName).resolve(fileName);
- Files.createFile(filePath);
- } catch (final FileAlreadyExistsException fileAlreadyExistsException) {
- fileName += "." + RandomStringGenerator.generate(16);
- try {
- filePath = getFileDir(topicName).resolve(fileName);
- Files.createFile(filePath);
- } catch (final IOException e) {
- throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
- }
- } catch (final IOException e) {
- throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
- }
- return filePath;
- }
+ private final boolean fileSaveFsync;
public String getConsumerId() {
return consumerId;
@@ -136,7 +103,7 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
/////////////////////////////// ctor ///////////////////////////////
protected SubscriptionConsumer(final Builder builder) {
- final List<TEndPoint> initialEndpoints = new ArrayList<>();
+ final Set<TEndPoint> initialEndpoints = new HashSet<>();
// From org.apache.iotdb.session.Session.getNodeUrls
// Priority is given to `host:port` over `nodeUrls`.
if (Objects.nonNull(builder.host)) {
@@ -144,7 +111,7 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
} else {
initialEndpoints.addAll(SessionUtils.parseSeedNodeUrls(builder.nodeUrls));
}
- this.subscriptionProviders = new SubscriptionProviders(initialEndpoints);
+ this.providers = new SubscriptionProviders(initialEndpoints);
this.username = builder.username;
this.password = builder.password;
@@ -156,6 +123,7 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
this.endpointsSyncIntervalMs = builder.endpointsSyncIntervalMs;
this.fileSaveDir = builder.fileSaveDir;
+ this.fileSaveFsync = builder.fileSaveFsync;
}
protected SubscriptionConsumer(final Builder builder, final Properties
properties) {
@@ -192,7 +160,12 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
(String)
properties.getOrDefault(
ConsumerConstant.FILE_SAVE_DIR_KEY,
- ConsumerConstant.FILE_SAVE_DIR_DEFAULT_VALUE)));
+ ConsumerConstant.FILE_SAVE_DIR_DEFAULT_VALUE))
+ .fileSaveFsync(
+ (Boolean)
+ properties.getOrDefault(
+ ConsumerConstant.FILE_SAVE_FSYNC_KEY,
+ ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE)));
}
/////////////////////////////// open & close ///////////////////////////////
@@ -203,18 +176,18 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
}
// open subscription providers
- subscriptionProviders.acquireWriteLock();
+ providers.acquireWriteLock();
try {
- subscriptionProviders.openProviders(this); // throw SubscriptionException
+ providers.openProviders(this); // throw SubscriptionException
} finally {
- subscriptionProviders.releaseWriteLock();
+ providers.releaseWriteLock();
}
- // launch heartbeat worker
- launchHeartbeatWorker();
+ // submit heartbeat worker
+ submitHeartbeatWorker();
- // launch endpoints syncer
- launchEndpointsSyncer();
+ // submit endpoints syncer
+ submitEndpointsSyncer();
isClosed.set(false);
}
@@ -225,26 +198,12 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
return;
}
- try {
- // shutdown endpoints syncer
- shutdownEndpointsSyncer();
-
- // shutdown heartbeat worker
- shutdownHeartbeatWorker();
-
- // shutdown async commit worker if needed
- shutdownAsyncCommitWorkerIfNeeded();
+ // close subscription providers
+ providers.acquireWriteLock();
+ providers.closeProviders();
+ providers.releaseWriteLock();
- // close subscription providers
- subscriptionProviders.acquireWriteLock();
- try {
- subscriptionProviders.closeProviders();
- } finally {
- subscriptionProviders.releaseWriteLock();
- }
- } finally {
- isClosed.set(true);
- }
+ isClosed.set(true);
}
boolean isClosed() {
@@ -262,11 +221,11 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
}
public void subscribe(final Set<String> topicNames) throws
SubscriptionException {
- subscriptionProviders.acquireReadLock();
+ providers.acquireReadLock();
try {
subscribeWithRedirection(topicNames);
} finally {
- subscriptionProviders.releaseReadLock();
+ providers.releaseReadLock();
}
}
@@ -279,74 +238,14 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
}
public void unsubscribe(final Set<String> topicNames) throws
SubscriptionException {
- subscriptionProviders.acquireReadLock();
+ providers.acquireReadLock();
try {
unsubscribeWithRedirection(topicNames);
} finally {
- subscriptionProviders.releaseReadLock();
+ providers.releaseReadLock();
}
}
- /////////////////////////////// heartbeat ///////////////////////////////
-
- @SuppressWarnings("unsafeThreadSchedule")
- private void launchHeartbeatWorker() {
- heartbeatWorkerExecutor =
- Executors.newSingleThreadScheduledExecutor(
- r -> {
- final Thread t =
- new Thread(
- Thread.currentThread().getThreadGroup(), r,
"ConsumerHeartbeatWorker", 0);
- if (!t.isDaemon()) {
- t.setDaemon(true);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- });
- heartbeatWorkerExecutor.scheduleWithFixedDelay(
- () -> subscriptionProviders.heartbeat(this),
- generateRandomInitialDelayMs(heartbeatIntervalMs),
- heartbeatIntervalMs,
- TimeUnit.MILLISECONDS);
- }
-
- private void shutdownHeartbeatWorker() {
- heartbeatWorkerExecutor.shutdown();
- heartbeatWorkerExecutor = null;
- }
-
- /////////////////////////////// sync endpoints ///////////////////////////////
-
- @SuppressWarnings("unsafeThreadSchedule")
- private void launchEndpointsSyncer() {
- endpointsSyncerExecutor =
- Executors.newSingleThreadScheduledExecutor(
- r -> {
- final Thread t =
- new Thread(
- Thread.currentThread().getThreadGroup(), r,
"SubscriptionEndpointsSyncer", 0);
- if (!t.isDaemon()) {
- t.setDaemon(true);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- });
- endpointsSyncerExecutor.scheduleWithFixedDelay(
- () -> subscriptionProviders.sync(this),
- generateRandomInitialDelayMs(endpointsSyncIntervalMs),
- endpointsSyncIntervalMs,
- TimeUnit.MILLISECONDS);
- }
-
- private void shutdownEndpointsSyncer() {
- endpointsSyncerExecutor.shutdown();
- endpointsSyncerExecutor = null;
- }
-
/////////////////////////////// subscription provider ///////////////////////////////
SubscriptionProvider constructProviderAndHandshake(final TEndPoint endPoint)
@@ -376,6 +275,34 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
return provider;
}
+ /////////////////////////////// file ops ///////////////////////////////
+
+ private Path getFileDir(final String topicName) throws IOException {
+ final Path dirPath =
+
Paths.get(fileSaveDir).resolve(consumerGroupId).resolve(consumerId).resolve(topicName);
+ Files.createDirectories(dirPath);
+ return dirPath;
+ }
+
+ private Path getFilePath(final String topicName, String fileName) throws
SubscriptionException {
+ Path filePath;
+ try {
+ filePath = getFileDir(topicName).resolve(fileName);
+ Files.createFile(filePath);
+ } catch (final FileAlreadyExistsException fileAlreadyExistsException) {
+ fileName += "." + RandomStringGenerator.generate(16);
+ try {
+ filePath = getFileDir(topicName).resolve(fileName);
+ Files.createFile(filePath);
+ } catch (final IOException e) {
+ throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
+ }
+ } catch (final IOException e) {
+ throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
+ }
+ return filePath;
+ }
+
/////////////////////////////// poll ///////////////////////////////
protected List<SubscriptionMessage> poll(final Set<String> topicNames, final
long timeoutMs)
@@ -546,7 +473,9 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
// write file piece
fileWriter.write(((FilePiecePayload) payload).getFilePiece());
- fileWriter.getFD().sync();
+ if (fileSaveFsync) {
+ fileWriter.getFD().sync();
+ }
// check offset
if (!Objects.equals(
@@ -599,8 +528,10 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
}
- // sync and close
- fileWriter.getFD().sync();
+ // optional sync and close
+ if (fileSaveFsync) {
+ fileWriter.getFD().sync();
+ }
fileWriter.close();
LOGGER.info(
@@ -641,9 +572,9 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
private List<SubscriptionPollResponse> pollInternal(final Set<String>
topicNames)
throws SubscriptionException {
- subscriptionProviders.acquireReadLock();
+ providers.acquireReadLock();
try {
- final SubscriptionProvider provider =
subscriptionProviders.getNextAvailableProvider();
+ final SubscriptionProvider provider =
providers.getNextAvailableProvider();
if (Objects.isNull(provider) || !provider.isAvailable()) {
throw new SubscriptionConnectionException(
String.format(
@@ -659,16 +590,16 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
return Collections.emptyList();
}
} finally {
- subscriptionProviders.releaseReadLock();
+ providers.releaseReadLock();
}
}
private List<SubscriptionPollResponse> pollFileInternal(
final int dataNodeId, final String topicName, final String fileName,
final long writingOffset)
throws SubscriptionException {
- subscriptionProviders.acquireReadLock();
+ providers.acquireReadLock();
try {
- final SubscriptionProvider provider =
subscriptionProviders.getProvider(dataNodeId);
+ final SubscriptionProvider provider = providers.getProvider(dataNodeId);
if (Objects.isNull(provider) || !provider.isAvailable()) {
throw new SubscriptionConnectionException(
String.format(
@@ -686,7 +617,7 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
return Collections.emptyList();
}
} finally {
- subscriptionProviders.releaseReadLock();
+ providers.releaseReadLock();
}
}
@@ -725,9 +656,9 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
final List<SubscriptionCommitContext> subscriptionCommitContexts,
final boolean nack)
throws SubscriptionException {
- subscriptionProviders.acquireReadLock();
+ providers.acquireReadLock();
try {
- final SubscriptionProvider provider =
subscriptionProviders.getProvider(dataNodeId);
+ final SubscriptionProvider provider = providers.getProvider(dataNodeId);
if (Objects.isNull(provider) || !provider.isAvailable()) {
throw new SubscriptionConnectionException(
String.format(
@@ -736,27 +667,92 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
}
provider.commit(subscriptionCommitContexts, nack);
} finally {
- subscriptionProviders.releaseReadLock();
+ providers.releaseReadLock();
}
}
+ /////////////////////////////// heartbeat ///////////////////////////////
+
+ private void submitHeartbeatWorker() {
+ final ScheduledFuture<?>[] future = new ScheduledFuture<?>[1];
+ future[0] =
+ SubscriptionExecutorServiceManager.submitHeartbeatWorker(
+ () -> {
+ if (isClosed()) {
+ if (Objects.nonNull(future[0])) {
+ future[0].cancel(false);
+ LOGGER.info("SubscriptionConsumer {} cancel heartbeat worker", this);
+ }
+ return;
+ }
+ providers.heartbeat(this);
+ },
+ heartbeatIntervalMs);
+ LOGGER.info("SubscriptionConsumer {} submit heartbeat worker", this);
+ }
+
+ /////////////////////////////// sync endpoints ///////////////////////////////
+
+ private void submitEndpointsSyncer() {
+ final ScheduledFuture<?>[] future = new ScheduledFuture<?>[1];
+ future[0] =
+ SubscriptionExecutorServiceManager.submitEndpointsSyncer(
+ () -> {
+ if (isClosed()) {
+ if (Objects.nonNull(future[0])) {
+ future[0].cancel(false);
+ LOGGER.info("SubscriptionConsumer {} cancel endpoints syncer", this);
+ }
+ return;
+ }
+ providers.sync(this);
+ },
+ endpointsSyncIntervalMs);
+ LOGGER.info("SubscriptionConsumer {} submit endpoints syncer", this);
+ }
+
/////////////////////////////// commit async ///////////////////////////////
protected void commitAsync(
final Iterable<SubscriptionMessage> messages, final AsyncCommitCallback
callback) {
- // launch async commit worker if needed
- launchAsyncCommitWorkerIfNeeded();
+ SubscriptionExecutorServiceManager.submitAsyncCommitWorker(
+ new AsyncCommitWorker(messages, callback));
+ }
+
+ private class AsyncCommitWorker implements Runnable {
+
+ private final Iterable<SubscriptionMessage> messages;
+ private final AsyncCommitCallback callback;
+
+ public AsyncCommitWorker(
+ final Iterable<SubscriptionMessage> messages, final
AsyncCommitCallback callback) {
+ this.messages = messages;
+ this.callback = callback;
+ }
+
+ @Override
+ public void run() {
+ if (isClosed()) {
+ return;
+ }
- asyncCommitExecutor.submit(new AsyncCommitWorker(messages, callback));
+ try {
+ ack(messages);
+ callback.onComplete();
+ } catch (final Exception e) {
+ callback.onFailure(e);
+ }
+ }
}
protected CompletableFuture<Void> commitAsync(final
Iterable<SubscriptionMessage> messages) {
- // launch async commit worker if needed
- launchAsyncCommitWorkerIfNeeded();
-
final CompletableFuture<Void> future = new CompletableFuture<>();
- asyncCommitExecutor.submit(
+ SubscriptionExecutorServiceManager.submitAsyncCommitWorker(
() -> {
+ if (isClosed()) {
+ return;
+ }
+
try {
ack(messages);
future.complete(null);
@@ -770,7 +766,7 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
/////////////////////////////// redirection ///////////////////////////////
private void subscribeWithRedirection(final Set<String> topicNames) throws
SubscriptionException {
- final List<SubscriptionProvider> providers =
subscriptionProviders.getAllAvailableProviders();
+ final List<SubscriptionProvider> providers =
this.providers.getAllAvailableProviders();
if (providers.isEmpty()) {
throw new SubscriptionConnectionException(
String.format(
@@ -800,7 +796,7 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
private void unsubscribeWithRedirection(final Set<String> topicNames)
throws SubscriptionException {
- final List<SubscriptionProvider> providers =
subscriptionProviders.getAllAvailableProviders();
+ final List<SubscriptionProvider> providers =
this.providers.getAllAvailableProviders();
if (providers.isEmpty()) {
throw new SubscriptionConnectionException(
String.format(
@@ -829,7 +825,7 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
}
Map<Integer, TEndPoint> fetchAllEndPointsWithRedirection() throws
SubscriptionException {
- final List<SubscriptionProvider> providers =
subscriptionProviders.getAllAvailableProviders();
+ final List<SubscriptionProvider> providers =
this.providers.getAllAvailableProviders();
if (providers.isEmpty()) {
throw new SubscriptionConnectionException(
String.format(
@@ -873,6 +869,7 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
ConsumerConstant.ENDPOINTS_SYNC_INTERVAL_MS_DEFAULT_VALUE;
protected String fileSaveDir =
ConsumerConstant.FILE_SAVE_DIR_DEFAULT_VALUE;
+ protected boolean fileSaveFsync =
ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE;
public Builder host(final String host) {
this.host = host;
@@ -926,71 +923,14 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
return this;
}
- public abstract SubscriptionPullConsumer buildPullConsumer();
-
- public abstract SubscriptionPushConsumer buildPushConsumer();
- }
-
- /////////////////////////////// commit async worker
///////////////////////////////
-
- private void launchAsyncCommitWorkerIfNeeded() {
- if (asyncCommitExecutor == null) {
- synchronized (this) {
- if (asyncCommitExecutor != null) {
- return;
- }
-
- asyncCommitExecutor =
- Executors.newSingleThreadExecutor(
- r -> {
- final Thread t =
- new Thread(
- Thread.currentThread().getThreadGroup(),
- r,
- "SubscriptionConsumerAsyncCommitWorker",
- 0);
- if (!t.isDaemon()) {
- t.setDaemon(true);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- });
- }
- }
- }
-
- private void shutdownAsyncCommitWorkerIfNeeded() {
- if (asyncCommitExecutor != null) {
- asyncCommitExecutor.shutdown();
- asyncCommitExecutor = null;
- }
- }
-
- class AsyncCommitWorker implements Runnable {
- private final Iterable<SubscriptionMessage> messages;
- private final AsyncCommitCallback callback;
-
- public AsyncCommitWorker(
- final Iterable<SubscriptionMessage> messages, final
AsyncCommitCallback callback) {
- this.messages = messages;
- this.callback = callback;
+ public Builder fileSaveFsync(final boolean fileSaveFsync) {
+ this.fileSaveFsync = fileSaveFsync;
+ return this;
}
- @Override
- public void run() {
- if (isClosed()) {
- return;
- }
+ public abstract SubscriptionPullConsumer buildPullConsumer();
- try {
- ack(messages);
- callback.onComplete();
- } catch (final Exception e) {
- callback.onFailure(e);
- }
- }
+ public abstract SubscriptionPushConsumer buildPushConsumer();
}
/////////////////////////////// object ///////////////////////////////
@@ -1003,10 +943,4 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
+ consumerGroupId
+ "}";
}
-
- /////////////////////////////// utility ///////////////////////////////
-
- protected long generateRandomInitialDelayMs(final long maxMs) {
- return (long) (Math.random() * maxMs);
- }
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
new file mode 100644
index 00000000000..5a587ff96fb
--- /dev/null
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+final class SubscriptionExecutorServiceManager {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(SubscriptionExecutorServiceManager.class);
+
+ private static final long AWAIT_TERMINATION_TIMEOUT_MS = 10_000L;
+
+ private static final String CONTROL_FLOW_EXECUTOR_NAME =
"SubscriptionControlFlowExecutor";
+ private static final String UPSTREAM_DATA_FLOW_EXECUTOR_NAME =
+ "SubscriptionUpstreamDataFlowExecutor";
+ private static final String DOWNSTREAM_DATA_FLOW_EXECUTOR_NAME =
+ "SubscriptionDownstreamDataFlowExecutor";
+
+ /**
+ * Control Flow Executor: execute heartbeat worker and endpoints syncer for
{@link
+ * SubscriptionConsumer}
+ */
+ private static final SubscriptionExecutorService CONTROL_FLOW_EXECUTOR =
+ new SubscriptionExecutorService(
+ CONTROL_FLOW_EXECUTOR_NAME,
Math.max(Runtime.getRuntime().availableProcessors() / 2, 1));
+
+ /**
+ * Upstream Data Flow Executor: execute auto commit worker and async commit
worker for {@link
+ * SubscriptionPullConsumer}
+ */
+ private static final SubscriptionExecutorService UPSTREAM_DATA_FLOW_EXECUTOR
=
+ new SubscriptionExecutorService(
+ UPSTREAM_DATA_FLOW_EXECUTOR_NAME,
+ Math.max(Runtime.getRuntime().availableProcessors() / 2, 1));
+
+ /**
+ * Downstream Data Flow Executor: execute auto poll worker for {@link
SubscriptionPushConsumer}
+ */
+ private static final SubscriptionExecutorService
DOWNSTREAM_DATA_FLOW_EXECUTOR =
+ new SubscriptionExecutorService(
+ DOWNSTREAM_DATA_FLOW_EXECUTOR_NAME,
+ Math.max(Runtime.getRuntime().availableProcessors(), 1));
+
+ /////////////////////////////// set core pool size
///////////////////////////////
+
+ public static void setControlFlowExecutorCorePoolSize(final int
corePoolSize) {
+ CONTROL_FLOW_EXECUTOR.setCorePoolSize(corePoolSize);
+ }
+
+ public static void setUpstreamDataFlowExecutorCorePoolSize(final int
corePoolSize) {
+ UPSTREAM_DATA_FLOW_EXECUTOR.setCorePoolSize(corePoolSize);
+ }
+
+ public static void setDownstreamDataFlowExecutorCorePoolSize(final int
corePoolSize) {
+ DOWNSTREAM_DATA_FLOW_EXECUTOR.setCorePoolSize(corePoolSize);
+ }
+
+ /////////////////////////////// shutdown hook ///////////////////////////////
+
+ static {
+ // register shutdown hook
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ new SubscriptionExecutorServiceShutdownHook(),
+ "SubscriptionExecutorServiceShutdownHook"));
+ }
+
+ private static class SubscriptionExecutorServiceShutdownHook implements
Runnable {
+
+ @Override
+ public void run() {
+ // shutdown executors
+ CONTROL_FLOW_EXECUTOR.shutdown();
+ UPSTREAM_DATA_FLOW_EXECUTOR.shutdown();
+ DOWNSTREAM_DATA_FLOW_EXECUTOR.shutdown();
+ }
+ }
+
+ /////////////////////////////// submitter ///////////////////////////////
+
+ @SuppressWarnings("unsafeThreadSchedule")
+ public static ScheduledFuture<?> submitHeartbeatWorker(
+ final Runnable task, final long heartbeatIntervalMs) {
+ CONTROL_FLOW_EXECUTOR.launchIfNeeded();
+ return CONTROL_FLOW_EXECUTOR.scheduleWithFixedDelay(
+ task,
+ generateRandomInitialDelayMs(heartbeatIntervalMs),
+ heartbeatIntervalMs,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @SuppressWarnings("unsafeThreadSchedule")
+ public static ScheduledFuture<?> submitEndpointsSyncer(
+ final Runnable task, final long endpointsSyncIntervalMs) {
+ CONTROL_FLOW_EXECUTOR.launchIfNeeded();
+ return CONTROL_FLOW_EXECUTOR.scheduleWithFixedDelay(
+ task,
+ generateRandomInitialDelayMs(endpointsSyncIntervalMs),
+ endpointsSyncIntervalMs,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @SuppressWarnings("unsafeThreadSchedule")
+ public static ScheduledFuture<?> submitAutoCommitWorker(
+ final Runnable task, final long autoCommitIntervalMs) {
+ UPSTREAM_DATA_FLOW_EXECUTOR.launchIfNeeded();
+ return UPSTREAM_DATA_FLOW_EXECUTOR.scheduleWithFixedDelay(
+ task,
+ generateRandomInitialDelayMs(autoCommitIntervalMs),
+ autoCommitIntervalMs,
+ TimeUnit.MILLISECONDS);
+ }
+
+ public static void submitAsyncCommitWorker(final Runnable task) {
+ UPSTREAM_DATA_FLOW_EXECUTOR.launchIfNeeded();
+ UPSTREAM_DATA_FLOW_EXECUTOR.submit(task);
+ }
+
+ @SuppressWarnings("unsafeThreadSchedule")
+ public static ScheduledFuture<?> submitAutoPollWorker(
+ final Runnable task, final long autoPollIntervalMs) {
+ DOWNSTREAM_DATA_FLOW_EXECUTOR.launchIfNeeded();
+ return DOWNSTREAM_DATA_FLOW_EXECUTOR.scheduleWithFixedDelay(
+ task,
+ generateRandomInitialDelayMs(autoPollIntervalMs),
+ autoPollIntervalMs,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /////////////////////////////// subscription executor service
///////////////////////////////
+
+ private static class SubscriptionExecutorService {
+
+ String name;
+ volatile int corePoolSize;
+ volatile ScheduledExecutorService executor;
+
+ SubscriptionExecutorService(final String name, final int corePoolSize) {
+ this.name = name;
+ this.corePoolSize = corePoolSize;
+ }
+
+ boolean isShutdown() {
+ return Objects.isNull(this.executor);
+ }
+
+ /**
+ * Sets the core pool size to use when this executor is launched.
+ *
+ * <p>The value is only read by launchIfNeeded() at pool creation, so it
+ * is accepted only while no executor exists yet. Once the executor has
+ * been launched the request is ignored and a warning is logged.
+ *
+ * @param corePoolSize core pool size for the pool created on launch
+ */
+ void setCorePoolSize(final int corePoolSize) {
+ // isShutdown() is true while no executor exists, i.e. before launch
+ if (isShutdown()) {
+ synchronized (this) {
+ // re-check under the lock: a concurrent launchIfNeeded() may have won
+ if (isShutdown()) {
+ this.corePoolSize = corePoolSize;
+ return;
+ }
+ }
+ }
+
+ LOGGER.warn(
+ "{} has been launched, set core pool size to {} will be ignored",
+ this.name,
+ corePoolSize);
+ }
+
+ void launchIfNeeded() {
+ if (isShutdown()) {
+ synchronized (this) {
+ if (isShutdown()) {
+ LOGGER.info("Launching {} with core pool size {}...", this.name,
this.corePoolSize);
+
+ this.executor =
+ Executors.newScheduledThreadPool(
+ this.corePoolSize,
+ r -> {
+ final Thread t =
+ new Thread(Thread.currentThread().getThreadGroup(),
r, this.name, 0);
+ if (!t.isDaemon()) {
+ t.setDaemon(true);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ });
+ }
+ }
+ }
+ }
+
+ void shutdown() {
+ if (!isShutdown()) {
+ synchronized (this) {
+ if (!isShutdown()) {
+ LOGGER.info("Shutting down {}...", this.name);
+
+ this.executor.shutdown();
+ try {
+ if (!this.executor.awaitTermination(
+ AWAIT_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+ this.executor.shutdownNow();
+ LOGGER.warn(
+ "Interrupt the worker, which may cause some task
inconsistent. Please check the biz logs.");
+ if (!this.executor.awaitTermination(
+ AWAIT_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+ LOGGER.error(
+ "Thread pool can't be shutdown even with interrupting
worker threads, which may cause some task inconsistent. Please check the biz
logs.");
+ }
+ }
+ } catch (final InterruptedException e) {
+ this.executor.shutdownNow();
+ LOGGER.error(
+ "The current thread is interrupted when it is trying to stop
the worker threads. This may leave an inconsistent state. Please check the biz
logs.");
+ Thread.currentThread().interrupt();
+ }
+
+ this.executor = null;
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unsafeThreadSchedule")
+ ScheduledFuture<?> scheduleWithFixedDelay(
+ final Runnable task, final long initialDelay, final long delay, final
TimeUnit unit) {
+ if (!isShutdown()) {
+ synchronized (this) {
+ if (!isShutdown()) {
+ return this.executor.scheduleWithFixedDelay(task, initialDelay,
delay, unit);
+ }
+ }
+ }
+
+ LOGGER.warn("{} has not been launched, ignore scheduleWithFixedDelay for
task", this.name);
+ return null;
+ }
+
+ Future<?> submit(final Runnable task) {
+ if (!isShutdown()) {
+ synchronized (this) {
+ if (!isShutdown()) {
+ return this.executor.submit(task);
+ }
+ }
+ }
+
+ LOGGER.warn("{} has not been launched, ignore submit task", this.name);
+ return null;
+ }
+ }
+
+ /////////////////////////////// utility ///////////////////////////////
+
+ private static long generateRandomInitialDelayMs(final long maxMs) {
+ return (long) (Math.random() * maxMs);
+ }
+}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
similarity index 98%
rename from
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
rename to
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index cccc7cdb389..70589472f82 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -41,6 +41,8 @@ import
org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeUnsubscrib
import
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp;
import
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribePollResp;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.SubscriptionSessionConnection;
import org.apache.thrift.TException;
import org.slf4j.Logger;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProviders.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
similarity index 94%
rename from
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProviders.java
rename to
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
index 8038dc724d6..6d250f6d201 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProviders.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -46,9 +47,9 @@ final class SubscriptionProviders {
private final ReentrantReadWriteLock subscriptionProvidersLock = new
ReentrantReadWriteLock(true);
- private final List<TEndPoint> initialEndpoints;
+ private final Set<TEndPoint> initialEndpoints;
- SubscriptionProviders(final List<TEndPoint> initialEndpoints) {
+ SubscriptionProviders(final Set<TEndPoint> initialEndpoints) {
this.initialEndpoints = initialEndpoints;
}
@@ -336,4 +337,18 @@ final class SubscriptionProviders {
}
}
}
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("SubscriptionProviders{");
+ for (final Map.Entry<Integer, SubscriptionProvider> entry :
subscriptionProviders.entrySet()) {
+ sb.append(entry.getValue().toString()).append(", ");
+ }
+ if (!subscriptionProviders.isEmpty()) {
+ sb.delete(sb.length() - 2, sb.length());
+ }
+ sb.append("}");
+ return sb.toString();
+ }
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPullConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
similarity index 79%
rename from
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPullConsumer.java
rename to
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index 25315f8c624..2306e4d05c1 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPullConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -30,17 +30,28 @@ import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+/**
+ * The {@link SubscriptionPullConsumer} corresponds to the pull consumption
mode in the message
+ * queue.
+ *
+ * <p>User code needs to actively call the data retrieval logic, i.e., the
{@link #poll} method.
+ *
+ * <p>Auto-commit for consumption progress can be configured in {@link
#autoCommit}.
+ *
+ * <p>NOTE: It is not recommended to use the {@link #poll} method with the
same consumer in a
+ * multithreaded environment. Instead, it is advised to increase the number of
consumers to improve
+ * data retrieval parallelism.
+ */
public class SubscriptionPullConsumer extends SubscriptionConsumer {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionPullConsumer.class);
@@ -48,7 +59,6 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
private final boolean autoCommit;
private final long autoCommitIntervalMs;
- private ScheduledExecutorService autoCommitWorkerExecutor;
private SortedMap<Long, Set<SubscriptionMessage>> uncommittedMessages;
private final AtomicBoolean isClosed = new AtomicBoolean(true);
@@ -86,7 +96,8 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
properties);
this.autoCommit = autoCommit;
- this.autoCommitIntervalMs = autoCommitIntervalMs;
+ this.autoCommitIntervalMs =
+ Math.max(autoCommitIntervalMs,
ConsumerConstant.AUTO_COMMIT_INTERVAL_MS_MIN_VALUE);
}
/////////////////////////////// open & close ///////////////////////////////
@@ -99,7 +110,8 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
super.open();
if (autoCommit) {
- launchAutoCommitWorker();
+ uncommittedMessages = new ConcurrentSkipListMap<>();
+ submitAutoCommitWorker();
}
isClosed.set(false);
@@ -111,35 +123,31 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
return;
}
- try {
- if (autoCommit) {
- // shutdown auto commit worker
- shutdownAutoCommitWorker();
-
- // commit all uncommitted messages
- commitAllUncommittedMessages();
- }
- super.close();
- } finally {
- isClosed.set(true);
+ if (autoCommit) {
+ // commit all uncommitted messages
+ commitAllUncommittedMessages();
}
+
+ super.close();
+ isClosed.set(true);
}
/////////////////////////////// poll & commit ///////////////////////////////
- public List<SubscriptionMessage> poll(final Duration timeoutMs) throws
SubscriptionException {
- return poll(Collections.emptySet(), timeoutMs.toMillis());
+ public List<SubscriptionMessage> poll(final Duration timeout) throws
SubscriptionException {
+ return poll(Collections.emptySet(), timeout.toMillis());
}
public List<SubscriptionMessage> poll(final long timeoutMs) throws
SubscriptionException {
return poll(Collections.emptySet(), timeoutMs);
}
- public List<SubscriptionMessage> poll(final Set<String> topicNames, final
Duration timeoutMs)
+ public List<SubscriptionMessage> poll(final Set<String> topicNames, final
Duration timeout)
throws SubscriptionException {
- return poll(topicNames, timeoutMs.toMillis());
+ return poll(topicNames, timeout.toMillis());
}
+ @Override
public List<SubscriptionMessage> poll(final Set<String> topicNames, final
long timeoutMs)
throws SubscriptionException {
final List<SubscriptionMessage> messages = super.poll(topicNames,
timeoutMs);
@@ -189,36 +197,47 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
/////////////////////////////// auto commit ///////////////////////////////
- @SuppressWarnings("unsafeThreadSchedule")
- private void launchAutoCommitWorker() {
- uncommittedMessages = new ConcurrentSkipListMap<>();
- autoCommitWorkerExecutor =
- Executors.newSingleThreadScheduledExecutor(
- r -> {
- final Thread t =
- new Thread(
- Thread.currentThread().getThreadGroup(),
- r,
- "PullConsumerAutoCommitWorker",
- 0);
- if (!t.isDaemon()) {
- t.setDaemon(true);
+ private void submitAutoCommitWorker() {
+ final ScheduledFuture<?>[] future = new ScheduledFuture<?>[1];
+ future[0] =
+ SubscriptionExecutorServiceManager.submitAutoCommitWorker(
+ () -> {
+ if (isClosed()) {
+ if (Objects.nonNull(future[0])) {
+ future[0].cancel(false);
+ LOGGER.info("SubscriptionPullConsumer {} cancel auto commit
worker", this);
+ }
+ return;
}
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- });
- autoCommitWorkerExecutor.scheduleWithFixedDelay(
- new AutoCommitWorker(),
- generateRandomInitialDelayMs(autoCommitIntervalMs),
- autoCommitIntervalMs,
- TimeUnit.MILLISECONDS);
+ new AutoCommitWorker().run();
+ },
+ autoCommitIntervalMs);
+ LOGGER.info("SubscriptionPullConsumer {} submit auto commit worker", this);
}
- private void shutdownAutoCommitWorker() {
- autoCommitWorkerExecutor.shutdown();
- autoCommitWorkerExecutor = null;
+ private class AutoCommitWorker implements Runnable {
+ @Override
+ public void run() {
+ if (isClosed()) {
+ return;
+ }
+
+ final long currentTimestamp = System.currentTimeMillis();
+ long index = currentTimestamp / autoCommitIntervalMs;
+ if (currentTimestamp % autoCommitIntervalMs == 0) {
+ index -= 1;
+ }
+
+ for (final Map.Entry<Long, Set<SubscriptionMessage>> entry :
+ uncommittedMessages.headMap(index).entrySet()) {
+ try {
+ ack(entry.getValue());
+ uncommittedMessages.remove(entry.getKey());
+ } catch (final Exception e) {
+ LOGGER.warn("something unexpected happened when auto commit
messages...", e);
+ }
+ }
+ }
}
private void commitAllUncommittedMessages() {
@@ -239,56 +258,72 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
private boolean autoCommit = ConsumerConstant.AUTO_COMMIT_DEFAULT_VALUE;
private long autoCommitIntervalMs =
ConsumerConstant.AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE;
+ @Override
public Builder host(final String host) {
super.host(host);
return this;
}
+ @Override
public Builder port(final int port) {
super.port(port);
return this;
}
+ @Override
public Builder nodeUrls(final List<String> nodeUrls) {
super.nodeUrls(nodeUrls);
return this;
}
+ @Override
public Builder username(final String username) {
super.username(username);
return this;
}
+ @Override
public Builder password(final String password) {
super.password(password);
return this;
}
+ @Override
public Builder consumerId(final String consumerId) {
super.consumerId(consumerId);
return this;
}
+ @Override
public Builder consumerGroupId(final String consumerGroupId) {
super.consumerGroupId(consumerGroupId);
return this;
}
+ @Override
public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) {
super.heartbeatIntervalMs(heartbeatIntervalMs);
return this;
}
+ @Override
public Builder endpointsSyncIntervalMs(final long endpointsSyncIntervalMs)
{
super.endpointsSyncIntervalMs(endpointsSyncIntervalMs);
return this;
}
+ @Override
public Builder fileSaveDir(final String fileSaveDir) {
super.fileSaveDir(fileSaveDir);
return this;
}
+ @Override
+ public Builder fileSaveFsync(final boolean fileSaveFsync) {
+ super.fileSaveFsync(fileSaveFsync);
+ return this;
+ }
+
public Builder autoCommit(final boolean autoCommit) {
this.autoCommit = autoCommit;
return this;
@@ -311,31 +346,4 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
"SubscriptionPullConsumer.Builder do not support build push
consumer.");
}
}
-
- /////////////////////////////// auto commit worker
///////////////////////////////
-
- class AutoCommitWorker implements Runnable {
- @Override
- public void run() {
- if (isClosed()) {
- return;
- }
-
- final long currentTimestamp = System.currentTimeMillis();
- long index = currentTimestamp / autoCommitIntervalMs;
- if (currentTimestamp % autoCommitIntervalMs == 0) {
- index -= 1;
- }
-
- for (final Map.Entry<Long, Set<SubscriptionMessage>> entry :
- uncommittedMessages.headMap(index).entrySet()) {
- try {
- ack(entry.getValue());
- uncommittedMessages.remove(entry.getKey());
- } catch (final Exception e) {
- LOGGER.warn("something unexpected happened when auto commit
messages...", e);
- }
- }
- }
- }
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPushConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
similarity index 57%
rename from
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPushConsumer.java
rename to
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 9dbb2df59a5..7ff1dd39b15 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPushConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -29,12 +29,21 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+/**
+ * The {@link SubscriptionPushConsumer} corresponds to the push consumption
mode in the message
+ * queue.
+ *
+ * <p>User code is triggered by newly arrived data events and only needs to
pre-configure message
+ * acknowledgment strategy ({@link #ackStrategy}) and consumption handling
logic ({@link
+ * #consumeListener}).
+ *
+ * <p>User code does not need to manually commit the consumption progress.
+ */
public class SubscriptionPushConsumer extends SubscriptionConsumer {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionPushConsumer.class);
@@ -42,7 +51,8 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
private final AckStrategy ackStrategy;
private final ConsumeListener consumeListener;
- private ScheduledExecutorService autoPollWorkerExecutor;
+ private final long autoPollIntervalMs;
+ private final long autoPollTimeoutMs;
private final AtomicBoolean isClosed = new AtomicBoolean(true);
@@ -51,6 +61,9 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
this.ackStrategy = builder.ackStrategy;
this.consumeListener = builder.consumeListener;
+
+ this.autoPollIntervalMs = builder.autoPollIntervalMs;
+ this.autoPollTimeoutMs = builder.autoPollTimeoutMs;
}
public SubscriptionPushConsumer(final Properties config) {
@@ -61,17 +74,38 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
(ConsumeListener)
config.getOrDefault(
ConsumerConstant.CONSUME_LISTENER_KEY,
- (ConsumeListener) message -> ConsumeResult.SUCCESS));
+ (ConsumeListener) message -> ConsumeResult.SUCCESS),
+ (Long)
+ config.getOrDefault(
+ ConsumerConstant.AUTO_POLL_INTERVAL_MS_KEY,
+ ConsumerConstant.AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE),
+ (Long)
+ config.getOrDefault(
+ ConsumerConstant.AUTO_POLL_TIMEOUT_MS_KEY,
+ ConsumerConstant.AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE));
}
private SubscriptionPushConsumer(
final Properties config,
final AckStrategy ackStrategy,
- final ConsumeListener consumeListener) {
- super(new Builder().ackStrategy(ackStrategy), config);
+ final ConsumeListener consumeListener,
+ final long autoPollIntervalMs,
+ final long autoPollTimeoutMs) {
+ super(
+ new Builder()
+ .ackStrategy(ackStrategy)
+ .consumeListener(consumeListener)
+ .autoPollIntervalMs(autoPollIntervalMs)
+ .autoPollTimeoutMs(autoPollTimeoutMs),
+ config);
this.ackStrategy = ackStrategy;
this.consumeListener = consumeListener;
+
+ this.autoPollIntervalMs =
+ Math.max(autoPollIntervalMs,
ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE);
+ this.autoPollTimeoutMs =
+ Math.max(autoPollTimeoutMs,
ConsumerConstant.AUTO_POLL_TIMEOUT_MS_MIN_VALUE);
}
/////////////////////////////// open & close ///////////////////////////////
@@ -82,9 +116,7 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
}
super.open();
-
- launchAutoPollWorker();
-
+ submitAutoPollWorker();
isClosed.set(false);
}
@@ -94,12 +126,8 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
return;
}
- try {
- shutdownAutoPollWorker();
- super.close();
- } finally {
- isClosed.set(true);
- }
+ super.close();
+ isClosed.set(true);
}
@Override
@@ -109,31 +137,66 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
/////////////////////////////// auto poll ///////////////////////////////
- @SuppressWarnings("unsafeThreadSchedule")
- private void launchAutoPollWorker() {
- autoPollWorkerExecutor =
- Executors.newSingleThreadScheduledExecutor(
- r -> {
- final Thread t =
- new Thread(Thread.currentThread().getThreadGroup(), r,
"PushConsumerWorker", 0);
- if (!t.isDaemon()) {
- t.setDaemon(true);
+ private void submitAutoPollWorker() {
+ final ScheduledFuture<?>[] future = new ScheduledFuture<?>[1];
+ future[0] =
+ SubscriptionExecutorServiceManager.submitAutoPollWorker(
+ () -> {
+ if (isClosed()) {
+ if (Objects.nonNull(future[0])) {
+ future[0].cancel(false);
+ LOGGER.info("SubscriptionPushConsumer {} cancel auto poll
worker", this);
+ }
+ return;
}
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- });
- autoPollWorkerExecutor.scheduleWithFixedDelay(
- new AutoPollWorker(),
-
generateRandomInitialDelayMs(ConsumerConstant.PUSH_CONSUMER_AUTO_POLL_INTERVAL_MS),
- ConsumerConstant.PUSH_CONSUMER_AUTO_POLL_INTERVAL_MS,
- TimeUnit.MILLISECONDS);
+ new AutoPollWorker().run();
+ },
+ autoPollIntervalMs);
+ LOGGER.info("SubscriptionPushConsumer {} submit auto poll worker", this);
}
- private void shutdownAutoPollWorker() {
- autoPollWorkerExecutor.shutdown();
- autoPollWorkerExecutor = null;
+ class AutoPollWorker implements Runnable {
+ @Override
+ public void run() {
+ if (isClosed()) {
+ return;
+ }
+
+ try {
+ // Poll all subscribed topics by passing an empty set
+ final List<SubscriptionMessage> messages =
poll(Collections.emptySet(), autoPollTimeoutMs);
+
+ if (ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) {
+ ack(messages);
+ }
+
+ final List<SubscriptionMessage> messagesToAck = new ArrayList<>();
+ final List<SubscriptionMessage> messagesToNack = new ArrayList<>();
+ for (final SubscriptionMessage message : messages) {
+ final ConsumeResult consumeResult;
+ try {
+ consumeResult = consumeListener.onReceive(message);
+ if (consumeResult.equals(ConsumeResult.SUCCESS)) {
+ messagesToAck.add(message);
+ } else {
+ LOGGER.warn("Consumer listener result failure when consuming
message: {}", message);
+ messagesToNack.add(message);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Consumer listener raised an exception while consuming
message: {}", message, e);
+ messagesToNack.add(message);
+ }
+ }
+
+ if (ackStrategy.equals(AckStrategy.AFTER_CONSUME)) {
+ ack(messagesToAck);
+ nack(messagesToNack);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("something unexpected happened when auto poll
messages...", e);
+ }
+ }
}
/////////////////////////////// builder ///////////////////////////////
@@ -143,62 +206,97 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
private AckStrategy ackStrategy = AckStrategy.defaultValue();
private ConsumeListener consumeListener = message -> ConsumeResult.SUCCESS;
- public SubscriptionPushConsumer.Builder host(final String host) {
+ private long autoPollIntervalMs =
ConsumerConstant.AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE;
+ private long autoPollTimeoutMs =
ConsumerConstant.AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE;
+
+ @Override
+ public Builder host(final String host) {
super.host(host);
return this;
}
- public SubscriptionPushConsumer.Builder port(final int port) {
+ @Override
+ public Builder port(final int port) {
super.port(port);
return this;
}
- public SubscriptionPushConsumer.Builder username(final String username) {
+ @Override
+ public Builder nodeUrls(final List<String> nodeUrls) {
+ super.nodeUrls(nodeUrls);
+ return this;
+ }
+
+ @Override
+ public Builder username(final String username) {
super.username(username);
return this;
}
- public SubscriptionPushConsumer.Builder password(final String password) {
+ @Override
+ public Builder password(final String password) {
super.password(password);
return this;
}
- public SubscriptionPushConsumer.Builder consumerId(final String
consumerId) {
+ @Override
+ public Builder consumerId(final String consumerId) {
super.consumerId(consumerId);
return this;
}
- public SubscriptionPushConsumer.Builder consumerGroupId(final String
consumerGroupId) {
+ @Override
+ public Builder consumerGroupId(final String consumerGroupId) {
super.consumerGroupId(consumerGroupId);
return this;
}
- public SubscriptionPushConsumer.Builder heartbeatIntervalMs(final long
heartbeatIntervalMs) {
+ @Override
+ public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) {
super.heartbeatIntervalMs(heartbeatIntervalMs);
return this;
}
- public SubscriptionPushConsumer.Builder endpointsSyncIntervalMs(
- final long endpointsSyncIntervalMs) {
+ @Override
+ public Builder endpointsSyncIntervalMs(final long endpointsSyncIntervalMs)
{
super.endpointsSyncIntervalMs(endpointsSyncIntervalMs);
return this;
}
- public SubscriptionPushConsumer.Builder fileSaveDir(final String
fileSaveDir) {
- this.fileSaveDir = fileSaveDir;
+ @Override
+ public Builder fileSaveDir(final String fileSaveDir) {
+ super.fileSaveDir(fileSaveDir);
return this;
}
- public SubscriptionPushConsumer.Builder ackStrategy(final AckStrategy
ackStrategy) {
+ @Override
+ public Builder fileSaveFsync(final boolean fileSaveFsync) {
+ super.fileSaveFsync(fileSaveFsync);
+ return this;
+ }
+
+ public Builder ackStrategy(final AckStrategy ackStrategy) {
this.ackStrategy = ackStrategy;
return this;
}
- public SubscriptionPushConsumer.Builder consumeListener(final
ConsumeListener consumeListener) {
+ public Builder consumeListener(final ConsumeListener consumeListener) {
this.consumeListener = consumeListener;
return this;
}
+ public Builder autoPollIntervalMs(final long autoPollIntervalMs) {
+ this.autoPollIntervalMs =
+ Math.max(autoPollIntervalMs,
ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE);
+ return this;
+ }
+
+ public Builder autoPollTimeoutMs(final long autoPollTimeoutMs) {
+ this.autoPollTimeoutMs =
+ Math.max(autoPollTimeoutMs,
ConsumerConstant.AUTO_POLL_TIMEOUT_MS_MIN_VALUE);
+ return this;
+ }
+
@Override
public SubscriptionPullConsumer buildPullConsumer() {
throw new SubscriptionException(
@@ -210,51 +308,4 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
return new SubscriptionPushConsumer(this);
}
}
-
- /////////////////////////////// auto poll worker
///////////////////////////////
-
- class AutoPollWorker implements Runnable {
- @Override
- public void run() {
- if (isClosed()) {
- return;
- }
-
- try {
- // Poll all subscribed topics by passing an empty set
- final List<SubscriptionMessage> messages =
- poll(Collections.emptySet(),
ConsumerConstant.PUSH_CONSUMER_AUTO_POLL_TIME_OUT_MS);
-
- if (ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) {
- ack(messages);
- }
-
- final List<SubscriptionMessage> messagesToAck = new ArrayList<>();
- final List<SubscriptionMessage> messagesToNack = new ArrayList<>();
- for (final SubscriptionMessage message : messages) {
- final ConsumeResult consumeResult;
- try {
- consumeResult = consumeListener.onReceive(message);
- if (consumeResult.equals(ConsumeResult.SUCCESS)) {
- messagesToAck.add(message);
- } else {
- LOGGER.warn("Consumer listener result failure when consuming
message: {}", message);
- messagesToNack.add(message);
- }
- } catch (final Exception e) {
- LOGGER.warn(
- "Consumer listener raised an exception while consuming
message: {}", message, e);
- messagesToNack.add(message);
- }
- }
-
- if (ackStrategy.equals(AckStrategy.AFTER_CONSUME)) {
- ack(messagesToAck);
- nack(messagesToNack);
- }
- } catch (final Exception e) {
- LOGGER.warn("something unexpected happened when auto poll
messages...", e);
- }
- }
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
index c5a12942718..4a592f8a86f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
@@ -93,7 +93,7 @@ public class SubscriptionEventBinaryCache {
private SubscriptionEventBinaryCache() {
final long initMemorySizeInBytes =
- PipeResourceManager.memory().getTotalMemorySizeInBytes() / 10;
+ PipeResourceManager.memory().getTotalMemorySizeInBytes() / 20;
final long maxMemorySizeInBytes =
(long)
(PipeResourceManager.memory().getTotalMemorySizeInBytes()
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 59de17e371e..b78f7dcb6fb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -241,7 +241,7 @@ public class CommonConfig {
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L;
// 3 minutes
private long twoStageAggregateSenderEndPointsCacheInMs = 3 * 60 * 1000L; //
3 minutes
- private float subscriptionCacheMemoryUsagePercentage = 0.1F;
+ private float subscriptionCacheMemoryUsagePercentage = 0.2F;
private int subscriptionSubtaskExecutorMaxThreadNum =
Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
index 210ef5edbdb..c6556076698 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
@@ -158,7 +158,7 @@ public class ConsumerGroupMetaKeeper {
@Override
public String toString() {
return "ConsumerGroupMetaKeeper{"
- + "consumerGroupIDToConsumerGroupMetaMap="
+ + "consumerGroupIdToConsumerGroupMetaMap="
+ consumerGroupIdToConsumerGroupMetaMap
+ '}';
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
index 5f3c4fc1803..f1bb9b46085 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
@@ -128,7 +128,7 @@ public class ConsumerMeta {
@Override
public String toString() {
return "ConsumerMeta{"
- + "consumerID='"
+ + "consumerId='"
+ consumerId
+ "', creationTime="
+ creationTime