This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 961127fbc5c [To dev/1.3] Subscription: intro poll and prefetch v2 for
tsfile topic (#15790) (#15865)
961127fbc5c is described below
commit 961127fbc5c038b23ceb6407645c776fa47f3f50
Author: VGalaxies <[email protected]>
AuthorDate: Fri Jul 4 10:07:30 2025 +0800
[To dev/1.3] Subscription: intro poll and prefetch v2 for tsfile topic
(#15790) (#15865)
* Subscription: intro poll and prefetch v2 for tsfile topic (#15790)
* fixup! Subscription: intro poll and prefetch v2 for tsfile topic (#15790)
---
.../it/env/cluster/config/MppCommonConfig.java | 17 +
.../env/cluster/config/MppSharedCommonConfig.java | 19 +
.../it/env/remote/config/RemoteCommonConfig.java | 11 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 6 +
.../it/triple/AbstractSubscriptionTripleIT.java | 7 +
.../AbstractSubscriptionRegressionIT.java | 71 +++-
.../IoTDBRootPullConsumeTsfileIT.java | 15 +-
.../format/IoTDBDBTsfilePullConsumerIT.java | 18 +-
.../IoTDBAllTsTsfilePullConsumerIT.java | 16 +-
.../IoTDBAllTsfilePullConsumerSnapshotIT.java | 14 +-
.../IoTDBPathDeviceTsfilePullConsumerIT.java | 17 +-
.../IoTDBTimeTsTsfilePullConsumerIT.java | 17 +-
.../IoTDBSnapshotDevicePullConsumerTsfileIT.java | 16 +-
.../multi/IoTDBOneConsumerMultiTopicsMixIT.java | 33 +-
.../multi/IoTDBOneConsumerMultiTopicsTsfileIT.java | 39 +--
.../pattern/IoTDBDBPatternPullConsumeTsfileIT.java | 15 +-
.../IoTDBDevicePatternPullConsumeTsfileIT.java | 14 +-
.../IoTDBRootPatternPullConsumeTsfileIT.java | 12 +-
.../pattern/IoTDBTSPatternPullConsumeTsfileIT.java | 15 +-
.../multi/IoTDBOneConsumerMultiTopicsMixIT.java | 8 +
.../multi/IoTDBOneConsumerMultiTopicsTsfileIT.java | 11 +
.../topic/IoTDBDataSet1TopicConsumerSpecialIT.java | 4 +-
.../db/subscription/broker/SubscriptionBroker.java | 35 +-
.../broker/SubscriptionPrefetchingQueue.java | 387 ++++++++++++++++++++-
.../broker/SubscriptionPrefetchingTabletQueue.java | 7 +-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 6 -
.../SubscriptionPipeEventBatchSegmentLock.java | 75 ++++
.../event/batch/SubscriptionPipeEventBatches.java | 118 ++++---
.../batch/SubscriptionPipeTsFileEventBatch.java | 40 +--
.../receiver/SubscriptionReceiverV1.java | 2 +
.../apache/iotdb/commons/conf/CommonConfig.java | 4 +-
31 files changed, 785 insertions(+), 284 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index fe4190d7708..47f96590964 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -503,6 +503,23 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
}
@Override
+ public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+ int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
+ setProperty(
+ "subscription_prefetch_ts_file_batch_max_delay_in_ms",
+ String.valueOf(subscriptionPrefetchTsFileBatchMaxDelayInMs));
+ return this;
+ }
+
+ @Override
+ public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+ int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
+ setProperty(
+ "subscription_prefetch_ts_file_batch_max_size_in_bytes",
+ String.valueOf(subscriptionPrefetchTsFileBatchMaxSizeInBytes));
+ return this;
+ }
+
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
setProperty("subscription_enabled", String.valueOf(subscriptionEnabled));
return this;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index c2ade6eace0..151e13f1290 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -514,6 +514,25 @@ public class MppSharedCommonConfig implements CommonConfig
{
}
@Override
+ public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+ int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
+ dnConfig.setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+ subscriptionPrefetchTsFileBatchMaxDelayInMs);
+ cnConfig.setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+ subscriptionPrefetchTsFileBatchMaxDelayInMs);
+ return this;
+ }
+
+ @Override
+ public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+ int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
+ dnConfig.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+ subscriptionPrefetchTsFileBatchMaxSizeInBytes);
+ cnConfig.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+ subscriptionPrefetchTsFileBatchMaxSizeInBytes);
+ return this;
+ }
+
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
dnConfig.setSubscriptionEnabled(subscriptionEnabled);
cnConfig.setSubscriptionEnabled(subscriptionEnabled);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 581c5f4345e..9e82aaf9ebf 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -362,6 +362,17 @@ public class RemoteCommonConfig implements CommonConfig {
}
@Override
+ public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+ int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
+ return this;
+ }
+
+ @Override
+ public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+ int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
+ return this;
+ }
+
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
return this;
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 16d50954209..639c6142783 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -160,6 +160,12 @@ public interface CommonConfig {
CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
+ CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
+ int subscriptionPrefetchTsFileBatchMaxDelayInMs);
+
+ CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
+ int subscriptionPrefetchTsFileBatchMaxSizeInBytes);
+
CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled);
default CommonConfig setDefaultStorageGroupLevel(int
defaultStorageGroupLevel) {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
index 3b49eb8ed13..ee4a33ca2ed 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
@@ -69,6 +69,13 @@ public abstract class AbstractSubscriptionTripleIT extends
AbstractSubscriptionI
sender.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
receiver1.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
receiver2.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+
+ // reduce tsfile batch memory usage
+
sender.getConfig().getCommonConfig().setSubscriptionPrefetchTsFileBatchMaxDelayInMs(500);
+ sender
+ .getConfig()
+ .getCommonConfig()
+ .setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(32 * 1024);
}
@Override
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
index 0eb5798599d..125195b5a95 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
@@ -57,6 +57,8 @@ import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS;
@@ -305,11 +307,6 @@ public abstract class AbstractSubscriptionRegressionIT
extends AbstractSubscript
}
}
- public List<Integer> consume_tsfile_withFileCount(
- SubscriptionPullConsumer consumer, String device) throws
InterruptedException {
- return consume_tsfile(consumer, Collections.singletonList(device));
- }
-
public int consume_tsfile(SubscriptionPullConsumer consumer, String device)
throws InterruptedException {
return consume_tsfile(consumer, Collections.singletonList(device)).get(0);
@@ -361,15 +358,6 @@ public abstract class AbstractSubscriptionRegressionIT
extends AbstractSubscript
return results;
}
- public void consume_data(SubscriptionPullConsumer consumer)
- throws TException,
- IOException,
- StatementExecutionException,
- InterruptedException,
- IoTDBConnectionException {
- consume_data(consumer, session_dest);
- }
-
public void consume_data_await(
SubscriptionPullConsumer consumer, Session session,
List<WrappedVoidSupplier> assertions) {
AWAIT.untilAsserted(
@@ -393,11 +381,64 @@ public abstract class AbstractSubscriptionRegressionIT
extends AbstractSubscript
}
public void consume_tsfile_await(
+ SubscriptionPullConsumer consumer,
+ List<String> devices,
+ List<Integer> expected,
+ List<Boolean> allowGte) {
+ final List<AtomicInteger> counters = new ArrayList<>(devices.size());
+ for (int i = 0; i < devices.size(); i++) {
+ counters.add(new AtomicInteger(0));
+ }
+ AWAIT.untilAsserted(
+ () -> {
+ List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+ if (messages.isEmpty()) {
+ session_src.executeNonQueryStatement("flush");
+ }
+ for (final SubscriptionMessage message : messages) {
+ final SubscriptionTsFileHandler tsFileHandler =
message.getTsFileHandler();
+ try (final TsFileReader tsFileReader = tsFileHandler.openReader())
{
+ for (int i = 0; i < devices.size(); i++) {
+ final Path path = new Path(devices.get(i), "s_0", true);
+ final QueryDataSet dataSet =
+ tsFileReader.query(
+
QueryExpression.create(Collections.singletonList(path), null));
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ counters.get(i).addAndGet(1);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ consumer.commitSync(messages);
+ for (int i = 0; i < devices.size(); i++) {
+ if (allowGte.get(i)) {
+ assertGte(counters.get(i).get(), expected.get(i));
+ } else {
+ assertEquals(counters.get(i).get(), expected.get(i));
+ }
+ }
+ });
+ }
+
+ public void consume_tsfile_await(
+ SubscriptionPullConsumer consumer, List<String> devices, List<Integer>
expected) {
+ consume_tsfile_await(
+ consumer,
+ devices,
+ expected,
+ Stream.generate(() ->
false).limit(devices.size()).collect(Collectors.toList()));
+ }
+
+ public void consume_tsfile_with_file_count_await(
SubscriptionPullConsumer consumer, List<String> devices, List<Integer>
expected) {
final List<AtomicInteger> counters = new ArrayList<>(devices.size());
for (int i = 0; i < devices.size(); i++) {
counters.add(new AtomicInteger(0));
}
+ AtomicInteger onReceived = new AtomicInteger(0);
AWAIT.untilAsserted(
() -> {
List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
@@ -405,6 +446,7 @@ public abstract class AbstractSubscriptionRegressionIT
extends AbstractSubscript
session_src.executeNonQueryStatement("flush");
}
for (final SubscriptionMessage message : messages) {
+ onReceived.incrementAndGet();
final SubscriptionTsFileHandler tsFileHandler =
message.getTsFileHandler();
try (final TsFileReader tsFileReader = tsFileHandler.openReader())
{
for (int i = 0; i < devices.size(); i++) {
@@ -425,6 +467,7 @@ public abstract class AbstractSubscriptionRegressionIT
extends AbstractSubscript
for (int i = 0; i < devices.size(); i++) {
assertEquals(counters.get(i).get(), expected.get(i));
}
+ assertEquals(onReceived.get(), expected.get(devices.size()));
});
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
index 6106a392c66..169cbd7305f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
@@ -39,6 +39,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/***
@@ -134,23 +135,13 @@ public class IoTDBRootPullConsumeTsfileIT extends
AbstractSubscriptionRegression
List<String> devices = new ArrayList<>(2);
devices.add(device);
devices.add(device2);
- List<Integer> results = consume_tsfile(consumer, devices);
- assertEquals(results.get(0), 10);
- assertEquals(results.get(1), 10);
+ consume_tsfile_await(consumer, devices, Arrays.asList(10, 10));
consumer.unsubscribe(topicName);
assertEquals(subs.getSubscriptions(topicName).size(), 0, "unsubscribe:show
subscriptions");
consumer.subscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 1, "subscribe again:show
subscriptions");
insert_data(1707782400000L, device); // 2024-02-13 08:00:00+08:00
insert_data(1707782400000L, device2); // 2024-02-13 08:00:00+08:00
- results = consume_tsfile(consumer, devices);
- assertEquals(
- results.get(0),
- 15,
- "Unsubscribing and then re-subscribing will not retain progress. Full
synchronization.");
- assertEquals(
- results.get(1),
- 15,
- "Unsubscribing and then re-subscribing will not retain progress. Full
synchronization.");
+ consume_tsfile_await(consumer, devices, Arrays.asList(15, 15));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/format/IoTDBDBTsfilePullConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/format/IoTDBDBTsfilePullConsumerIT.java
index aa5b519ca63..a034a408062 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/format/IoTDBDBTsfilePullConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/format/IoTDBDBTsfilePullConsumerIT.java
@@ -40,6 +40,8 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
/***
@@ -129,9 +131,8 @@ public class IoTDBDBTsfilePullConsumerIT extends
AbstractSubscriptionRegressionI
// insert_data(1706659200000L); //2024-01-31 08:00:00+08:00
insert_data(System.currentTimeMillis());
// Consumption data
- List<Integer> results = consume_tsfile_withFileCount(consumer, device);
- assertEquals(results.get(0), 10);
- assertEquals(results.get(1), 2, "number of received files");
+ consume_tsfile_with_file_count_await(
+ consumer, Collections.singletonList(device), Arrays.asList(10, 2));
// Unsubscribe
consumer.unsubscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after
unsubscription");
@@ -141,14 +142,7 @@ public class IoTDBDBTsfilePullConsumerIT extends
AbstractSubscriptionRegressionI
insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
// Consumption data: Progress is not retained when re-subscribing after
cancellation. Full
// synchronization.
- results = consume_tsfile_withFileCount(consumer, device);
- assertEquals(
- results.get(0),
- 15,
- "Unsubscribe and resubscribe, progress is not retained. Full
synchronization.");
- assertEquals(
- results.get(1),
- 3,
- "Number of received files: After unsubscribing and resubscribing,
progress is not retained. Full synchronization.");
+ consume_tsfile_with_file_count_await(
+ consumer, Collections.singletonList(device), Arrays.asList(15, 3));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsTsfilePullConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsTsfilePullConsumerIT.java
index 4d11e63006d..1e70c9a9ac9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsTsfilePullConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsTsfilePullConsumerIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
@@ -170,10 +171,8 @@ public class IoTDBAllTsTsfilePullConsumerIT extends
AbstractSubscriptionRegressi
paths.add(device);
paths.add(device2);
paths.add(database2 + ".d_2");
- List<Integer> rowCountList = consume_tsfile(consumer, paths);
- assertGte(rowCountList.get(0), 8);
- assertEquals(rowCountList.get(1), 0);
- assertEquals(rowCountList.get(2), 0);
+ consume_tsfile_await(
+ consumer, paths, Arrays.asList(8, 0, 0), Arrays.asList(true, false,
false));
// Unsubscribe
consumer.unsubscribe(topicName);
@@ -187,12 +186,7 @@ public class IoTDBAllTsTsfilePullConsumerIT extends
AbstractSubscriptionRegressi
// Consumption data: Progress is not retained after unsubscribing and
resubscribing. Full
// synchronization.
- rowCountList = consume_tsfile(consumer, paths);
- assertGte(
- rowCountList.get(0),
- 13,
- "Unsubscribe and then resubscribe, progress is not retained. Full
synchronization.");
- assertEquals(rowCountList.get(1), 0, "Unsubscribe and then resubscribe," +
device2);
- assertEquals(rowCountList.get(2), 0, "Unsubscribe and then resubscribe," +
database2 + ".d_2");
+ consume_tsfile_await(
+ consumer, paths, Arrays.asList(13, 0, 0), Arrays.asList(true, false,
false));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsfilePullConsumerSnapshotIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsfilePullConsumerSnapshotIT.java
index e30ea3a5dba..03a4956ce45 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsfilePullConsumerSnapshotIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsfilePullConsumerSnapshotIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
@@ -171,11 +172,8 @@ public class IoTDBAllTsfilePullConsumerSnapshotIT extends
AbstractSubscriptionRe
paths.add(device);
paths.add(device2);
paths.add(database2 + ".d_2");
- List<Integer> rowCountList = consume_tsfile(consumer, paths);
// Subscribe and write without consuming
- assertEquals(rowCountList.get(0), 5, "Write without consume after
subscription");
- assertEquals(rowCountList.get(1), 0);
- assertEquals(rowCountList.get(2), 0);
+ consume_tsfile_await(consumer, paths, Arrays.asList(5, 0, 0));
// Unsubscribe
consumer.unsubscribe(topicName);
@@ -189,12 +187,6 @@ public class IoTDBAllTsfilePullConsumerSnapshotIT extends
AbstractSubscriptionRe
// Consumption data: Progress is not retained after unsubscribing and
re-subscribing. Full
// synchronization.
- rowCountList = consume_tsfile(consumer, paths);
- assertEquals(
- rowCountList.get(0),
- 10,
- "Unsubscribe and then resubscribe, progress is not retained. Full
synchronization.");
- assertEquals(rowCountList.get(1), 0, "Unsubscribe and then resubscribe," +
device2);
- assertEquals(rowCountList.get(2), 0, "Unsubscribe and then resubscribe," +
database2 + ".d_2");
+ consume_tsfile_await(consumer, paths, Arrays.asList(10, 0, 0));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceTsfilePullConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceTsfilePullConsumerIT.java
index 4f94a42abf0..49dae7dd726 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceTsfilePullConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceTsfilePullConsumerIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/***
@@ -160,12 +161,7 @@ public class IoTDBPathDeviceTsfilePullConsumerIT extends
AbstractSubscriptionReg
devices.add(device);
devices.add(device2);
devices.add(database2 + ".d_2");
- List<Integer> results = consume_tsfile(consumer, devices);
-
- assertEquals(results.get(0), 10);
- assertEquals(results.get(1), 0);
- assertEquals(results.get(2), 0);
- assertEquals(results.get(3), 2, "number of received files");
+ consume_tsfile_with_file_count_await(consumer, devices, Arrays.asList(10,
0, 0, 2));
// Unsubscribe
consumer.unsubscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 0, "show subscriptions after
cancellation");
@@ -176,13 +172,6 @@ public class IoTDBPathDeviceTsfilePullConsumerIT extends
AbstractSubscriptionReg
insert_data(1707782400000L, device2); // 2024-02-13 08:00:00+08:00
// Consumption data: Progress is not retained after unsubscribing and
re-subscribing. Full
// synchronization.
- results = consume_tsfile(consumer, devices);
- assertEquals(
- results.get(0),
- 15,
- "Unsubscribe and resubscribe, progress is not retained. Full
synchronization.");
- assertEquals(results.get(1), 0, "Unsubscribe and then subscribe again," +
device2);
- assertEquals(results.get(2), 0, "Subscribe again after cancellation," +
database2 + ".d_2");
- assertEquals(results.get(3), 3, "Number of received files: resubscribe
after unsubscribe");
+ consume_tsfile_with_file_count_await(consumer, devices, Arrays.asList(15,
0, 0, 3));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsTsfilePullConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsTsfilePullConsumerIT.java
index 5c6c7c363bd..404ac75f99e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsTsfilePullConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsTsfilePullConsumerIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/***
@@ -170,10 +171,8 @@ public class IoTDBTimeTsTsfilePullConsumerIT extends
AbstractSubscriptionRegress
paths.add(device2);
paths.add(device3);
- rowCountList = consume_tsfile(consumer, paths);
- assertGte(rowCountList.get(0), 8, device);
- assertEquals(rowCountList.get(1), 0, device2);
- assertEquals(rowCountList.get(2), 0, device3);
+ consume_tsfile_await(
+ consumer, paths, Arrays.asList(8, 0, 0), Arrays.asList(true, false,
false));
// Unsubscribe
consumer.unsubscribe(topicName);
@@ -190,13 +189,7 @@ public class IoTDBTimeTsTsfilePullConsumerIT extends
AbstractSubscriptionRegress
// synchronization.
System.out.println("TimeTsTsfilePullConsumer src2 filter:" +
getCount(session_src, sql));
- rowCountList = consume_tsfile(consumer, paths);
- assertGte(
- rowCountList.get(0),
- 13,
- "Unsubscribe and then resubscribe, progress is not retained. Full
synchronization. Actual="
- + rowCountList.get(0));
- assertEquals(rowCountList.get(1), 0, "Unsubscribe and subscribe again," +
device2);
- assertEquals(rowCountList.get(2), 0, "Unsubscribe and then resubscribe," +
device3);
+ consume_tsfile_await(
+ consumer, paths, Arrays.asList(13, 0, 0), Arrays.asList(true, false,
false));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerTsfileIT.java
index 2fe3f482764..386ab059545 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerTsfileIT.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/***
@@ -153,11 +154,7 @@ public class IoTDBSnapshotDevicePullConsumerTsfileIT
extends AbstractSubscriptio
devices.add(device);
devices.add(database + ".d_1");
devices.add(database2 + ".d_2");
- List<Integer> rowCounts = consume_tsfile(consumer, devices);
-
- assertEquals(rowCounts.get(0), 5);
- assertEquals(rowCounts.get(1), 0);
- assertEquals(rowCounts.get(2), 0);
+ consume_tsfile_await(consumer, devices, Arrays.asList(5, 0, 0));
// Unsubscribe
consumer.unsubscribe(topicName);
@@ -169,13 +166,6 @@ public class IoTDBSnapshotDevicePullConsumerTsfileIT
extends AbstractSubscriptio
// Consumption data: Progress is not retained after unsubscribing and then
re-subscribing. Full
// synchronization.
- rowCounts = consume_tsfile(consumer, devices);
- assertEquals(
- rowCounts.get(0),
- 10,
- "Unsubscribe and resubscribe, progress is not retained. Full
synchronization.");
- assertEquals(rowCounts.get(1), 0, "Unsubscribe and resubscribe," +
database + ".d_1");
- assertEquals(
- rowCounts.get(2), 0, "Cancel subscription and subscribe again," +
database2 + ".d_2");
+ consume_tsfile_await(consumer, devices, Arrays.asList(10, 0, 0));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
index bd93bb2adb5..2155d09ca83 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
@@ -26,6 +26,9 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+import org.apache.iotdb.subscription.it.Retry;
+import org.apache.iotdb.subscription.it.RetryRule;
import
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
import org.apache.thrift.TException;
@@ -41,6 +44,7 @@ import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -52,6 +56,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
@@ -62,6 +67,9 @@ import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2SubscriptionRegressionConsumer.class})
public class IoTDBOneConsumerMultiTopicsMixIT extends
AbstractSubscriptionRegressionIT {
+
+ @Rule public RetryRule retryRule = new RetryRule();
+
private static final String database = "root.test.OneConsumerMultiTopicsMix";
private static final String device = database + ".d_0";
private String pattern = device + ".s_0";
@@ -88,6 +96,15 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends
AbstractSubscriptionRegres
assertTrue(subs.getTopic(topicName2).isPresent(), "Create show topics 2");
}
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
@@ -98,6 +115,7 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends
AbstractSubscriptionRegres
subs.dropTopic(topicName);
subs.dropTopic(topicName2);
dropDB(database);
+ schemaList.clear();
super.tearDown();
}
@@ -116,6 +134,7 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends
AbstractSubscriptionRegres
session_src.executeNonQueryStatement("flush;");
}
+ @Retry
@Test
public void do_test()
throws InterruptedException,
@@ -156,16 +175,13 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends
AbstractSubscriptionRegres
});
thread.start();
+ AtomicBoolean isClosed = new AtomicBoolean(false);
AtomicInteger rowCount = new AtomicInteger(0);
Thread thread2 =
new Thread(
() -> {
- while (true) {
+ while (!isClosed.get()) {
List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(10000));
- if (messages.isEmpty()) {
- break;
- }
-
for (final SubscriptionMessage message : messages) {
final short messageType = message.getMessageType();
if
(SubscriptionMessageType.isValidatedMessageType(messageType)) {
@@ -213,12 +229,8 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends
AbstractSubscriptionRegres
Thread thread3 =
new Thread(
() -> {
- while (true) {
+ while (!isClosed.get()) {
List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(10000));
- if (messages.isEmpty()) {
- break;
- }
-
for (final SubscriptionMessage message : messages) {
final short messageType = message.getMessageType();
if
(SubscriptionMessageType.isValidatedMessageType(messageType)) {
@@ -307,6 +319,7 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends
AbstractSubscriptionRegres
});
// close
consumer.close();
+ isClosed.set(true);
try {
consumer.subscribe(topicName, topicName2);
} catch (Exception e) {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
index a28f617f32d..d1b4d38f36e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+import org.apache.iotdb.subscription.it.Retry;
+import org.apache.iotdb.subscription.it.RetryRule;
import
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
import org.apache.thrift.TException;
@@ -35,14 +37,16 @@ import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
/***
* 1 consumer subscribes to 2 topics: historical data
@@ -50,6 +54,9 @@ import java.util.concurrent.atomic.AtomicInteger;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2SubscriptionRegressionConsumer.class})
public class IoTDBOneConsumerMultiTopicsTsfileIT extends
AbstractSubscriptionRegressionIT {
+
+ @Rule public RetryRule retryRule = new RetryRule();
+
private static final String database =
"root.test.OneConsumerMultiTopicsTsfile";
private static final String device = database + ".d_0";
private static List<MeasurementSchema> schemaList = new ArrayList<>();
@@ -103,6 +110,7 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends
AbstractSubscriptionReg
subs.dropTopic(topicName2);
dropDB(database);
dropDB(database2);
+ schemaList.clear();
super.tearDown();
}
@@ -121,6 +129,7 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends
AbstractSubscriptionReg
session_src.executeNonQueryStatement("flush;");
}
+ @Retry
@Test
public void do_test()
throws InterruptedException,
@@ -147,23 +156,6 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends
AbstractSubscriptionReg
subs.getSubscriptions().forEach((System.out::println));
assertEquals(subs.getSubscriptions().size(), 2, "subscribe then show subscriptions");
- final AtomicInteger rowCount = new AtomicInteger();
- Thread thread1 =
- new Thread(
- () -> {
- List<String> devices = new ArrayList<>(2);
- devices.add(device);
- devices.add(device2);
- try {
- List<Integer> results = consume_tsfile(consumer, devices);
- System.out.println(results);
- rowCount.addAndGet(results.get(0));
- rowCount.addAndGet(results.get(1));
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
- thread1.start();
// Subscribe and then write data
Thread thread =
new Thread(
@@ -181,7 +173,6 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends
AbstractSubscriptionReg
});
thread.start();
thread.join();
- thread1.join();
System.out.println(
"src insert " + device + " :" + getCount(session_src, "select count(s_0) from " + device));
@@ -190,7 +181,11 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends
AbstractSubscriptionReg
+ device2
+ " :"
+ getCount(session_src, "select count(s_0) from " + device2));
- assertEquals(rowCount.get(), 200, "After first consumption");
+ // After first consumption
+ List<String> devices = new ArrayList<>(2);
+ devices.add(device);
+ devices.add(device2);
+ consume_tsfile_await(consumer, devices, Arrays.asList(100, 100));
// Unsubscribe
consumer.unsubscribe(topicName);
System.out.println("###### After cancellation query:");
@@ -200,12 +195,12 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends
AbstractSubscriptionReg
// Unsubscribe and then write data
insert_data(System.currentTimeMillis(), device2);
- int result = consume_tsfile(consumer, device2);
session_src.executeNonQueryStatement(
"insert into "
+ device
+ "(time,s_0,s_1)values(1703980800000,3.45,'2023-12-31 08:00:00+08:00');"); // 2023-12-31 08:00:00+08:00
- assertEquals(result, 5, "After the second consumption");
+ consume_tsfile_await(
+ consumer, Collections.singletonList(device2), Collections.singletonList(5));
// close
consumer.close();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumeTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumeTsfileIT.java
index 2f4fe734185..fbc355d30e4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumeTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumeTsfileIT.java
@@ -40,6 +40,8 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
/***
@@ -151,9 +153,8 @@ public class IoTDBDBPatternPullConsumeTsfileIT extends
AbstractSubscriptionRegre
// insert_data(1706659200000L); //2024-01-31 08:00:00+08:00
insert_data(System.currentTimeMillis());
// Consumption data
- List<Integer> results = consume_tsfile_withFileCount(consumer, device);
- assertEquals(results.get(0), 10);
- assertEquals(results.get(1), 3, "number of files received");
+ consume_tsfile_with_file_count_await(
+ consumer, Collections.singletonList(device), Arrays.asList(10, 3));
// Unsubscribe
consumer.unsubscribe(topicName);
assertEquals(
@@ -166,11 +167,7 @@ public class IoTDBDBPatternPullConsumeTsfileIT extends
AbstractSubscriptionRegre
// Consumption data: Progress is not retained after unsubscribing and
resubscribing. Full
// synchronization.
- results = consume_tsfile_withFileCount(consumer, device);
- assertEquals(
- results.get(0),
- 15,
- "Unsubscribe and then resubscribe, progress is not retained. Full
synchronization.");
- assertEquals(results.get(1), 4, "Number of files received: resubscribe
after unsubscribe");
+ consume_tsfile_with_file_count_await(
+ consumer, Collections.singletonList(device), Arrays.asList(15, 4));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumeTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumeTsfileIT.java
index 64507390442..dc19e2201f6 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumeTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumeTsfileIT.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/***
@@ -151,10 +152,7 @@ public class IoTDBDevicePatternPullConsumeTsfileIT extends
AbstractSubscriptionR
devices.add(device);
devices.add(device2);
devices.add(device3);
- List<Integer> results = consume_tsfile(consumer, devices);
- assertEquals(results.get(0), 10);
- assertEquals(results.get(1), 0);
- assertEquals(results.get(2), 0);
+ consume_tsfile_await(consumer, devices, Arrays.asList(10, 0, 0));
// Unsubscribe
consumer.unsubscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after
unsubscribe");
@@ -164,12 +162,6 @@ public class IoTDBDevicePatternPullConsumeTsfileIT extends
AbstractSubscriptionR
insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
// Consumption data: Progress is not retained after unsubscribing and
resubscribing. Full
// synchronization.
- results = consume_tsfile(consumer, devices);
- assertEquals(
- results.get(0),
- 15,
- "After unsubscribing and resubscribing, progress is not retained. Full
synchronization.");
- assertEquals(results.get(1), 0, "Cancel subscription and subscribe again,"
+ device2);
- assertEquals(results.get(2), 0, "Unsubscribe and subscribe again," +
device3);
+ consume_tsfile_await(consumer, devices, Arrays.asList(15, 0, 0));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBRootPatternPullConsumeTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBRootPatternPullConsumeTsfileIT.java
index 1ff3750d99d..c9409c5518d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBRootPatternPullConsumeTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBRootPatternPullConsumeTsfileIT.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/***
@@ -131,8 +132,8 @@ public class IoTDBRootPatternPullConsumeTsfileIT extends
AbstractSubscriptionReg
// insert_data(1706659200000L); //2024-01-31 08:00:00+08:00
insert_data(System.currentTimeMillis());
// Consumption data
- List<Integer> results = consume_tsfile_withFileCount(consumer, device);
- assertEquals(results.get(0), 10, "Number of consumption data rows");
+ consume_tsfile_await(
+ consumer, Collections.singletonList(device),
Collections.singletonList(10));
// Unsubscribe
consumer.unsubscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 0, "show subscriptions after
unsubscribe");
@@ -142,10 +143,7 @@ public class IoTDBRootPatternPullConsumeTsfileIT extends
AbstractSubscriptionReg
insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
// Consumption data: Progress is not retained after unsubscribing and
re-subscribing. Full
// synchronization.
- results = consume_tsfile_withFileCount(consumer, device);
- assertEquals(
- results.get(0),
- 15,
- "After unsubscribing and resubscribing, progress is not retained. Full
synchronization.");
+ consume_tsfile_await(
+ consumer, Collections.singletonList(device),
Collections.singletonList(15));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
index 53729b3d26c..0b60dfd330d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/***
@@ -153,10 +154,7 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends
AbstractSubscriptionRegre
devices.add(device);
devices.add(device2);
devices.add(database2 + ".d_2");
- List<Integer> results = consume_tsfile(consumer, devices);
- assertEquals(results.get(0), 10);
- assertEquals(results.get(1), 0);
- assertEquals(results.get(2), 0);
+ consume_tsfile_await(consumer, devices, Arrays.asList(10, 0, 0));
insert_data(System.currentTimeMillis());
// Unsubscribe
consumer.unsubscribe(topicName);
@@ -167,13 +165,6 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends
AbstractSubscriptionRegre
insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
// Consumption data: Progress is not retained after unsubscribing and
re-subscribing. Full
// synchronization.
- results = consume_tsfile(consumer, devices);
-
- assertEquals(
- results.get(0),
- 15,
- "Unsubscribe and resubscribe, progress is not retained. Full
synchronization.");
- assertEquals(results.get(1), 0, "Subscribe again after unsubscribe," +
database + ".d_1");
- assertEquals(results.get(2), 0, "Unsubscribe and then subscribe again," +
database2 + ".d_2");
+ consume_tsfile_await(consumer, devices, Arrays.asList(15, 0, 0));
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
index 3c229d55ddd..d6637e0c351 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsMixIT.java
@@ -28,6 +28,8 @@ import
org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.Retry;
+import org.apache.iotdb.subscription.it.RetryRule;
import
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
import org.apache.thrift.TException;
@@ -43,6 +45,7 @@ import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -62,6 +65,9 @@ import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2SubscriptionRegressionConsumer.class})
public class IoTDBOneConsumerMultiTopicsMixIT extends
AbstractSubscriptionRegressionIT {
+
+ @Rule public RetryRule retryRule = new RetryRule();
+
private static final String database = "root.test.OneConsumerMultiTopicsMix";
private static final String database2 = "root.OneConsumerMultiTopicsMix";
private static final String device = database + ".d_0";
@@ -109,6 +115,7 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends
AbstractSubscriptionRegres
subs.dropTopic(topicName2);
dropDB(database);
dropDB(database2);
+ schemaList.clear();
super.tearDown();
}
@@ -127,6 +134,7 @@ public class IoTDBOneConsumerMultiTopicsMixIT extends
AbstractSubscriptionRegres
session_src.executeNonQueryStatement("flush");
}
+ @Retry
@Test
public void do_test()
throws InterruptedException,
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
index 0fcb1cd94af..35f1409f9ff 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.subscription.consumer.AckStrategy;
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import org.apache.iotdb.subscription.it.Retry;
import org.apache.iotdb.subscription.it.RetryRule;
import
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
@@ -98,6 +99,15 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends
AbstractSubscriptionReg
assertTrue(subs.getTopic(topicName).isPresent(), "create show topics");
}
+ @Override
+ protected void setUpConfig() {
+ super.setUpConfig();
+
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+ IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+ }
+
@Override
@After
public void tearDown() throws Exception {
@@ -109,6 +119,7 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends
AbstractSubscriptionReg
subs.dropTopic(topicName2);
dropDB(database);
dropDB(database2);
+ schemaList.clear();
super.tearDown();
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java
index 0682d0879e6..c4917bf027c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java
@@ -118,7 +118,7 @@ public class IoTDBDataSet1TopicConsumerSpecialIT extends
AbstractSubscriptionReg
consumer.subscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after
subscription");
// Consumption data
- consume_data(consumer);
+ consume_data(consumer, session_dest);
check_count(4, "select count(`ABH#01`) from " + device, "Consumption
data:" + pattern);
// Unsubscribe
consumer.unsubscribe(topicName);
@@ -133,7 +133,7 @@ public class IoTDBDataSet1TopicConsumerSpecialIT extends
AbstractSubscriptionReg
consumer.subscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after
re-subscribing");
// Consumption data
- consume_data(consumer);
+ consume_data(consumer, session_dest);
check_count(8, "select count(`ABH#01`) from " + device, "consume data
again:" + pattern);
// Unsubscribe
consumer.unsubscribe(topicName);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 0b434b7b331..cc03f726141 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
+import org.apache.iotdb.session.subscription.util.PollTimer;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -109,16 +110,30 @@ public class SubscriptionBroker {
final Map<String, Long> topicNameToIncrements = new HashMap<>();
// Iterate over each sorted topic name and poll the corresponding events
+ int remainingTopicSize = sortedTopicNames.size();
for (final String topicName : sortedTopicNames) {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
+ remainingTopicSize -= 1;
+
// Recheck
if (Objects.isNull(prefetchingQueue) || prefetchingQueue.isClosed()) {
continue;
}
// Poll the event from the prefetching queue
- final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
+ final SubscriptionEvent event;
+ if (prefetchingQueue instanceof SubscriptionPrefetchingTsFileQueue) {
+ // TODO: current poll timeout is uniform for all candidate topics
+ final PollTimer timer =
+ new PollTimer(
+ System.currentTimeMillis(),
+ SubscriptionAgent.receiver().remainingMs() / Math.max(1, remainingTopicSize));
+ event = prefetchingQueue.pollV2(consumerId, timer);
+ } else {
+ // TODO: migrate poll to pollV2
+ event = prefetchingQueue.poll(consumerId);
+ }
if (Objects.isNull(event)) {
continue;
}
@@ -199,10 +214,14 @@ public class SubscriptionBroker {
// Check if the prefetching queue is closed
if (prefetchingQueue.isClosed()) {
- LOGGER.warn(
- "Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] is closed",
- topicName,
- brokerId);
+ SubscriptionDataNodeResourceManager.log()
+ .schedule(SubscriptionBroker.class, brokerId, topicName)
+ .ifPresent(
+ l ->
+ l.warn(
+ "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed",
+ topicName,
+ brokerId));
continue;
}
@@ -479,7 +498,11 @@ public class SubscriptionBroker {
brokerId));
return false;
}
- return prefetchingQueue.executePrefetch();
+
+ // TODO: migrate executePrefetch to executePrefetchV2
+ return prefetchingQueue instanceof SubscriptionPrefetchingTabletQueue
+ ? prefetchingQueue.executePrefetch()
+ : prefetchingQueue.executePrefetchV2();
}
public int getPipeEventCount(final String topicName) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index fa443eb50f9..3dd9e0d54fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -19,24 +19,30 @@
package org.apache.iotdb.db.subscription.broker;
+import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import
org.apache.iotdb.db.pipe.agent.task.execution.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
+import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch;
import
org.apache.iotdb.db.subscription.task.subtask.SubscriptionReceiverSubtask;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import org.apache.iotdb.session.subscription.util.PollTimer;
import com.google.common.collect.ImmutableSet;
import org.apache.tsfile.utils.Pair;
@@ -45,6 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -105,6 +112,12 @@ public abstract class SubscriptionPrefetchingQueue {
private volatile boolean isCompleted = false;
private volatile boolean isClosed = false;
+ // for prefetch v2
+ // TODO: make it thread-local for higher throughput
+ private volatile TsFileInsertionEvent currentTsFileInsertionEvent;
+ private volatile RetryableEvent<TabletInsertionEvent> currentTabletInsertionEvent;
+ private volatile SubscriptionTsFileToTabletIterator currentToTabletIterator;
+
public SubscriptionPrefetchingQueue(
final String brokerId,
final String topicName,
@@ -148,6 +161,20 @@ public abstract class SubscriptionPrefetchingQueue {
// no need to clean up events in inputPendingQueue, see
//
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.close
+
+ if (Objects.nonNull(currentToTabletIterator)) {
+ currentToTabletIterator.cleanUp();
+ currentToTabletIterator = null;
+ }
+ if (Objects.nonNull(currentTsFileInsertionEvent)) {
+ ((EnrichedEvent) currentTsFileInsertionEvent).clearReferenceCount(this.getClass().getName());
+ currentTsFileInsertionEvent = null;
+ }
+ if (Objects.nonNull(currentTabletInsertionEvent)) {
+ ((EnrichedEvent) currentTabletInsertionEvent.innerEvent)
+ .clearReferenceCount(this.getClass().getName());
+ currentTabletInsertionEvent = null;
+ }
}
///////////////////////////////// lock /////////////////////////////////
@@ -188,7 +215,7 @@ public abstract class SubscriptionPrefetchingQueue {
}
}
- public SubscriptionEvent pollInternal(final String consumerId) {
+ private SubscriptionEvent pollInternal(final String consumerId) {
states.markPollRequest();
if (prefetchingQueue.isEmpty()) {
@@ -258,6 +285,77 @@ public abstract class SubscriptionPrefetchingQueue {
return null;
}
+ public SubscriptionEvent pollV2(final String consumerId, final PollTimer timer) {
+ acquireReadLock();
+ try {
+ return isClosed() ? null : pollInternalV2(consumerId, timer);
+ } finally {
+ releaseReadLock();
+ }
+ }
+
+ private SubscriptionEvent pollInternalV2(final String consumerId, final PollTimer timer) {
+ states.markPollRequest();
+
+ // do-while ensures at least one poll
+ do {
+ SubscriptionEvent event;
+ try {
+ if (prefetchingQueue.isEmpty()) {
+ // TODO: concurrent polling of multiple prefetching queues
+ Thread.sleep(100);
+ onEvent();
+ }
+
+ final long size = prefetchingQueue.size();
+ long count = 0;
+
+ while (count++ < size // limit control
+ && Objects.nonNull(
+ event =
+ prefetchingQueue.poll(
+ SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
+ TimeUnit.MILLISECONDS))) {
+ if (event.isCommitted()) {
+ LOGGER.warn(
+ "Subscription: SubscriptionPrefetchingQueue {} poll committed event {} from prefetching queue (broken invariant), remove it",
+ this,
+ event);
+ // no need to update inFlightEvents
+ continue;
+ }
+
+ if (!event.pollable()) {
+ LOGGER.warn(
+ "Subscription: SubscriptionPrefetchingQueue {} poll non-pollable event {} from prefetching queue (broken invariant), nack and remove it",
+ this,
+ event);
+ event.nack(); // now pollable
+ // no need to update inFlightEvents and prefetchingQueue
+ continue;
+ }
+
+ // This operation should be performed before updating inFlightEvents to prevent multiple
+ // consumers from consuming the same event.
+ event.recordLastPolledTimestamp(); // now non-pollable
+
+ inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), event);
+ event.recordLastPolledConsumerId(consumerId);
+ return event;
+ }
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn(
+ "Subscription: SubscriptionPrefetchingQueue {} interrupted while polling events.",
+ this,
+ e);
+ }
+ timer.update();
+ } while (!timer.isExpired());
+
+ return null;
+ }
+
/////////////////////////////// prefetch ///////////////////////////////
public boolean executePrefetch() {
@@ -283,6 +381,22 @@ public abstract class SubscriptionPrefetchingQueue {
}
}
+ public boolean executePrefetchV2() {
+ acquireReadLock();
+ try {
+ if (isClosed()) {
+ return false;
+ }
+ reportStateIfNeeded();
+ tryPrefetchV2();
+ remapInFlightEventsSnapshot(committedCleaner, pollableNacker);
+ // always return false
+ return false;
+ } finally {
+ releaseReadLock();
+ }
+ }
+
private void reportStateIfNeeded() {
if (System.currentTimeMillis() - lastStateReportTimestamp
>
SubscriptionConfig.getInstance().getSubscriptionLogManagerBaseIntervalMs()
@@ -353,7 +467,19 @@ public abstract class SubscriptionPrefetchingQueue {
* {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
*/
private synchronized void tryPrefetch() {
- while (!inputPendingQueue.isEmpty()) {
+ while (!inputPendingQueue.isEmpty() ||
Objects.nonNull(currentTabletInsertionEvent)) {
+ if (Objects.nonNull(currentTabletInsertionEvent)) {
+ final RetryableState state =
onRetryableTabletInsertionEvent(currentTabletInsertionEvent);
+ switch (state) {
+ case PREFETCHED:
+ return;
+ case RETRY:
+ continue;
+ case NO_RETRY:
+ break;
+ }
+ }
+
final Event event =
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
if (Objects.isNull(event)) {
// The event will be null in two cases:
@@ -389,10 +515,16 @@ public abstract class SubscriptionPrefetchingQueue {
}
if (event instanceof TabletInsertionEvent) {
- if (onEvent((TabletInsertionEvent) event)) {
- return;
+ final RetryableState state =
+ onRetryableTabletInsertionEvent(
+ new RetryableEvent<>((TabletInsertionEvent) event, false,
false));
+ switch (state) {
+ case PREFETCHED:
+ return;
+ case RETRY:
+ case NO_RETRY:
+ continue;
}
- continue;
}
if (event instanceof TsFileInsertionEvent) {
@@ -420,6 +552,173 @@ public abstract class SubscriptionPrefetchingQueue {
// At this moment, the inputPendingQueue is empty.
}
+ private synchronized void tryPrefetchV2() {
+ if (!prefetchingQueue.isEmpty()) {
+ return;
+ }
+
+ if (Objects.nonNull(currentTsFileInsertionEvent)) {
+ constructToTabletIterator(currentTsFileInsertionEvent);
+ return;
+ }
+
+ if (Objects.nonNull(currentTabletInsertionEvent)) {
+ final RetryableState state =
onRetryableTabletInsertionEvent(currentTabletInsertionEvent);
+ switch (state) {
+ case PREFETCHED:
+ case RETRY:
+ return;
+ case NO_RETRY:
+ break;
+ }
+ }
+
+ if (Objects.nonNull(currentToTabletIterator)) {
+ while (currentToTabletIterator.hasNext() ||
Objects.nonNull(currentTabletInsertionEvent)) {
+ if (Objects.nonNull(currentTabletInsertionEvent)) {
+ final RetryableState state =
onRetryableTabletInsertionEvent(currentTabletInsertionEvent);
+ switch (state) {
+ case PREFETCHED:
+ case RETRY:
+ return;
+ case NO_RETRY:
+ break;
+ }
+ }
+ final RetryableState state =
+ onRetryableTabletInsertionEvent(
+ new RetryableEvent<>(currentToTabletIterator.next(), true,
true));
+ switch (state) {
+ case PREFETCHED:
+ case RETRY:
+ return;
+ case NO_RETRY:
+ continue;
+ }
+ }
+ currentToTabletIterator.ack();
+ currentToTabletIterator = null;
+ }
+
+ final Event event =
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
+ if (Objects.isNull(event)) {
+ // The event will be null in two cases:
+ // 1. The inputPendingQueue is empty.
+ // 2. The tsfile event has been deduplicated.
+ return;
+ }
+
+ if (!(event instanceof EnrichedEvent)) {
+ LOGGER.warn(
+ "Subscription: SubscriptionPrefetchingQueue {} only support prefetch EnrichedEvent. Ignore {}.",
+ this,
+ event);
+ return;
+ }
+
+ if (event instanceof PipeTerminateEvent) {
+ final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
+ // add mark completed hook
+ terminateEvent.addOnCommittedHook(
+ () -> {
+ markCompleted();
+ return null;
+ });
+ // commit directly
+ ((PipeTerminateEvent) event)
+ .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
+ LOGGER.info(
+ "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}",
+ this,
+ terminateEvent);
+ return;
+ }
+
+ if (event instanceof TabletInsertionEvent) {
+ onRetryableTabletInsertionEvent(
+ new RetryableEvent<>((TabletInsertionEvent) event, false, false));
+ return; // always return here
+ }
+
+ if (event instanceof TsFileInsertionEvent) {
+ if (PipeEventCollector.canSkipParsing4TsFileEvent((PipeTsFileInsertionEvent) event)) {
+ onEvent((TsFileInsertionEvent) event);
+ return;
+ }
+ if (Objects.nonNull(currentToTabletIterator)) {
+ LOGGER.warn(
+ "Subscription: SubscriptionPrefetchingQueue {} prefetch TsFileInsertionEvent when ToTabletIterator is not null (broken invariant). Ignore {}.",
+ this,
+ event);
+ } else {
+ constructToTabletIterator((PipeTsFileInsertionEvent) event);
+ return;
+ }
+ }
+
+ // TODO:
+ // - PipeHeartbeatEvent: ignored? (may affect pipe metrics)
+ // - UserDefinedEnrichedEvent: ignored?
+ // - Others: events related to meta sync, safe to ignore
+ LOGGER.info(
+ "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.",
+ this,
+ event);
+ ((EnrichedEvent) event)
+ .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), false);
+
+ onEvent();
+ }
+
+ private void constructToTabletIterator(final TsFileInsertionEvent event) {
+ currentTsFileInsertionEvent = null;
+ final Iterator<TabletInsertionEvent> tabletInsertionEventsIterator;
+ try {
+ tabletInsertionEventsIterator = event.toTabletInsertionEvents().iterator();
+ currentToTabletIterator =
+ new SubscriptionTsFileToTabletIterator(
+ (PipeTsFileInsertionEvent) event, tabletInsertionEventsIterator);
+ } catch (final PipeException e) {
+ LOGGER.warn("Exception {} occurred when {} construct ToTabletIterator", this, e, e);
+ currentTsFileInsertionEvent = event;
+ }
+ }
+
+ private RetryableState onRetryableTabletInsertionEvent(
+ final RetryableEvent<TabletInsertionEvent> retryableEvent) {
+ currentTabletInsertionEvent = null;
+ final EnrichedEvent event = (EnrichedEvent) retryableEvent.innerEvent;
+ if (retryableEvent.shouldIncreaseReferenceCount) {
+ if (!event.increaseReferenceCount(this.getClass().getName())) {
+ LOGGER.warn(
+ "Failed to increase reference count for {} when {} on retryable TabletInsertionEvent",
+ event,
+ this);
+ currentTabletInsertionEvent = retryableEvent;
+ return RetryableState.RETRY;
+ }
+ retryableEvent.shouldIncreaseReferenceCount = false;
+ }
+ if (retryableEvent.shouldEnrichWithCommitterKeyAndCommitId) {
+ PipeEventCommitManager.getInstance()
+ .enrichWithCommitterKeyAndCommitId(
+ event,
+ currentToTabletIterator.getCreationTime(),
+ currentToTabletIterator.getRegionId());
+ retryableEvent.shouldEnrichWithCommitterKeyAndCommitId = false;
+ }
+ try {
+ return onEvent(retryableEvent.innerEvent)
+ ? RetryableState.PREFETCHED
+ : RetryableState.NO_RETRY;
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Exception occurred when {} on retryable TabletInsertionEvent {}", this, event, e);
+ currentTabletInsertionEvent = retryableEvent;
+ return RetryableState.RETRY;
+ }
+ }
+
/**
* @return {@code true} if there are subscription events prefetched.
*/
@@ -427,8 +726,9 @@ public abstract class SubscriptionPrefetchingQueue {
/**
* @return {@code true} if there are subscription events prefetched.
+ * @throws Exception only occurs when constructing {@link SubscriptionPipeTsFileEventBatch}.
*/
- protected boolean onEvent(final TabletInsertionEvent event) {
+ protected boolean onEvent(final TabletInsertionEvent event) throws Exception
{
return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
}
@@ -761,4 +1061,79 @@ public abstract class SubscriptionPrefetchingQueue {
}
return ev;
};
+
+ /////////////////////////////// tsfile to tablet iteration
///////////////////////////////
+
+  /**
+   * Iterator over the tablet insertion events of one tsfile insertion event.
+   *
+   * <p>{@link #hasNext()} and {@link #next()} delegate to the wrapped iterator. The tsfile event
+   * is kept alongside so that it can be closed and released through {@link #ack()} or {@link
+   * #cleanUp()}, and so that its creation time and region id can be queried.
+   */
+  private static class SubscriptionTsFileToTabletIterator
+      implements Iterator<TabletInsertionEvent> {
+
+    private final PipeTsFileInsertionEvent tsFileInsertionEvent;
+    private final Iterator<TabletInsertionEvent> tabletInsertionEventsIterator;
+
+    private SubscriptionTsFileToTabletIterator(
+        final PipeTsFileInsertionEvent tsFileInsertionEvent,
+        final Iterator<TabletInsertionEvent> tabletInsertionEventsIterator) {
+      this.tsFileInsertionEvent = tsFileInsertionEvent;
+      this.tabletInsertionEventsIterator = tabletInsertionEventsIterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return tabletInsertionEventsIterator.hasNext();
+    }
+
+    @Override
+    public TabletInsertionEvent next() {
+      return tabletInsertionEventsIterator.next();
+    }
+
+    /** Closes the tsfile event and decreases its reference count without reporting. */
+    public void ack() {
+      closeTsFileInsertionEventQuietly();
+      // should not report here
+      tsFileInsertionEvent.decreaseReferenceCount(this.getClass().getName(), false);
+    }
+
+    /** Closes the tsfile event and clears its reference count. */
+    public void cleanUp() {
+      closeTsFileInsertionEventQuietly();
+      tsFileInsertionEvent.clearReferenceCount(this.getClass().getName());
+    }
+
+    /** @return the creation time of the wrapped tsfile event */
+    public long getCreationTime() {
+      return tsFileInsertionEvent.getCreationTime();
+    }
+
+    /** @return the region id of the wrapped tsfile event */
+    public int getRegionId() {
+      return tsFileInsertionEvent.getRegionId();
+    }
+
+    /** Closes the tsfile event; any exception raised by {@code close()} is discarded. */
+    private void closeTsFileInsertionEventQuietly() {
+      try {
+        tsFileInsertionEvent.close();
+      } catch (final Exception ignored) {
+        // best effort: releasing the reference afterwards must not be skipped
+      }
+    }
+  }
+
+ /////////////////////////////// retryable Event
///////////////////////////////
+
+ /**
+ * Wraps an event together with flags for the preparation steps that are still pending for it.
+ * {@code onRetryableTabletInsertionEvent} clears a flag once the corresponding step succeeded,
+ * so a repeated attempt on the same wrapper does not perform that step again.
+ *
+ * @param <E> type of the wrapped event
+ */
+ private static class RetryableEvent<E extends Event> {
+
+ // the wrapped event
+ private final E innerEvent;
+ // true while the reference count of innerEvent still has to be increased
+ private volatile boolean shouldIncreaseReferenceCount;
+ // true while innerEvent still has to be enriched with committer key and commit id
+ private volatile boolean shouldEnrichWithCommitterKeyAndCommitId;
+
+ private RetryableEvent(
+ final E innerEvent,
+ final boolean shouldIncreaseReferenceCount,
+ final boolean shouldEnrichWithCommitterKeyAndCommitId) {
+ this.innerEvent = innerEvent;
+ this.shouldIncreaseReferenceCount = shouldIncreaseReferenceCount;
+ this.shouldEnrichWithCommitterKeyAndCommitId =
shouldEnrichWithCommitterKeyAndCommitId;
+ }
+ }
+
+ /** Outcome of one attempt to process a retryable event. */
+ private enum RetryableState {
+ // the attempt failed (reference count not increased, or an exception was thrown) and the
+ // event was kept for another attempt
+ RETRY,
+ // the event was handed over without exception, but no subscription event was prefetched
+ NO_RETRY,
+ // the event was handed over and subscription events were prefetched
+ PREFETCHED,
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 6596b26e7d8..755826f4ced 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -193,7 +193,12 @@ public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQ
@Override
protected boolean onEvent(final TsFileInsertionEvent event) {
- return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
+ try {
+ return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
+ } catch (final Exception e) {
+ LOGGER.error("Subscription: unexpected exception (broken invariant) {}",
e, e);
+ }
+ return false;
}
/////////////////////////////// stringify ///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 6217af0123b..1b29d1d97c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -19,9 +19,7 @@
package org.apache.iotdb.db.subscription.broker;
-import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
-import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -247,10 +245,6 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
@Override
protected boolean onEvent(final TsFileInsertionEvent event) {
- if
(!PipeEventCollector.canSkipParsing4TsFileEvent((PipeTsFileInsertionEvent)
event)) {
- return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
- }
-
final SubscriptionCommitContext commitContext =
generateSubscriptionCommitContext();
final SubscriptionEvent ev =
new SubscriptionEvent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
new file mode 100644
index 00000000000..194eb5a8fa5
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.batch;
+
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A fixed array of {@link ReentrantLock}s onto which region ids are mapped, so that work for the
+ * same region id is serialized while region ids mapped to different segments can proceed
+ * independently.
+ *
+ * <p>The array is created lazily on first use. Its size is the number of data regions reported by
+ * {@link StorageEngine}, clamped to [{@value #SEGMENT_LOCK_MIN_SIZE}, {@value
+ * #SEGMENT_LOCK_MAX_SIZE}].
+ *
+ * <p>Refer to {@link org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceSegmentLock}.
+ */
+public class SubscriptionPipeEventBatchSegmentLock {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SubscriptionPipeEventBatchSegmentLock.class);
+
+  private static final int SEGMENT_LOCK_MIN_SIZE = 32;
+  private static final int SEGMENT_LOCK_MAX_SIZE = 128;
+
+  private volatile ReentrantLock[] locks;
+
+  /**
+   * Returns the lock array, creating it on first use (double-checked locking on the volatile
+   * field).
+   *
+   * @return the initialized, never-null lock array
+   */
+  private ReentrantLock[] initIfNecessary() {
+    ReentrantLock[] result = locks;
+    if (result == null) {
+      synchronized (this) {
+        result = locks;
+        if (result == null) {
+          int lockSegmentSize = SEGMENT_LOCK_MIN_SIZE;
+          try {
+            lockSegmentSize = StorageEngine.getInstance().getAllDataRegionIds().size();
+          } catch (final Exception e) {
+            // keep the cause in the log instead of dropping it
+            LOGGER.warn(
+                "Cannot get data region ids, use default lock segment size: {}",
+                lockSegmentSize,
+                e);
+          }
+          lockSegmentSize = Math.min(SEGMENT_LOCK_MAX_SIZE, lockSegmentSize);
+          lockSegmentSize = Math.max(SEGMENT_LOCK_MIN_SIZE, lockSegmentSize);
+
+          final ReentrantLock[] tmpLocks = new ReentrantLock[lockSegmentSize];
+          for (int i = 0; i < tmpLocks.length; i++) {
+            tmpLocks[i] = new ReentrantLock();
+          }
+
+          // publish this variable only after every slot has been filled
+          result = tmpLocks;
+          locks = tmpLocks;
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Selects the segment for a region id. {@link Math#floorMod(int, int)} keeps the index inside
+   * the array for negative ids as well; for non-negative ids it equals {@code regionId % length}.
+   */
+  private ReentrantLock segmentOf(final int regionId) {
+    final ReentrantLock[] segments = initIfNecessary();
+    return segments[Math.floorMod(regionId, segments.length)];
+  }
+
+  /**
+   * Acquires the segment lock the given region id maps to, blocking until it is available.
+   *
+   * @param regionId region id used to select the segment
+   */
+  public void lock(final int regionId) {
+    segmentOf(regionId).lock();
+  }
+
+  /**
+   * Releases the segment lock the given region id maps to.
+   *
+   * @param regionId region id used to select the segment; must match a preceding {@link
+   *     #lock(int)} call made by the current thread
+   */
+  public void unlock(final int regionId) {
+    segmentOf(regionId).unlock();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
index c20dffcd5e9..17debfc4b48 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java
@@ -30,9 +30,9 @@ import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -45,6 +45,7 @@ public class SubscriptionPipeEventBatches {
protected final long maxBatchSizeInBytes;
private final Map<Integer, SubscriptionPipeEventBatch> regionIdToBatch;
+ private final SubscriptionPipeEventBatchSegmentLock segmentLock;
public SubscriptionPipeEventBatches(
final SubscriptionPrefetchingQueue prefetchingQueue,
@@ -54,7 +55,8 @@ public class SubscriptionPipeEventBatches {
this.maxDelayInMs = maxDelayInMs;
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
- this.regionIdToBatch = new ConcurrentHashMap<>();
+ this.regionIdToBatch = new HashMap<>();
+ this.segmentLock = new SubscriptionPipeEventBatchSegmentLock();
}
/**
@@ -63,29 +65,25 @@ public class SubscriptionPipeEventBatches {
public boolean onEvent(final Consumer<SubscriptionEvent> consumer) {
final AtomicBoolean hasNew = new AtomicBoolean(false);
for (final int regionId : ImmutableList.copyOf(regionIdToBatch.keySet())) {
- regionIdToBatch.compute(
- regionId,
- (key, batch) -> {
- if (Objects.isNull(batch)) {
- return null;
- }
-
- try {
- if (batch.onEvent(consumer)) {
- hasNew.set(true);
- return null; // remove this entry
- }
- // Seal this batch next time.
- } catch (final Exception e) {
- LOGGER.warn("Exception occurred when sealing events from batch
{}", batch, e);
- // Seal this batch next time.
- }
-
- return batch;
- });
-
- if (hasNew.get()) {
- break;
+ try {
+ segmentLock.lock(regionId);
+ final SubscriptionPipeEventBatch batch = regionIdToBatch.get(regionId);
+ if (Objects.isNull(batch)) {
+ continue;
+ }
+ try {
+ if (batch.onEvent(consumer)) {
+ hasNew.set(true);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("Exception occurred when sealing events from batch {}",
batch, e);
+ }
+ if (hasNew.get()) {
+ regionIdToBatch.remove(regionId);
+ break;
+ }
+ } finally {
+ segmentLock.unlock(regionId);
}
}
@@ -96,41 +94,51 @@ public class SubscriptionPipeEventBatches {
* @return {@code true} if there are subscription events consumed.
*/
public boolean onEvent(
- final @NonNull EnrichedEvent event, final Consumer<SubscriptionEvent>
consumer) {
+ final @NonNull EnrichedEvent event, final Consumer<SubscriptionEvent>
consumer)
+ throws Exception {
final int regionId = event.getCommitterKey().getRegionId();
final AtomicBoolean hasNew = new AtomicBoolean(false);
- regionIdToBatch.compute(
- regionId,
- (key, batch) -> {
- if (Objects.isNull(batch)) {
- batch =
- prefetchingQueue instanceof SubscriptionPrefetchingTabletQueue
- ? new SubscriptionPipeTabletEventBatch(
- key,
- (SubscriptionPrefetchingTabletQueue) prefetchingQueue,
- maxDelayInMs,
- maxBatchSizeInBytes)
- : new SubscriptionPipeTsFileEventBatch(
- key,
- (SubscriptionPrefetchingTsFileQueue) prefetchingQueue,
- maxDelayInMs,
- maxBatchSizeInBytes);
- }
+ try {
+ segmentLock.lock(regionId);
+ SubscriptionPipeEventBatch batch = regionIdToBatch.get(regionId);
+ if (Objects.isNull(batch)) {
+ try {
+ batch =
+ prefetchingQueue instanceof SubscriptionPrefetchingTabletQueue
+ ? new SubscriptionPipeTabletEventBatch(
+ regionId,
+ (SubscriptionPrefetchingTabletQueue) prefetchingQueue,
+ maxDelayInMs,
+ maxBatchSizeInBytes)
+ : new SubscriptionPipeTsFileEventBatch(
+ regionId,
+ (SubscriptionPrefetchingTsFileQueue) prefetchingQueue,
+ maxDelayInMs,
+ maxBatchSizeInBytes);
+ } catch (final Exception e) {
+ LOGGER.warn("Exception occurred when construct new batch", e);
+ throw e; // rethrow exception for retry
+ }
+ }
- try {
- if (batch.onEvent(event, consumer)) {
- hasNew.set(true);
- return null; // remove this entry
- }
- // Seal this batch next time.
- } catch (final Exception e) {
- LOGGER.warn("Exception occurred when sealing events from batch
{}", batch, e);
- // Seal this batch next time.
- }
+ try {
+ if (batch.onEvent(event, consumer)) {
+ hasNew.set(true);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("Exception occurred when sealing events from batch {}",
batch, e);
+ }
- return batch;
- });
+ if (hasNew.get()) {
+ regionIdToBatch.remove(regionId);
+ } else {
+ regionIdToBatch.put(regionId, batch);
+ }
+
+ } finally {
+ segmentLock.unlock(regionId);
+ }
return hasNew.get();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 15c93d7a5e6..47be439912b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.subscription.event.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
-import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
@@ -56,22 +55,12 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
@Override
public synchronized void ack() {
batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
- enrichedEvents.stream()
- // only decrease reference count for tsfile event, since we already
decrease reference count
- // for tablet event in batch
- .filter(event -> event instanceof PipeTsFileInsertionEvent)
- .forEach(event ->
event.decreaseReferenceCount(this.getClass().getName(), true));
}
@Override
public synchronized void cleanUp(final boolean force) {
// close batch, it includes clearing the reference count of events
batch.close();
-
- // clear the reference count of events
- for (final EnrichedEvent enrichedEvent : enrichedEvents) {
- enrichedEvent.clearReferenceCount(this.getClass().getName());
- }
enrichedEvents.clear();
}
@@ -92,31 +81,10 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
@Override
protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
- // TODO: parse tsfile event on the fly like
SubscriptionPipeTabletEventBatch
- try {
- ((PipeTsFileInsertionEvent) event)
- .consumeTabletInsertionEventsWithRetry(
- event1 -> {
- if (!event1.increaseReferenceCount(this.getClass().getName()))
{
- LOGGER.warn(
- "SubscriptionPipeTsFileEventBatch: Failed to increase
the reference count of event {}, skipping it.",
- event1.coreReportMessage());
- } else {
- try {
- batch.onEvent(event1);
- } catch (final Exception ignored) {
- // no exceptions will be thrown
- }
- }
- },
- "SubscriptionPipeTsFileEventBatch::onTsFileInsertionEvent");
- } finally {
- try {
- event.close();
- } catch (final Exception ignored) {
- // no exceptions will be thrown
- }
- }
+ LOGGER.warn(
+ "SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {}
when batching.",
+ this,
+ event);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index c1e52165b8d..4c8033f1612 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -177,6 +177,8 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
if (Objects.isNull(pollTimer)) {
return
SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs();
}
+ // update timer before fetch remaining ms
+ pollTimer.update();
return pollTimer.remainingMs();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 04d2263ca31..556cc581c7e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -333,9 +333,9 @@ public class CommonConfig {
private float subscriptionCacheMemoryUsagePercentage = 0.2F;
private int subscriptionSubtaskExecutorMaxThreadNum = 2;
- private int subscriptionPrefetchTabletBatchMaxDelayInMs = 20; // 1s
+ private int subscriptionPrefetchTabletBatchMaxDelayInMs = 20;
private long subscriptionPrefetchTabletBatchMaxSizeInBytes = MB;
- private int subscriptionPrefetchTsFileBatchMaxDelayInMs = 1000; // 5s
+ private int subscriptionPrefetchTsFileBatchMaxDelayInMs = 1000;
private long subscriptionPrefetchTsFileBatchMaxSizeInBytes = 2 * MB;
private int subscriptionPollMaxBlockingTimeMs = 500;
private int subscriptionDefaultTimeoutInMs = 10_000; // 10s