This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dcdbbe524e9 Subscription: improve parsing logic when using JAVA SDK
client & refactor subscription IT & intro `getSubscribedTopicNames` API (#12721)
dcdbbe524e9 is described below
commit dcdbbe524e934c7d330e476198faea4b443b42de
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jun 12 19:29:35 2024 +0800
Subscription: improve parsing logic when using JAVA SDK client & refactor
subscription IT & intro `getSubscribedTopicNames` API (#12721)
---
.../apache/iotdb/SubscriptionSessionExample.java | 48 +++++---
...tionDualIT.java => AbstractSubscriptionIT.java} | 36 +-----
.../it/IoTDBSubscriptionITConstant.java | 19 ++-
.../IoTDBSubscriptionRestartIT.java | 53 ++++-----
.../it/dual/AbstractSubscriptionDualIT.java | 24 ++--
.../it/dual/IoTDBSubscriptionConsumerGroupIT.java | 31 ++---
.../it/dual/IoTDBSubscriptionTimePrecisionIT.java | 33 +++---
.../it/dual/IoTDBSubscriptionTopicIT.java | 131 +++++++++------------
.../AbstractSubscriptionLocalIT.java} | 29 +++--
.../it/local/IoTDBSubscriptionBasicIT.java | 117 ++++++------------
.../it/local/IoTDBSubscriptionIdempotentIT.java | 14 +--
.../SubscriptionIdentifierSemanticException.java | 26 ++--
.../response/PipeSubscribeSubscribeResp.java | 51 +++++++-
.../response/PipeSubscribeUnsubscribeResp.java | 51 +++++++-
.../consumer/SubscriptionConsumer.java | 102 ++++++++++++----
.../consumer/SubscriptionProvider.java | 12 +-
.../consumer/SubscriptionPullConsumer.java | 9 +-
.../consumer/SubscriptionPushConsumer.java | 8 +-
.../payload/SubscriptionFileHandler.java | 20 ++--
.../session/subscription/util/IdentifierUtils.java | 51 ++++++++
.../agent/SubscriptionAgentLauncher.java | 8 +-
.../agent/SubscriptionBrokerAgent.java | 11 +-
.../agent/SubscriptionConsumerAgent.java | 22 ++--
.../agent/SubscriptionReceiverAgent.java | 6 +-
.../subscription/agent/SubscriptionTopicAgent.java | 20 ++--
.../db/subscription/broker/SubscriptionBroker.java | 33 +++++-
.../broker/SubscriptionPrefetchingQueue.java | 28 ++++-
.../SubscriptionPrefetchingTabletsQueue.java | 9 ++
.../broker/SubscriptionPrefetchingTsFileQueue.java | 24 ++++
.../db/subscription/event/SubscriptionEvent.java | 6 +
.../receiver/SubscriptionReceiverV1.java | 36 +++---
.../task/subtask/SubscriptionConnectorSubtask.java | 18 +--
.../SubscriptionConnectorSubtaskLifeCycle.java | 54 ++-------
33 files changed, 673 insertions(+), 467 deletions(-)
diff --git
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
index 62c869e02bf..3aea4e0a762 100644
---
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
+++
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
@@ -30,9 +30,13 @@ import
org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.tsfile.read.TsFileReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.expression.QueryExpression;
+import org.apache.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -46,11 +50,12 @@ public class SubscriptionSessionExample {
private static final int PORT = 6667;
private static final String TOPIC_1 = "topic1";
- private static final String TOPIC_2 = "`topic2`";
+ private static final String TOPIC_2 = "`'topic2'`";
- public static final long SLEEP_NS = 1_000_000_000L;
- public static final long POLL_TIMEOUT_MS = 10_000L;
+ private static final long SLEEP_NS = 1_000_000_000L;
+ private static final long POLL_TIMEOUT_MS = 10_000L;
private static final int MAX_RETRY_TIMES = 3;
+ private static final int PARALLELISM = 8;
private static void prepareData() throws Exception {
// Open session
@@ -66,13 +71,13 @@ public class SubscriptionSessionExample {
// Insert some historical data
final long currentTime = System.currentTimeMillis();
- for (int i = 0; i < 10000; ++i) {
+ for (int i = 0; i < 100; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s1, s2) values (%s, 1,
2)", i));
session.executeNonQueryStatement(
- String.format("insert into root.db.d2(time, s3, s4) values (%s, 3,
4)", currentTime + i));
+ String.format("insert into root.db.d2(time, s1, s2) values (%s, 3,
4)", currentTime + i));
session.executeNonQueryStatement(
- String.format("insert into root.sg.d3(time, s5) values (%s, 5)",
currentTime + 2 * i));
+ String.format("insert into root.sg.d3(time, s1) values (%s, 5)",
currentTime + 2 * i));
}
session.executeNonQueryStatement("flush");
@@ -104,11 +109,16 @@ public class SubscriptionSessionExample {
session = null;
}
- private static void subscriptionExample1() throws Exception {
+ /** single pull consumer subscribe topic with path and time range */
+ private static void dataSubscription1() throws Exception {
// Create topics
try (final SubscriptionSession subscriptionSession = new
SubscriptionSession(HOST, PORT)) {
subscriptionSession.open();
- subscriptionSession.createTopic(TOPIC_1);
+ final Properties config = new Properties();
+ config.put(TopicConstant.PATH_KEY, "root.db.d1.s1");
+ config.put(TopicConstant.START_TIME_KEY, 25);
+ config.put(TopicConstant.END_TIME_KEY, 75);
+ subscriptionSession.createTopic(TOPIC_1, config);
}
int retryCount = 0;
@@ -151,7 +161,8 @@ public class SubscriptionSessionExample {
consumer1.close();
}
- private static void subscriptionExample2() throws Exception {
+ /** multi pull consumer subscribe topic with tsfile format */
+ private static void dataSubscription2() throws Exception {
try (final SubscriptionSession subscriptionSession = new
SubscriptionSession(HOST, PORT)) {
subscriptionSession.open();
final Properties config = new Properties();
@@ -160,7 +171,7 @@ public class SubscriptionSessionExample {
}
final List<Thread> threads = new ArrayList<>();
- for (int i = 0; i < 8; ++i) {
+ for (int i = 0; i < PARALLELISM; ++i) {
final int idx = i;
final Thread thread =
new Thread(
@@ -187,7 +198,16 @@ public class SubscriptionSessionExample {
}
for (final SubscriptionMessage message : messages) {
try (final TsFileReader reader =
message.getTsFileHandler().openReader()) {
- // do something...
+ final QueryDataSet dataSet =
+ reader.query(
+ QueryExpression.create(
+ Arrays.asList(
+ new Path("root.db.d2", "s2", true),
+ new Path("root.db.d3", "s1", true)),
+ null));
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
}
}
consumer2.commitSync(messages);
@@ -208,8 +228,8 @@ public class SubscriptionSessionExample {
public static void main(final String[] args) throws Exception {
prepareData();
- dataQuery();
- subscriptionExample1();
- subscriptionExample2();
+ // dataQuery();
+ // dataSubscription1();
+ dataSubscription2();
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionIT.java
similarity index 59%
copy from
integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
copy to
integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionIT.java
index 3d8eb45a7fd..a168dbc78ba 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionIT.java
@@ -17,24 +17,22 @@
* under the License.
*/
-package org.apache.iotdb.subscription.it.dual;
+package org.apache.iotdb.subscription.it;
-import org.apache.iotdb.it.env.MultiEnvFactory;
-import org.apache.iotdb.itbase.env.BaseEnv;
import
org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
-abstract class AbstractSubscriptionDualIT {
-
- protected BaseEnv senderEnv;
- protected BaseEnv receiverEnv;
+public abstract class AbstractSubscriptionIT {
@Rule public TestName testName = new TestName();
+ @Rule public final TestRule skipOnSetUpFailure = new
SkipOnSetUpFailure("setUp");
+
@Before
public void setUp() {
// set thread name
@@ -44,30 +42,8 @@ abstract class AbstractSubscriptionDualIT {
SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);
-
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
-
- setUpConfig();
-
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
- }
-
- void setUpConfig() {
- // enable auto create schema
- senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
- receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
- // 10 min, assert that the operations will not time out
- senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
- receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
}
@After
- public final void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
- }
+ public void tearDown() {}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
index 7d16110cd1f..840abb4ccb5 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
@@ -19,11 +19,24 @@
package org.apache.iotdb.subscription.it;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+
+import java.util.concurrent.TimeUnit;
+
public class IoTDBSubscriptionITConstant {
- public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
- public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
- public static final long AWAITILITY_AT_MOST_SECOND = 600L;
+ private static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
+ private static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
+ private static final long AWAITILITY_AT_MOST_SECOND = 600L;
+
+ public static final ConditionFactory AWAIT =
+ Awaitility.await()
+ .pollInSameThread()
+ .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
+ .pollInterval(
+ IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
+ .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS);
public static final long SLEEP_NS = 1_000_000_000L;
public static final long POLL_TIMEOUT_MS = 10_000L;
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
similarity index 90%
rename from
integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
rename to
integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
index d3680219578..41dd654383d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.subscription.it.local;
+package org.apache.iotdb.subscription.it.cluster;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
@@ -36,17 +36,14 @@ import
org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
-import org.apache.iotdb.subscription.it.SkipOnSetUpFailure;
-import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,22 +51,23 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
+import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
-public class IoTDBSubscriptionRestartIT {
+public class IoTDBSubscriptionRestartIT extends AbstractSubscriptionIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionRestartIT.class);
- @Rule public final TestRule skipOnSetUpFailure = new
SkipOnSetUpFailure("setUp");
-
+ @Override
@Before
- public void setUp() throws Exception {
+ public void setUp() {
+ super.setUp();
+
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
@@ -82,8 +80,11 @@ public class IoTDBSubscriptionRestartIT {
EnvFactory.getEnv().initClusterEnvironment(3, 3);
}
+ @Override
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
+ super.tearDown();
+
EnvFactory.getEnv().cleanClusterEnvironment();
}
@@ -195,18 +196,14 @@ public class IoTDBSubscriptionRestartIT {
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - %s", testName.getMethodName(), consumer));
thread.start();
// Check timestamps size
try {
// Keep retrying if there are execution failures
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(() -> Assert.assertEquals(100, timestamps.size()));
+ AWAIT.untilAsserted(() -> Assert.assertEquals(100, timestamps.size()));
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -312,7 +309,8 @@ public class IoTDBSubscriptionRestartIT {
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - %s", testName.getMethodName(), consumer));
thread.start();
// Start DN 1 & DN 2
@@ -343,12 +341,7 @@ public class IoTDBSubscriptionRestartIT {
// Check timestamps size
try {
// Keep retrying if there are execution failures
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
+ AWAIT.untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -443,7 +436,8 @@ public class IoTDBSubscriptionRestartIT {
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - %s", testName.getMethodName(), consumer));
thread.start();
// Shutdown leader CN
@@ -486,12 +480,7 @@ public class IoTDBSubscriptionRestartIT {
// Check timestamps size
try {
// Keep retrying if there are execution failures
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
+ AWAIT.untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 3d8eb45a7fd..fb25cdc3a6c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -21,29 +21,20 @@ package org.apache.iotdb.subscription.it.dual;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.itbase.env.BaseEnv;
-import
org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;
+import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;
import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TestName;
-abstract class AbstractSubscriptionDualIT {
+abstract class AbstractSubscriptionDualIT extends AbstractSubscriptionIT {
protected BaseEnv senderEnv;
protected BaseEnv receiverEnv;
- @Rule public TestName testName = new TestName();
-
+ @Override
@Before
public void setUp() {
- // set thread name
- Thread.currentThread().setName(String.format("%s - main",
testName.getMethodName()));
-
- // set thread pools core size
- SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(1);
-
SubscriptionExecutorServiceManager.setUpstreamDataFlowExecutorCorePoolSize(1);
-
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(1);
+ super.setUp();
MultiEnvFactory.createEnv(2);
senderEnv = MultiEnvFactory.getEnv(0);
@@ -55,7 +46,7 @@ abstract class AbstractSubscriptionDualIT {
receiverEnv.initClusterEnvironment();
}
- void setUpConfig() {
+ protected void setUpConfig() {
// enable auto create schema
senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
@@ -65,8 +56,11 @@ abstract class AbstractSubscriptionDualIT {
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
}
+ @Override
@After
- public final void tearDown() {
+ public void tearDown() {
+ super.tearDown();
+
senderEnv.cleanClusterEnvironment();
receiverEnv.cleanClusterEnvironment();
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index db7b9e7ca3a..650f861f498 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -42,7 +42,6 @@ import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
import org.apache.tsfile.utils.Pair;
-import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -62,11 +61,11 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@@ -109,7 +108,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
}
@Override
- void setUpConfig() {
+ protected void setUpConfig() {
super.setUpConfig();
// Enable air gap receiver
@@ -1015,22 +1014,16 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
- Awaitility.await()
- .pollInSameThread()
-
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- if (receiverCrashed.get()) {
- LOGGER.info("detect receiver crashed, skipping this
test...");
- return;
- }
- TestUtils.assertSingleResultSetEqual(
- TestUtils.executeQueryWithRetry(statement, "select
count(*) from root.**"),
- expectedHeaderWithResult);
- });
+ AWAIT.untilAsserted(
+ () -> {
+ if (receiverCrashed.get()) {
+ LOGGER.info("detect receiver crashed, skipping this test...");
+ return;
+ }
+ TestUtils.assertSingleResultSetEqual(
+ TestUtils.executeQueryWithRetry(statement, "select count(*)
from root.**"),
+ expectedHeaderWithResult);
+ });
}
} catch (final Exception e) {
e.printStackTrace();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
index 2b1cc407b7f..39a9f2225f6 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
@@ -30,7 +30,6 @@ import
org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import org.apache.tsfile.write.record.Tablet;
-import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -43,10 +42,10 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
+import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@@ -57,7 +56,7 @@ public class IoTDBSubscriptionTimePrecisionIT extends
AbstractSubscriptionDualIT
LoggerFactory.getLogger(IoTDBSubscriptionTimePrecisionIT.class);
@Override
- void setUpConfig() {
+ protected void setUpConfig() {
super.setUpConfig();
// Set timestamp precision to nanosecond
@@ -160,7 +159,8 @@ public class IoTDBSubscriptionTimePrecisionIT extends
AbstractSubscriptionDualIT
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - consumer", testName.getMethodName()));
thread.start();
// Check data on receiver
@@ -168,21 +168,16 @@ public class IoTDBSubscriptionTimePrecisionIT extends
AbstractSubscriptionDualIT
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
- Awaitility.await()
-
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(
- () ->
- TestUtils.assertSingleResultSetEqual(
- TestUtils.executeQueryWithRetry(statement, "select
count(*) from root.**"),
- new HashMap<String, String>() {
- {
- put("count(root.db.d1.s2)", "100");
- put("count(root.db.d2.s1)", "100");
- }
- }));
+ AWAIT.untilAsserted(
+ () ->
+ TestUtils.assertSingleResultSetEqual(
+ TestUtils.executeQueryWithRetry(statement, "select
count(*) from root.**"),
+ new HashMap<String, String>() {
+ {
+ put("count(root.db.d1.s2)", "100");
+ put("count(root.db.d2.s1)", "100");
+ }
+ }));
}
} catch (final Exception e) {
e.printStackTrace();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 0cd6bc0d0a1..ac7e8333984 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -34,7 +34,6 @@ import
org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import org.apache.tsfile.write.record.Tablet;
-import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -51,11 +50,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
+import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@@ -136,7 +135,8 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - consumer", testName.getMethodName()));
thread.start();
// Check data on receiver
@@ -144,21 +144,16 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
- Awaitility.await()
-
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(
- () ->
- TestUtils.assertSingleResultSetEqual(
- TestUtils.executeQueryWithRetry(statement, "select
count(*) from root.**"),
- new HashMap<String, String>() {
- {
- put("count(root.db.d1.s)", "100");
- put("count(root.db.d2.s)", "100");
- }
- }));
+ AWAIT.untilAsserted(
+ () ->
+ TestUtils.assertSingleResultSetEqual(
+ TestUtils.executeQueryWithRetry(statement, "select
count(*) from root.**"),
+ new HashMap<String, String>() {
+ {
+ put("count(root.db.d1.s)", "100");
+ put("count(root.db.d2.s)", "100");
+ }
+ }));
}
} catch (final Exception e) {
e.printStackTrace();
@@ -238,7 +233,8 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - consumer", testName.getMethodName()));
thread.start();
// Check data on receiver
@@ -246,20 +242,15 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
- Awaitility.await()
-
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(
- () ->
- TestUtils.assertSingleResultSetEqual(
- TestUtils.executeQueryWithRetry(statement, "select
count(*) from root.**"),
- new HashMap<String, String>() {
- {
- put("count(root.db.d2.s)", "100");
- }
- }));
+ AWAIT.untilAsserted(
+ () ->
+ TestUtils.assertSingleResultSetEqual(
+ TestUtils.executeQueryWithRetry(statement, "select
count(*) from root.**"),
+ new HashMap<String, String>() {
+ {
+ put("count(root.db.d2.s)", "100");
+ }
+ }));
}
} catch (final Exception e) {
e.printStackTrace();
@@ -336,7 +327,8 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - consumer", testName.getMethodName()));
thread.start();
// Check data on receiver
@@ -348,17 +340,12 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
- Awaitility.await()
-
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(
- () ->
- TestUtils.assertResultSetEqual(
- TestUtils.executeQueryWithRetry(statement, "select *
from root.**"),
- "Time,root.db.d1.at1,",
- expectedResSet));
+ AWAIT.untilAsserted(
+ () ->
+ TestUtils.assertResultSetEqual(
+ TestUtils.executeQueryWithRetry(statement, "select * from
root.**"),
+ "Time,root.db.d1.at1,",
+ expectedResSet));
}
} catch (final Exception e) {
e.printStackTrace();
@@ -392,9 +379,9 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
// Create topic on sender
- final String topic1 = "`topic1`";
- final String topic2 = "`'topic2'`";
- final String topic3 = "`\"topic3\"`";
+ final String topic1 = "`topic4`";
+ final String topic2 = "`'topic5'`";
+ final String topic3 = "`\"topic6\"`";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
@@ -464,7 +451,8 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - consumer", testName.getMethodName()));
thread.start();
// Check data on receiver
@@ -472,20 +460,15 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
- Awaitility.await()
-
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(
- () ->
- TestUtils.assertSingleResultSetEqual(
- TestUtils.executeQueryWithRetry(statement, "select
count(*) from root.**"),
- new HashMap<String, String>() {
- {
- put("count(root.db.d1.s)", "300");
- }
- }));
+ AWAIT.untilAsserted(
+ () ->
+ TestUtils.assertSingleResultSetEqual(
+ TestUtils.executeQueryWithRetry(statement, "select
count(*) from root.**"),
+ new HashMap<String, String>() {
+ {
+ put("count(root.db.d1.s)", "300");
+ }
+ }));
}
} catch (final Exception e) {
e.printStackTrace();
@@ -507,7 +490,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
final Properties properties = new Properties();
properties.put(TopicConstant.START_TIME_KEY, "2024-01-32");
properties.put(TopicConstant.END_TIME_KEY, TopicConstant.NOW_TIME_VALUE);
- session.createTopic("topic1", properties);
+ session.createTopic("topic7", properties);
fail();
} catch (final Exception ignored) {
}
@@ -519,7 +502,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
final Properties properties = new Properties();
properties.put(TopicConstant.START_TIME_KEY, "2001.01.01T08:00:00");
properties.put(TopicConstant.END_TIME_KEY, "2000.01.01T08:00:00");
- session.createTopic("topic2", properties);
+ session.createTopic("topic8", properties);
fail();
} catch (final Exception ignored) {
}
@@ -532,7 +515,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put(TopicConstant.PATH_KEY, "root.db.*.s");
- testTopicInvalidRuntimeConfigTemplate("topic3", config);
+ testTopicInvalidRuntimeConfigTemplate("topic9", config);
}
@Test
@@ -543,7 +526,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
config.put("processor", "tumbling-time-sampling-processor");
config.put("processor.tumbling-time.interval-seconds", "1");
config.put("processor.down-sampling.split-file", "true");
- testTopicInvalidRuntimeConfigTemplate("topic4", config);
+ testTopicInvalidRuntimeConfigTemplate("topic10", config);
}
private void testTopicInvalidRuntimeConfigTemplate(
@@ -600,7 +583,8 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
} finally {
LOGGER.info("consumer exiting...");
}
- }));
+ },
+ String.format("%s - consumer", testName.getMethodName())));
// Insert some realtime data on sender
threads.add(
@@ -628,18 +612,15 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
fail(e.getMessage());
}
dataPrepared.set(true);
- }));
+ },
+ String.format("%s - data inserter", testName.getMethodName())));
for (final Thread thread : threads) {
thread.start();
}
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
-
.pollInterval(IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- // The expected SubscriptionRuntimeCriticalException was not thrown if
result is false
- .untilTrue(result);
+ // The expected SubscriptionRuntimeCriticalException was not thrown if
result is false
+ AWAIT.untilTrue(result);
for (final Thread thread : threads) {
thread.join();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
similarity index 60%
copy from
integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
copy to
integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
index 7d16110cd1f..bb248f745d4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
@@ -17,14 +17,29 @@
* under the License.
*/
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.subscription.it.local;
-public class IoTDBSubscriptionITConstant {
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;
- public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
- public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
- public static final long AWAITILITY_AT_MOST_SECOND = 600L;
+import org.junit.After;
+import org.junit.Before;
- public static final long SLEEP_NS = 1_000_000_000L;
- public static final long POLL_TIMEOUT_MS = 10_000L;
+abstract class AbstractSubscriptionLocalIT extends AbstractSubscriptionIT {
+
+ @Override
+ @Before
+ public void setUp() {
+ super.setUp();
+
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @Override
+ @After
+ public void tearDown() {
+ super.tearDown();
+
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 13a5e8abc26..fccbee6d95d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -40,10 +40,7 @@ import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
-import org.awaitility.Awaitility;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -57,31 +54,21 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class})
-public class IoTDBSubscriptionBasicIT {
+public class IoTDBSubscriptionBasicIT extends AbstractSubscriptionLocalIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionBasicIT.class);
- @Before
- public void setUp() throws Exception {
- EnvFactory.getEnv().initClusterEnvironment();
- }
-
- @After
- public void tearDown() throws Exception {
- EnvFactory.getEnv().cleanClusterEnvironment();
- }
-
@Test
public void testBasicSubscribeTablets() throws Exception {
// Insert some historical data
@@ -146,18 +133,14 @@ public class IoTDBSubscriptionBasicIT {
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - consumer", testName.getMethodName()));
thread.start();
// Check row count
try {
// Keep retrying if there are execution failures
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+ AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -242,18 +225,14 @@ public class IoTDBSubscriptionBasicIT {
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - consumer", testName.getMethodName()));
thread.start();
// Check row count
try {
// Keep retrying if there are execution failures
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+ AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -379,18 +358,14 @@ public class IoTDBSubscriptionBasicIT {
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - consumer", testName.getMethodName()));
thread.start();
// Check row count
try {
// Keep retrying if there are execution failures
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+ AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
Assert.assertTrue(commitSuccessCount.get() >
lastCommitSuccessCount.get());
Assert.assertEquals(0, commitFailureCount.get());
} catch (final Exception e) {
@@ -416,12 +391,7 @@ public class IoTDBSubscriptionBasicIT {
// Check row count
try {
// Keep retrying if there are execution failures
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(() -> Assert.assertEquals(200, rowCount.get()));
+ AWAIT.untilAsserted(() -> Assert.assertEquals(200, rowCount.get()));
Assert.assertTrue(commitSuccessCount.get() >
lastCommitSuccessCount.get());
Assert.assertEquals(0, commitFailureCount.get());
} catch (final Exception e) {
@@ -486,16 +456,11 @@ public class IoTDBSubscriptionBasicIT {
consumer.subscribe(topicName);
// The push consumer should automatically poll 10 rows of data by 1
onReceive()
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- Assert.assertEquals(10, rowCount.get());
- Assert.assertTrue(onReceiveCount.get() >
lastOnReceiveCount.get());
- });
+ AWAIT.untilAsserted(
+ () -> {
+ Assert.assertEquals(10, rowCount.get());
+ Assert.assertTrue(onReceiveCount.get() > lastOnReceiveCount.get());
+ });
lastOnReceiveCount.set(onReceiveCount.get());
@@ -508,16 +473,11 @@ public class IoTDBSubscriptionBasicIT {
session.executeNonQueryStatement("flush");
}
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- Assert.assertEquals(20, rowCount.get());
- Assert.assertTrue(onReceiveCount.get() >
lastOnReceiveCount.get());
- });
+ AWAIT.untilAsserted(
+ () -> {
+ Assert.assertEquals(20, rowCount.get());
+ Assert.assertTrue(onReceiveCount.get() > lastOnReceiveCount.get());
+ });
lastOnReceiveCount.set(onReceiveCount.get());
@@ -529,16 +489,11 @@ public class IoTDBSubscriptionBasicIT {
session.executeNonQueryStatement("flush");
}
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- Assert.assertEquals(30, rowCount.get());
- Assert.assertTrue(onReceiveCount.get() >
lastOnReceiveCount.get());
- });
+ AWAIT.untilAsserted(
+ () -> {
+ Assert.assertEquals(30, rowCount.get());
+ Assert.assertTrue(onReceiveCount.get() > lastOnReceiveCount.get());
+ });
consumer.unsubscribe(topicName);
} catch (final Exception e) {
@@ -624,22 +579,18 @@ public class IoTDBSubscriptionBasicIT {
} finally {
LOGGER.info("consumer exiting...");
}
- });
+ },
+ String.format("%s - consumer", testName.getMethodName()));
thread.start();
// Check row count
try {
// Keep retrying if there are execution failures
- Awaitility.await()
- .pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
- .pollInterval(
- IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
- .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- Assert.assertEquals(100, rowCount.get());
- Assert.assertEquals((100 + 199) * 100 / 2, timestampSum.get());
- });
+ AWAIT.untilAsserted(
+ () -> {
+ Assert.assertEquals(100, rowCount.get());
+ Assert.assertEquals((100 + 199) * 100 / 2, timestampSum.get());
+ });
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
index b671acd6709..85fff437f18 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
@@ -25,9 +25,7 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -38,20 +36,10 @@ import static org.junit.jupiter.api.Assertions.fail;
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class})
-public class IoTDBSubscriptionIdempotentIT {
+public class IoTDBSubscriptionIdempotentIT extends AbstractSubscriptionLocalIT
{
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionIdempotentIT.class);
- @Before
- public void setUp() throws Exception {
- EnvFactory.getEnv().initClusterEnvironment();
- }
-
- @After
- public void tearDown() throws Exception {
- EnvFactory.getEnv().cleanClusterEnvironment();
- }
-
@Test
public void testSubscribeOrUnsubscribeNonExistedTopicTest() {
final String host = EnvFactory.getEnv().getIP();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionIdentifierSemanticException.java
similarity index 52%
copy from
integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
copy to
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionIdentifierSemanticException.java
index 7d16110cd1f..6eb9ba6d734 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionIdentifierSemanticException.java
@@ -17,14 +17,26 @@
* under the License.
*/
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.rpc.subscription.exception;
-public class IoTDBSubscriptionITConstant {
+import java.util.Objects;
- public static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
- public static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
- public static final long AWAITILITY_AT_MOST_SECOND = 600L;
+public class SubscriptionIdentifierSemanticException extends
SubscriptionException {
- public static final long SLEEP_NS = 1_000_000_000L;
- public static final long POLL_TIMEOUT_MS = 10_000L;
+ public SubscriptionIdentifierSemanticException(final String message) {
+ super(message);
+ }
+
+ public SubscriptionIdentifierSemanticException(final String message, final
Throwable cause) {
+ super(message, cause);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ return obj instanceof SubscriptionIdentifierSemanticException
+ && Objects.equals(
+ getMessage(), ((SubscriptionIdentifierSemanticException)
obj).getMessage())
+ && Objects.equals(
+ getTimeStamp(), ((SubscriptionIdentifierSemanticException)
obj).getTimeStamp());
+ }
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
index a5540150898..1d1571b0122 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSubscribeResp.java
@@ -22,10 +22,25 @@ package org.apache.iotdb.rpc.subscription.payload.response;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Objects;
+import java.util.Set;
public class PipeSubscribeSubscribeResp extends TPipeSubscribeResp {
+ private transient Set<String> topicNames = new HashSet<>(); // subscribed
topic names
+
+ public Set<String> getTopicNames() {
+ return topicNames;
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
/**
@@ -34,11 +49,32 @@ public class PipeSubscribeSubscribeResp extends
TPipeSubscribeResp {
*/
public static PipeSubscribeSubscribeResp toTPipeSubscribeResp(final TSStatus
status) {
final PipeSubscribeSubscribeResp resp = new PipeSubscribeSubscribeResp();
+ resp.status = status;
+ resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+ resp.type = PipeSubscribeResponseType.ACK.getType();
+ return resp;
+ }
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeSubscribeResp`,
called by the subscription
+ * server.
+ */
+ public static PipeSubscribeSubscribeResp toTPipeSubscribeResp(
+ final TSStatus status, final Set<String> topicNames) throws IOException {
+ final PipeSubscribeSubscribeResp resp = new PipeSubscribeSubscribeResp();
resp.status = status;
resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
resp.type = PipeSubscribeResponseType.ACK.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.writeObjectSet(topicNames, outputStream);
+ resp.body =
+ Collections.singletonList(
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size()));
+ }
+
return resp;
}
@@ -47,11 +83,19 @@ public class PipeSubscribeSubscribeResp extends
TPipeSubscribeResp {
final TPipeSubscribeResp subscribeResp) {
final PipeSubscribeSubscribeResp resp = new PipeSubscribeSubscribeResp();
+ if (Objects.nonNull(subscribeResp.body)) {
+ for (final ByteBuffer byteBuffer : subscribeResp.body) {
+ if (Objects.nonNull(byteBuffer) && byteBuffer.hasRemaining()) {
+ resp.topicNames = ReadWriteIOUtils.readObjectSet(byteBuffer);
+ break;
+ }
+ }
+ }
+
resp.status = subscribeResp.status;
resp.version = subscribeResp.version;
resp.type = subscribeResp.type;
resp.body = subscribeResp.body;
-
return resp;
}
@@ -66,7 +110,8 @@ public class PipeSubscribeSubscribeResp extends
TPipeSubscribeResp {
return false;
}
final PipeSubscribeSubscribeResp that = (PipeSubscribeSubscribeResp) obj;
- return Objects.equals(this.status, that.status)
+ return Objects.equals(this.topicNames, that.topicNames)
+ && Objects.equals(this.status, that.status)
&& this.version == that.version
&& this.type == that.type
&& Objects.equals(this.body, that.body);
@@ -74,6 +119,6 @@ public class PipeSubscribeSubscribeResp extends
TPipeSubscribeResp {
@Override
public int hashCode() {
- return Objects.hash(status, version, type, body);
+ return Objects.hash(topicNames, status, version, type, body);
}
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
index a93e38013ac..e5d8ccf69cf 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeUnsubscribeResp.java
@@ -22,10 +22,25 @@ package org.apache.iotdb.rpc.subscription.payload.response;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Objects;
+import java.util.Set;
public class PipeSubscribeUnsubscribeResp extends TPipeSubscribeResp {
+ private transient Set<String> topicNames = new HashSet<>(); // subscribed
topic names
+
+ public Set<String> getTopicNames() {
+ return topicNames;
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
/**
@@ -34,11 +49,32 @@ public class PipeSubscribeUnsubscribeResp extends
TPipeSubscribeResp {
*/
public static PipeSubscribeUnsubscribeResp toTPipeSubscribeResp(final
TSStatus status) {
final PipeSubscribeUnsubscribeResp resp = new
PipeSubscribeUnsubscribeResp();
+ resp.status = status;
+ resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+ resp.type = PipeSubscribeResponseType.ACK.getType();
+ return resp;
+ }
+
+ /**
+ * Serialize the incoming parameters into `PipeSubscribeUnsubscribeResp`,
called by the
+ * subscription server.
+ */
+ public static PipeSubscribeUnsubscribeResp toTPipeSubscribeResp(
+ final TSStatus status, final Set<String> topicNames) throws IOException {
+ final PipeSubscribeUnsubscribeResp resp = new
PipeSubscribeUnsubscribeResp();
resp.status = status;
resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
resp.type = PipeSubscribeResponseType.ACK.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.writeObjectSet(topicNames, outputStream);
+ resp.body =
+ Collections.singletonList(
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size()));
+ }
+
return resp;
}
@@ -47,11 +83,19 @@ public class PipeSubscribeUnsubscribeResp extends
TPipeSubscribeResp {
final TPipeSubscribeResp unsubscribeResp) {
final PipeSubscribeUnsubscribeResp resp = new
PipeSubscribeUnsubscribeResp();
+ if (Objects.nonNull(unsubscribeResp.body)) {
+ for (final ByteBuffer byteBuffer : unsubscribeResp.body) {
+ if (Objects.nonNull(byteBuffer) && byteBuffer.hasRemaining()) {
+ resp.topicNames = ReadWriteIOUtils.readObjectSet(byteBuffer);
+ break;
+ }
+ }
+ }
+
resp.status = unsubscribeResp.status;
resp.version = unsubscribeResp.version;
resp.type = unsubscribeResp.type;
resp.body = unsubscribeResp.body;
-
return resp;
}
@@ -66,7 +110,8 @@ public class PipeSubscribeUnsubscribeResp extends
TPipeSubscribeResp {
return false;
}
final PipeSubscribeUnsubscribeResp that = (PipeSubscribeUnsubscribeResp)
obj;
- return Objects.equals(this.status, that.status)
+ return Objects.equals(this.topicNames, that.topicNames)
+ && Objects.equals(this.status, that.status)
&& this.version == that.version
&& this.type == that.type
&& Objects.equals(this.body, that.body);
@@ -74,6 +119,6 @@ public class PipeSubscribeUnsubscribeResp extends
TPipeSubscribeResp {
@Override
public int hashCode() {
- return Objects.hash(status, version, type, body);
+ return Objects.hash(topicNames, status, version, type, body);
}
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 2bbd4a129a9..a6b418f0220 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.util.IdentifierUtils;
import org.apache.iotdb.session.subscription.util.RandomStringGenerator;
import org.apache.iotdb.session.subscription.util.SubscriptionPollTimer;
import org.apache.iotdb.session.util.SessionUtils;
@@ -50,8 +51,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.net.URLEncoder;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -69,6 +72,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
abstract class SubscriptionConsumer implements AutoCloseable {
@@ -92,6 +96,10 @@ abstract class SubscriptionConsumer implements AutoCloseable
{
private final String fileSaveDir;
private final boolean fileSaveFsync;
+ protected volatile Set<String> subscribedTopicNames = new HashSet<>();
+
+ /////////////////////////////// getter ///////////////////////////////
+
public String getConsumerId() {
return consumerId;
}
@@ -100,6 +108,10 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
return consumerGroupId;
}
+ public Set<String> getSubscribedTopicNames() {
+ return subscribedTopicNames;
+ }
+
/////////////////////////////// ctor ///////////////////////////////
protected SubscriptionConsumer(final Builder builder) {
@@ -221,6 +233,17 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
}
public void subscribe(final Set<String> topicNames) throws
SubscriptionException {
+ // parse topic names from external source
+ subscribe(topicNames, true);
+ }
+
+ private void subscribe(Set<String> topicNames, final boolean needParse)
+ throws SubscriptionException {
+ if (needParse) {
+ topicNames =
+
topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+ }
+
providers.acquireReadLock();
try {
subscribeWithRedirection(topicNames);
@@ -238,6 +261,17 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
}
public void unsubscribe(final Set<String> topicNames) throws
SubscriptionException {
+ // parse topic names from external source
+ unsubscribe(topicNames, true);
+ }
+
+ private void unsubscribe(Set<String> topicNames, final boolean needParse)
+ throws SubscriptionException {
+ if (needParse) {
+ topicNames =
+
topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+ }
+
providers.acquireReadLock();
try {
unsubscribeWithRedirection(topicNames);
@@ -284,33 +318,51 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
return dirPath;
}
- private Path getFilePath(final String topicName, String fileName) throws
SubscriptionException {
- Path filePath;
+ private Path getFilePath(
+ final String topicName,
+ final String fileName,
+ final boolean allowFileAlreadyExistsException,
+ final boolean allowInvalidPathException)
+ throws SubscriptionException {
try {
- filePath = getFileDir(topicName).resolve(fileName);
+ final Path filePath = getFileDir(topicName).resolve(fileName);
Files.createFile(filePath);
+ return filePath;
} catch (final FileAlreadyExistsException fileAlreadyExistsException) {
- fileName += "." + RandomStringGenerator.generate(16);
- try {
- filePath = getFileDir(topicName).resolve(fileName);
- Files.createFile(filePath);
- } catch (final IOException e) {
- throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
+ if (allowFileAlreadyExistsException) {
+ return getFilePath(
+ topicName, fileName + "." + RandomStringGenerator.generate(16),
false, true);
}
+ throw new SubscriptionRuntimeNonCriticalException(
+ fileAlreadyExistsException.getMessage(), fileAlreadyExistsException);
+ } catch (final InvalidPathException invalidPathException) {
+ if (allowInvalidPathException) {
+ return getFilePath(URLEncoder.encode(topicName), fileName, true,
false);
+ }
+ throw new SubscriptionRuntimeNonCriticalException(
+ invalidPathException.getMessage(), invalidPathException);
} catch (final IOException e) {
throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
}
- return filePath;
}
/////////////////////////////// poll ///////////////////////////////
- protected List<SubscriptionMessage> poll(final Set<String> topicNames, final
long timeoutMs)
+ protected List<SubscriptionMessage> poll(
+ /* @NotNull */ final Set<String> topicNames, final long timeoutMs)
throws SubscriptionException {
final List<SubscriptionMessage> messages = new ArrayList<>();
final SubscriptionPollTimer timer =
new SubscriptionPollTimer(System.currentTimeMillis(), timeoutMs);
+ // check topic names
+ topicNames.stream()
+ .filter(topicName -> !subscribedTopicNames.contains(topicName))
+ .forEach(
+ topicName ->
+ LOGGER.warn(
+ "SubscriptionConsumer {} does not subscribe to topic {}",
this, topicName));
+
do {
try {
// poll tablets or file
@@ -336,14 +388,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
case ERROR:
final ErrorPayload payload = (ErrorPayload)
pollResponse.getPayload();
final String errorMessage = payload.getErrorMessage();
- final boolean critical = payload.isCritical();
- LOGGER.warn(
- "Error occurred when SubscriptionConsumer {} polling topics
{}: {}, critical: {}",
- this,
- topicNames,
- errorMessage,
- critical);
- if (critical) {
+ if (payload.isCritical()) {
throw new SubscriptionRuntimeCriticalException(errorMessage);
} else {
throw new
SubscriptionRuntimeNonCriticalException(errorMessage);
@@ -398,7 +443,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
final SubscriptionCommitContext commitContext, final String fileName)
throws SubscriptionException {
final String topicName = commitContext.getTopicName();
- final Path filePath = getFilePath(topicName, fileName);
+ final Path filePath = getFilePath(topicName, fileName, true, true);
final File file = filePath.toFile();
try (final RandomAccessFile fileWriter = new RandomAccessFile(file, "rw"))
{
return Optional.of(pollFileInternal(commitContext, file, fileWriter));
@@ -576,6 +621,9 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
try {
final SubscriptionProvider provider =
providers.getNextAvailableProvider();
if (Objects.isNull(provider) || !provider.isAvailable()) {
+ if (isClosed()) {
+ return Collections.emptyList();
+ }
throw new SubscriptionConnectionException(
String.format(
"Cluster has no available subscription providers when %s poll
topic %s",
@@ -601,6 +649,9 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
try {
final SubscriptionProvider provider = providers.getProvider(dataNodeId);
if (Objects.isNull(provider) || !provider.isAvailable()) {
+ if (isClosed()) {
+ return Collections.emptyList();
+ }
throw new SubscriptionConnectionException(
String.format(
"something unexpected happened when %s poll file from
subscription provider with data node id %s, the subscription provider may be
unavailable or not existed",
@@ -660,6 +711,9 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
try {
final SubscriptionProvider provider = providers.getProvider(dataNodeId);
if (Objects.isNull(provider) || !provider.isAvailable()) {
+ if (isClosed()) {
+ return;
+ }
throw new SubscriptionConnectionException(
String.format(
"something unexpected happened when %s commit (nack: %s)
messages to subscription provider with data node id %s, the subscription
provider may be unavailable or not existed",
@@ -775,7 +829,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
}
for (final SubscriptionProvider provider : providers) {
try {
- provider.subscribe(topicNames);
+ subscribedTopicNames = provider.subscribe(topicNames);
return;
} catch (final Exception e) {
LOGGER.warn(
@@ -805,7 +859,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
}
for (final SubscriptionProvider provider : providers) {
try {
- provider.unsubscribe(topicNames);
+ subscribedTopicNames = provider.unsubscribe(topicNames);
return;
} catch (final Exception e) {
LOGGER.warn(
@@ -897,12 +951,12 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
}
public Builder consumerId(final String consumerId) {
- this.consumerId = consumerId;
+ this.consumerId = IdentifierUtils.parseIdentifier(consumerId);
return this;
}
public Builder consumerGroupId(final String consumerGroupId) {
- this.consumerGroupId = consumerGroupId;
+ this.consumerGroupId = IdentifierUtils.parseIdentifier(consumerGroupId);
return this;
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index 70589472f82..9bce0d6774f 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -40,6 +40,8 @@ import
org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeSubscribeR
import
org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeUnsubscribeReq;
import
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp;
import
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribePollResp;
+import
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeSubscribeResp;
+import
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeUnsubscribeResp;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
import org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.SubscriptionSessionConnection;
@@ -215,7 +217,7 @@ final class SubscriptionProvider extends
SubscriptionSession {
verifyPipeSubscribeSuccess(resp.status);
}
- void subscribe(final Set<String> topicNames) throws SubscriptionException {
+ Set<String> subscribe(final Set<String> topicNames) throws
SubscriptionException {
final PipeSubscribeSubscribeReq req;
try {
req = PipeSubscribeSubscribeReq.toTPipeSubscribeReq(topicNames);
@@ -241,9 +243,12 @@ final class SubscriptionProvider extends
SubscriptionSession {
throw new SubscriptionConnectionException(e.getMessage(), e);
}
verifyPipeSubscribeSuccess(resp.status);
+ final PipeSubscribeSubscribeResp subscribeResp =
+ PipeSubscribeSubscribeResp.fromTPipeSubscribeResp(resp);
+ return subscribeResp.getTopicNames();
}
- void unsubscribe(final Set<String> topicNames) throws SubscriptionException {
+ Set<String> unsubscribe(final Set<String> topicNames) throws
SubscriptionException {
final PipeSubscribeUnsubscribeReq req;
try {
req = PipeSubscribeUnsubscribeReq.toTPipeSubscribeReq(topicNames);
@@ -269,6 +274,9 @@ final class SubscriptionProvider extends
SubscriptionSession {
throw new SubscriptionConnectionException(e.getMessage(), e);
}
verifyPipeSubscribeSuccess(resp.status);
+ final PipeSubscribeUnsubscribeResp unsubscribeResp =
+ PipeSubscribeUnsubscribeResp.fromTPipeSubscribeResp(resp);
+ return unsubscribeResp.getTopicNames();
}
List<SubscriptionPollResponse> poll(final SubscriptionPollRequest
pollMessage)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index 2306e4d05c1..38dfc530ee3 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.session.subscription.consumer;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.util.IdentifierUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
/**
* The {@link SubscriptionPullConsumer} corresponds to the pull consumption
mode in the message
@@ -150,7 +152,12 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
@Override
public List<SubscriptionMessage> poll(final Set<String> topicNames, final
long timeoutMs)
throws SubscriptionException {
- final List<SubscriptionMessage> messages = super.poll(topicNames,
timeoutMs);
+ // parse topic names from external source
+ final Set<String> parsedTopicNames =
+
topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
+
+ // poll messages
+ final List<SubscriptionMessage> messages = super.poll(parsedTopicNames,
timeoutMs);
// add to uncommitted messages
if (autoCommit) {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 7ff1dd39b15..7323473552c 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
@@ -162,9 +161,12 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
return;
}
+ if (subscribedTopicNames.isEmpty()) {
+ return;
+ }
+
try {
- // Poll all subscribed topics by passing an empty set
- final List<SubscriptionMessage> messages =
poll(Collections.emptySet(), autoPollTimeoutMs);
+ final List<SubscriptionMessage> messages = poll(subscribedTopicNames,
autoPollTimeoutMs);
if (ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) {
ack(messages);
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
index 60c269b8637..0ec121f9699 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
@@ -39,14 +39,14 @@ public abstract class SubscriptionFileHandler implements
SubscriptionMessageHand
/**
* @return a new File instance of the corresponding file
*/
- public File getFile() {
+ public synchronized File getFile() {
return new File(absolutePath);
}
/**
* @return a new Path instance of the corresponding file
*/
- public Path getPath() {
+ public synchronized Path getPath() {
return Paths.get(absolutePath);
}
@@ -54,7 +54,7 @@ public abstract class SubscriptionFileHandler implements
SubscriptionMessageHand
* @return the path to the source file
* @throws IOException if an I/O error occurs
*/
- public Path deleteFile() throws IOException {
+ public synchronized Path deleteFile() throws IOException {
final Path sourcePath = getPath();
Files.delete(sourcePath);
return sourcePath;
@@ -65,7 +65,7 @@ public abstract class SubscriptionFileHandler implements
SubscriptionMessageHand
* @return the path to the target file
* @throws IOException if an I/O error occurs
*/
- public Path moveFile(final String target) throws IOException {
+ public synchronized Path moveFile(final String target) throws IOException {
return this.moveFile(Paths.get(target));
}
@@ -74,7 +74,10 @@ public abstract class SubscriptionFileHandler implements
SubscriptionMessageHand
* @return the path to the target file
* @throws IOException if an I/O error occurs
*/
- public Path moveFile(final Path target) throws IOException {
+ public synchronized Path moveFile(final Path target) throws IOException {
+ if (!Files.exists(target.getParent())) {
+ Files.createDirectories(target.getParent());
+ }
return Files.move(getPath(), target, StandardCopyOption.REPLACE_EXISTING);
}
@@ -83,7 +86,7 @@ public abstract class SubscriptionFileHandler implements
SubscriptionMessageHand
* @return the path to the target file
* @throws IOException if an I/O error occurs
*/
- public Path copyFile(final String target) throws IOException {
+ public synchronized Path copyFile(final String target) throws IOException {
return this.copyFile(Paths.get(target));
}
@@ -92,7 +95,10 @@ public abstract class SubscriptionFileHandler implements
SubscriptionMessageHand
* @return the path to the target file
* @throws IOException if an I/O error occurs
*/
- public Path copyFile(final Path target) throws IOException {
+ public synchronized Path copyFile(final Path target) throws IOException {
+ if (!Files.exists(target.getParent())) {
+ Files.createDirectories(target.getParent());
+ }
return Files.copy(
getPath(), target, StandardCopyOption.REPLACE_EXISTING,
StandardCopyOption.COPY_ATTRIBUTES);
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
new file mode 100644
index 00000000000..9f6d09ef44e
--- /dev/null
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/IdentifierUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.util;
+
+import
org.apache.iotdb.rpc.subscription.exception.SubscriptionIdentifierSemanticException;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.read.common.parser.PathVisitor;
+
+public class IdentifierUtils {
+
+ /**
+ * refer
org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor#parseIdentifier(java.lang.String)
+ */
+ public static String parseIdentifier(final String src) {
+ if (src.startsWith(TsFileConstant.BACK_QUOTE_STRING)
+ && src.endsWith(TsFileConstant.BACK_QUOTE_STRING)) {
+ return src.substring(1, src.length() - 1)
+ .replace(TsFileConstant.DOUBLE_BACK_QUOTE_STRING,
TsFileConstant.BACK_QUOTE_STRING);
+ }
+ checkIdentifier(src);
+ return src;
+ }
+
+ private static void checkIdentifier(final String src) {
+ if (!TsFileConstant.IDENTIFIER_PATTERN.matcher(src).matches()
+ || PathVisitor.isRealNumber(src)) {
+ throw new SubscriptionIdentifierSemanticException(
+ String.format(
+ "%s is illegal, identifier not enclosed with backticks can only
consist of digits, characters and underscore.",
+ src));
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
index 32dd078dd78..d83996cda47 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
@@ -87,7 +87,7 @@ class SubscriptionAgentLauncher {
}
return;
- } catch (SubscriptionException | ClientManagerException | TException e) {
+ } catch (final SubscriptionException | ClientManagerException |
TException e) {
retry++;
LOGGER.warn(
"Failed to get topic meta from config node for {} times, will
retry at most {} times.",
@@ -97,7 +97,7 @@ class SubscriptionAgentLauncher {
try {
Thread.sleep(
retry *
SubscriptionConfig.getInstance().getSubscriptionLaunchRetryIntervalMs());
- } catch (InterruptedException interruptedException) {
+ } catch (final InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to get topic meta from
config node.",
interruptedException);
@@ -146,7 +146,7 @@ class SubscriptionAgentLauncher {
}
return;
- } catch (SubscriptionException | ClientManagerException | TException e) {
+ } catch (final SubscriptionException | ClientManagerException |
TException e) {
retry++;
LOGGER.warn(
"Failed to get consumer group meta from config node for {} times,
will retry at most {} times.",
@@ -156,7 +156,7 @@ class SubscriptionAgentLauncher {
try {
Thread.sleep(
retry *
SubscriptionConfig.getInstance().getSubscriptionLaunchRetryIntervalMs());
- } catch (InterruptedException interruptedException) {
+ } catch (final InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to get consumer group
meta from config node.",
interruptedException);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index aff4893c61a..21064ac34c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -141,23 +141,22 @@ public class SubscriptionBrokerAgent {
broker.bindPrefetchingQueue(subtask.getTopicName(),
subtask.getInputPendingQueue());
}
- public void unbindPrefetchingQueue(final SubscriptionConnectorSubtask
subtask) {
- final String consumerGroupId = subtask.getConsumerGroupId();
+ public void unbindPrefetchingQueue(
+ final String consumerGroupId, final String topicName, final boolean
doRemove) {
final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
LOGGER.warn("Subscription: consumer group [{}] does not exist",
consumerGroupId);
return;
}
- broker.unbindPrefetchingQueue(subtask.getTopicName());
+ broker.unbindPrefetchingQueue(topicName, doRemove);
}
- public void executePrefetch(final SubscriptionConnectorSubtask subtask) {
- final String consumerGroupId = subtask.getConsumerGroupId();
+ public void executePrefetch(final String consumerGroupId, final String
topicName) {
final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
LOGGER.warn("Subscription: consumer group [{}] does not exist",
consumerGroupId);
return;
}
- broker.executePrefetch(subtask.getTopicName());
+ broker.executePrefetch(topicName);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
index d6572923635..5284d295863 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
@@ -62,7 +62,7 @@ public class SubscriptionConsumerAgent {
////////////////////////// ConsumerGroupMeta Management Entry
//////////////////////////
public TPushConsumerGroupMetaRespExceptionMessage
handleSingleConsumerGroupMetaChanges(
- ConsumerGroupMeta consumerGroupMetaFromCoordinator) {
+ final ConsumerGroupMeta consumerGroupMetaFromCoordinator) {
acquireWriteLock();
try {
if (consumerGroupMetaFromCoordinator.isEmpty()) {
@@ -71,7 +71,7 @@ public class SubscriptionConsumerAgent {
handleSingleConsumerGroupMetaChangesInternal(consumerGroupMetaFromCoordinator);
}
return null;
- } catch (Exception e) {
+ } catch (final Exception e) {
final String consumerGroupId =
consumerGroupMetaFromCoordinator.getConsumerGroupId();
final String exceptionMessage =
String.format(
@@ -121,14 +121,15 @@ public class SubscriptionConsumerAgent {
}
public TPushConsumerGroupMetaRespExceptionMessage
handleConsumerGroupMetaChanges(
- List<ConsumerGroupMeta> consumerGroupMetasFromCoordinator) {
+ final List<ConsumerGroupMeta> consumerGroupMetasFromCoordinator) {
acquireWriteLock();
try {
- for (ConsumerGroupMeta consumerGroupMetaFromCoordinator :
consumerGroupMetasFromCoordinator) {
+ for (final ConsumerGroupMeta consumerGroupMetaFromCoordinator :
+ consumerGroupMetasFromCoordinator) {
try {
handleSingleConsumerGroupMetaChangesInternal(consumerGroupMetaFromCoordinator);
return null;
- } catch (Exception e) {
+ } catch (final Exception e) {
final String consumerGroupId =
consumerGroupMetaFromCoordinator.getConsumerGroupId();
final String exceptionMessage =
String.format(
@@ -146,12 +147,12 @@ public class SubscriptionConsumerAgent {
}
public TPushConsumerGroupMetaRespExceptionMessage handleDropConsumerGroup(
- String consumerGroupId) {
+ final String consumerGroupId) {
acquireWriteLock();
try {
handleDropConsumerGroupInternal(consumerGroupId);
return null;
- } catch (Exception e) {
+ } catch (final Exception e) {
final String exceptionMessage =
String.format(
"Subscription: Failed to drop consumer group %s, because %s",
consumerGroupId, e);
@@ -163,7 +164,7 @@ public class SubscriptionConsumerAgent {
}
}
- private void handleDropConsumerGroupInternal(String consumerGroupId) {
+ private void handleDropConsumerGroupInternal(final String consumerGroupId) {
if (SubscriptionAgent.broker().isBrokerExist(consumerGroupId)) {
if (!SubscriptionAgent.broker().dropBroker(consumerGroupId)) {
final String exceptionMessage =
@@ -180,7 +181,7 @@ public class SubscriptionConsumerAgent {
consumerGroupMetaKeeper.removeConsumerGroupMeta(consumerGroupId);
}
- public boolean isConsumerExisted(String consumerGroupId, String consumerId) {
+ public boolean isConsumerExisted(final String consumerGroupId, final String
consumerId) {
acquireReadLock();
try {
final ConsumerGroupMeta consumerGroupMeta =
@@ -191,7 +192,8 @@ public class SubscriptionConsumerAgent {
}
}
- public Set<String> getTopicsSubscribedByConsumer(String consumerGroupId,
String consumerId) {
+ public Set<String> getTopicsSubscribedByConsumer(
+ final String consumerGroupId, final String consumerId) {
acquireReadLock();
try {
return
consumerGroupMetaKeeper.getTopicsSubscribedByConsumer(consumerGroupId,
consumerId);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
index 1bb0735b019..71f319374cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
@@ -51,7 +51,7 @@ public class SubscriptionReceiverAgent {
PipeSubscribeRequestVersion.VERSION_1.getVersion(),
SubscriptionReceiverV1::new);
}
- public TPipeSubscribeResp handle(TPipeSubscribeReq req) {
+ public TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
final byte reqVersion = req.getVersion();
if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
return getReceiver(reqVersion).handle(req);
@@ -69,7 +69,7 @@ public class SubscriptionReceiverAgent {
}
}
- private SubscriptionReceiver getReceiver(byte reqVersion) {
+ private SubscriptionReceiver getReceiver(final byte reqVersion) {
if (receiverThreadLocal.get() == null) {
return setAndGetReceiver(reqVersion);
}
@@ -88,7 +88,7 @@ public class SubscriptionReceiverAgent {
return receiverThreadLocal.get();
}
- private SubscriptionReceiver setAndGetReceiver(byte reqVersion) {
+ private SubscriptionReceiver setAndGetReceiver(final byte reqVersion) {
if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
receiverThreadLocal.set(RECEIVER_CONSTRUCTORS.get(reqVersion).get());
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
index fb27e8825b8..45145bd2b67 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
@@ -60,12 +60,12 @@ public class SubscriptionTopicAgent {
////////////////////////// Topic Management Entry //////////////////////////
public TPushTopicMetaRespExceptionMessage handleSingleTopicMetaChanges(
- TopicMeta topicMetaFromCoordinator) {
+ final TopicMeta topicMetaFromCoordinator) {
acquireWriteLock();
try {
handleSingleTopicMetaChangesInternal(topicMetaFromCoordinator);
return null;
- } catch (Exception e) {
+ } catch (final Exception e) {
final String topicName = topicMetaFromCoordinator.getTopicName();
final String exceptionMessage =
String.format(
@@ -86,13 +86,13 @@ public class SubscriptionTopicAgent {
}
public TPushTopicMetaRespExceptionMessage handleTopicMetaChanges(
- List<TopicMeta> topicMetasFromCoordinator) {
+ final List<TopicMeta> topicMetasFromCoordinator) {
acquireWriteLock();
try {
- for (TopicMeta topicMetaFromCoordinator : topicMetasFromCoordinator) {
+ for (final TopicMeta topicMetaFromCoordinator :
topicMetasFromCoordinator) {
try {
handleSingleTopicMetaChangesInternal(topicMetaFromCoordinator);
- } catch (Exception e) {
+ } catch (final Exception e) {
final String topicName = topicMetaFromCoordinator.getTopicName();
final String exceptionMessage =
String.format(
@@ -109,12 +109,12 @@ public class SubscriptionTopicAgent {
}
}
- public TPushTopicMetaRespExceptionMessage handleDropTopic(String topicName) {
+ public TPushTopicMetaRespExceptionMessage handleDropTopic(final String
topicName) {
acquireWriteLock();
try {
handleDropTopicInternal(topicName);
return null;
- } catch (Exception e) {
+ } catch (final Exception e) {
final String exceptionMessage =
String.format("Subscription: Failed to drop topic %s, because %s",
topicName, e);
LOGGER.warn(exceptionMessage);
@@ -125,11 +125,11 @@ public class SubscriptionTopicAgent {
}
}
- private void handleDropTopicInternal(String topicName) {
+ private void handleDropTopicInternal(final String topicName) {
topicMetaKeeper.removeTopicMeta(topicName);
}
- public boolean isTopicExisted(String topicName) {
+ public boolean isTopicExisted(final String topicName) {
acquireReadLock();
try {
return topicMetaKeeper.containsTopicMeta(topicName);
@@ -138,7 +138,7 @@ public class SubscriptionTopicAgent {
}
}
- public String getTopicFormat(String topicName) {
+ public String getTopicFormat(final String topicName) {
acquireReadLock();
try {
return topicMetaKeeper
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 90785c1602e..d26ea8b4458 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -65,6 +65,10 @@ public class SubscriptionBroker {
final String topicName = entry.getKey();
final SubscriptionPrefetchingQueue prefetchingQueue = entry.getValue();
if (topicNames.contains(topicName)) {
+ if (prefetchingQueue.isClosed()) {
+ LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is
closed", topicName);
+ continue;
+ }
final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
if (Objects.nonNull(event)) {
events.add(event);
@@ -95,6 +99,10 @@ public class SubscriptionBroker {
LOGGER.warn(errorMessage);
throw new SubscriptionException(errorMessage);
}
+ if (prefetchingQueue.isClosed()) {
+ LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is
closed", topicName);
+ return Collections.emptyList();
+ }
final SubscriptionEvent event =
((SubscriptionPrefetchingTsFileQueue) prefetchingQueue)
.pollTsFile(consumerId, fileName, writingOffset);
@@ -117,6 +125,10 @@ public class SubscriptionBroker {
"Subscription: prefetching queue bound to topic [{}] does not
exist", topicName);
continue;
}
+ if (prefetchingQueue.isClosed()) {
+ LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is
closed", topicName);
+ continue;
+ }
if (!nack) {
if (prefetchingQueue.ack(commitContext)) {
successfulCommitContexts.add(commitContext);
@@ -155,18 +167,25 @@ public class SubscriptionBroker {
}
}
- public void unbindPrefetchingQueue(final String topicName) {
+ public void unbindPrefetchingQueue(final String topicName, final boolean
doRemove) {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
if (Objects.isNull(prefetchingQueue)) {
LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does
not exist", topicName);
return;
}
- // clean up uncommitted events
+
+ // mark prefetching queue closed first
+ prefetchingQueue.markClosed();
+
+ // clean up events in prefetching queue, this operation is idempotent
prefetchingQueue.cleanup();
- topicNameToPrefetchingQueue.remove(topicName);
- SubscriptionPrefetchingQueueMetrics.getInstance()
- .deregister(prefetchingQueue.getPrefetchingQueueId());
+
+ if (doRemove) {
+ topicNameToPrefetchingQueue.remove(topicName);
+ SubscriptionPrefetchingQueueMetrics.getInstance()
+ .deregister(prefetchingQueue.getPrefetchingQueueId());
+ }
}
public void executePrefetch(final String topicName) {
@@ -176,6 +195,10 @@ public class SubscriptionBroker {
LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does
not exist", topicName);
return;
}
+ if (prefetchingQueue.isClosed()) {
+ LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is
closed", topicName);
+ return;
+ }
prefetchingQueue.executePrefetch();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 43aef195681..9dec709230f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -46,6 +46,9 @@ public abstract class SubscriptionPrefetchingQueue {
protected final Map<SubscriptionCommitContext, SubscriptionEvent>
uncommittedEvents;
private final AtomicLong subscriptionCommitIdGenerator = new AtomicLong(0);
+ private volatile boolean isCompleted = false;
+ private volatile boolean isClosed = false;
+
public SubscriptionPrefetchingQueue(
final String brokerId,
final String topicName,
@@ -61,11 +64,16 @@ public abstract class SubscriptionPrefetchingQueue {
public abstract void executePrefetch();
- /** clean up uncommitted events */
public void cleanup() {
+ // clean up uncommitted events
for (final SubscriptionEvent event : uncommittedEvents.values()) {
+ event.clearReferenceCount();
SubscriptionEventBinaryCache.getInstance().resetByteBuffer(event, true);
}
+ uncommittedEvents.clear();
+
+ // no need to clean up events in inputPendingQueue, see
+ //
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.close
}
/////////////////////////////// commit ///////////////////////////////
@@ -150,4 +158,22 @@ public abstract class SubscriptionPrefetchingQueue {
public long getCurrentCommitId() {
return subscriptionCommitIdGenerator.get();
}
+
+ /////////////////////////////// termination ///////////////////////////////
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ public void markClosed() {
+ isClosed = true;
+ }
+
+ public boolean isCompleted() {
+ return isCompleted;
+ }
+
+ public void markCompleted() {
+ isCompleted = true;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
index 195194ef3c8..14d3315e1b7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
@@ -63,6 +63,15 @@ public class SubscriptionPrefetchingTabletsQueue extends
SubscriptionPrefetching
this.prefetchingQueue = new LinkedBlockingQueue<>();
}
+ @Override
+ public void cleanup() {
+ super.cleanup();
+
+ // no need to clean up events in prefetchingQueue, since all events in
prefetchingQueue are also
+ // in uncommittedEvents
+ prefetchingQueue.clear();
+ }
+
@Override
public SubscriptionEvent poll(final String consumerId) {
if (prefetchingQueue.isEmpty()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index fc5a1a8a81e..3f2dac42db6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.subscription.event.SubscriptionEventBinaryCache;
import org.apache.iotdb.db.subscription.event.SubscriptionTsFileEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -63,6 +64,21 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
this.consumerIdToCurrentEventMap = new ConcurrentHashMap<>();
}
+ @Override
+ public void cleanup() {
+ super.cleanup();
+
+ // clean up events in consumerIdToCurrentEventMap
+ consumerIdToCurrentEventMap
+ .values()
+ .forEach(
+ (event) -> {
+ event.clearReferenceCount();
+
SubscriptionEventBinaryCache.getInstance().resetByteBuffer(event, true);
+ });
+ consumerIdToCurrentEventMap.clear();
+ }
+
@Override
public SubscriptionTsFileEvent poll(final String consumerId) {
if (hasUnPollableOnTheFlySubscriptionTsFileEvent(consumerId)) {
@@ -78,6 +94,14 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
Event event;
while (Objects.nonNull(
event =
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()))) {
+ if (!(event instanceof EnrichedEvent)) {
+ LOGGER.warn(
+ "Subscription: SubscriptionPrefetchingTsFileQueue {} only support
poll EnrichedEvent. Ignore {}.",
+ this,
+ event);
+ continue;
+ }
+
if (event instanceof TabletInsertionEvent) {
final String errorMessage =
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index c327b7e058d..0f7feb08e95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -76,6 +76,12 @@ public class SubscriptionEvent {
}
}
+ public void clearReferenceCount() {
+ for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+ enrichedEvent.clearReferenceCount(this.getClass().getName());
+ }
+ }
+
//////////////////////////// pollable ////////////////////////////
public void recordLastPolledTimestamp() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 8339c3d3c88..9876339101a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
-import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -72,6 +71,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -247,7 +247,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
private TPipeSubscribeResp handlePipeSubscribeSubscribe(final
PipeSubscribeSubscribeReq req) {
try {
return handlePipeSubscribeSubscribeInternal(req);
- } catch (final SubscriptionException e) {
+ } catch (final SubscriptionException | IOException e) {
final String exceptionMessage =
String.format(
"Subscription: something unexpected happened when subscribing:
%s, req: %s", e, req);
@@ -258,7 +258,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
}
private TPipeSubscribeResp handlePipeSubscribeSubscribeInternal(
- final PipeSubscribeSubscribeReq req) {
+ final PipeSubscribeSubscribeReq req) throws IOException {
// check consumer config thread local
final ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
if (Objects.isNull(consumerConfig)) {
@@ -268,18 +268,21 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
}
// subscribe topics
- Set<String> topicNames = req.getTopicNames();
- topicNames =
topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
+ final Set<String> topicNames = req.getTopicNames();
subscribe(consumerConfig, topicNames);
LOGGER.info("Subscription: consumer {} subscribe {} successfully",
consumerConfig, topicNames);
- return
PipeSubscribeSubscribeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
+ return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(
+ RpcUtils.SUCCESS_STATUS,
+ SubscriptionAgent.consumer()
+ .getTopicsSubscribedByConsumer(
+ consumerConfig.getConsumerGroupId(),
consumerConfig.getConsumerId()));
}
private TPipeSubscribeResp handlePipeSubscribeUnsubscribe(final
PipeSubscribeUnsubscribeReq req) {
try {
return handlePipeSubscribeUnsubscribeInternal(req);
- } catch (final SubscriptionException e) {
+ } catch (final SubscriptionException | IOException e) {
final String exceptionMessage =
String.format(
"Subscription: something unexpected happened when unsubscribing:
%s, req: %s",
@@ -291,7 +294,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
}
private TPipeSubscribeResp handlePipeSubscribeUnsubscribeInternal(
- final PipeSubscribeUnsubscribeReq req) {
+ final PipeSubscribeUnsubscribeReq req) throws IOException {
// check consumer config thread local
final ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
if (Objects.isNull(consumerConfig)) {
@@ -302,13 +305,16 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
}
// unsubscribe topics
- Set<String> topicNames = req.getTopicNames();
- topicNames =
topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
+ final Set<String> topicNames = req.getTopicNames();
unsubscribe(consumerConfig, topicNames);
LOGGER.info(
"Subscription: consumer {} unsubscribe {} successfully",
consumerConfig, topicNames);
- return
PipeSubscribeUnsubscribeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
+ return PipeSubscribeUnsubscribeResp.toTPipeSubscribeResp(
+ RpcUtils.SUCCESS_STATUS,
+ SubscriptionAgent.consumer()
+ .getTopicsSubscribedByConsumer(
+ consumerConfig.getConsumerGroupId(),
consumerConfig.getConsumerId()));
}
private TPipeSubscribeResp handlePipeSubscribePoll(final
PipeSubscribePollReq req) {
@@ -403,15 +409,11 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
SubscriptionAgent.consumer()
.getTopicsSubscribedByConsumer(
consumerConfig.getConsumerGroupId(),
consumerConfig.getConsumerId());
- final Set<String> topicNames;
- if (messagePayload.getTopicNames().isEmpty()) {
+ Set<String> topicNames = messagePayload.getTopicNames();
+ if (topicNames.isEmpty()) {
// poll all subscribed topics
topicNames = subscribedTopicNames;
} else {
- topicNames =
- messagePayload.getTopicNames().stream()
- .map(ASTVisitor::parseIdentifier)
- .collect(Collectors.toSet());
// filter unsubscribed topics
topicNames.removeIf((topicName) ->
!subscribedTopicNames.contains(topicName));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
index bb4838b903e..b429b839942 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
@@ -31,14 +31,14 @@ public class SubscriptionConnectorSubtask extends
PipeConnectorSubtask {
private final String consumerGroupId;
public SubscriptionConnectorSubtask(
- String taskID,
- long creationTime,
- String attributeSortedString,
- int connectorIndex,
- UnboundedBlockingPendingQueue<Event> inputPendingQueue,
- PipeConnector outputPipeConnector,
- String topicName,
- String consumerGroupId) {
+ final String taskID,
+ final long creationTime,
+ final String attributeSortedString,
+ final int connectorIndex,
+ final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
+ final PipeConnector outputPipeConnector,
+ final String topicName,
+ final String consumerGroupId) {
super(
taskID,
creationTime,
@@ -56,7 +56,7 @@ public class SubscriptionConnectorSubtask extends
PipeConnectorSubtask {
return false;
}
- SubscriptionAgent.broker().executePrefetch(this);
+ SubscriptionAgent.broker().executePrefetch(consumerGroupId, topicName);
// always return true
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index 14633dfafef..43827ab85c4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -38,9 +38,9 @@ public class SubscriptionConnectorSubtaskLifeCycle extends
PipeConnectorSubtaskL
private int registeredTaskCount;
public SubscriptionConnectorSubtaskLifeCycle(
- PipeConnectorSubtaskExecutor executor, // SubscriptionSubtaskExecutor
- PipeConnectorSubtask subtask, // SubscriptionConnectorSubtask
- UnboundedBlockingPendingQueue<Event> pendingQueue) {
+ final PipeConnectorSubtaskExecutor executor, //
SubscriptionSubtaskExecutor
+ final PipeConnectorSubtask subtask, // SubscriptionConnectorSubtask
+ final UnboundedBlockingPendingQueue<Event> pendingQueue) {
super(executor, subtask, pendingQueue);
runningTaskCount = 0;
@@ -54,6 +54,7 @@ public class SubscriptionConnectorSubtaskLifeCycle extends
PipeConnectorSubtaskL
}
if (registeredTaskCount == 0) {
+ // bind prefetching queue
SubscriptionAgent.broker().bindPrefetchingQueue((SubscriptionConnectorSubtask)
subtask);
executor.register(subtask);
runningTaskCount = 0;
@@ -68,11 +69,13 @@ public class SubscriptionConnectorSubtaskLifeCycle extends
PipeConnectorSubtaskL
}
@Override
- public synchronized boolean deregister(String ignored) {
+ public synchronized boolean deregister(final String ignored) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}
+ // no need to discard events of pipe
+
try {
if (registeredTaskCount > 1) {
return false;
@@ -91,45 +94,12 @@ public class SubscriptionConnectorSubtaskLifeCycle extends
PipeConnectorSubtaskL
}
}
- @Override
- public synchronized void start() {
- if (runningTaskCount < 0) {
- throw new IllegalStateException("runningTaskCount < 0");
- }
-
- if (runningTaskCount == 0) {
- executor.start(subtask.getTaskID());
- }
-
- runningTaskCount++;
- LOGGER.info(
- "Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
- subtask,
- runningTaskCount,
- registeredTaskCount);
- }
-
- @Override
- public synchronized void stop() {
- if (runningTaskCount <= 0) {
- throw new IllegalStateException("runningTaskCount <= 0");
- }
-
- if (runningTaskCount == 1) {
- executor.stop(subtask.getTaskID());
- }
-
- runningTaskCount--;
- LOGGER.info(
- "Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
- subtask,
- runningTaskCount,
- registeredTaskCount);
- }
-
@Override
public synchronized void close() {
- executor.deregister(subtask.getTaskID());
-
SubscriptionAgent.broker().unbindPrefetchingQueue((SubscriptionConnectorSubtask)
subtask);
+ super.close();
+
+ final String consumerGroupId = ((SubscriptionConnectorSubtask)
subtask).getConsumerGroupId();
+ final String topicName = ((SubscriptionConnectorSubtask)
subtask).getTopicName();
+ SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId,
topicName, true);
}
}