Caideyipi commented on code in PR #12969:
URL: https://github.com/apache/iotdb/pull/12969#discussion_r1691253076
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,16 @@ protected PipeTaskOperation getOperation() {
return PipeTaskOperation.CREATE_PIPE;
}
+ /**
+ * Check the {@link PipePlugin} configuration in Pipe. If there is an error,
throw {@link
+ * PipeException}. If there is a Pipe with the same name and there is no
IfNotExists condition in
+ * {@link #createPipeRequest}, throw {@link PipeException}. If there is an
IfNotExists condition,
+ * return false. If there is no Pipe with the same name, return true.
Review Comment:
Better to use "{@code ...}" for false, true, and null, since Java does so in
its own docs.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java:
##########
@@ -160,18 +161,24 @@ public TSStatus stopPipe(String pipeName) {
}
/** Caller should ensure that the method is called in the lock {@link
#lock()}. */
- public TSStatus dropPipe(String pipeName) {
- final boolean isPipeExistedBeforeDrop =
pipeTaskInfo.isPipeExisted(pipeName);
- final TSStatus status =
configManager.getProcedureManager().dropPipe(pipeName);
+ public TSStatus dropPipe(TDropPipeReq req) {
+ final boolean isPipeExistedBeforeDrop =
pipeTaskInfo.isPipeExisted(req.getPipeName());
Review Comment:
Better to extract a local variable for frequently called members such as req.getPipeName().
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java:
##########
@@ -179,7 +183,15 @@ protected Flow executeFromState(ConfigNodeProcedureEnv
env, OperateSubscriptionS
try {
switch (state) {
case VALIDATE:
- executeFromValidate(env);
+ if (!executeFromValidate(env)) {
+ // On client side, the message returned after the successful
execution of the
+ // subscription
+ // command corresponding to this procedure is "Msg: The statement
is executed
Review Comment:
These comment lines could be reformatted a little so that they wrap evenly.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,16 @@ protected PipeTaskOperation getOperation() {
return PipeTaskOperation.CREATE_PIPE;
}
+ /**
+ * Check the {@link PipePlugin} configuration in Pipe. If there is an error,
throw {@link
+ * PipeException}. If there is a Pipe with the same name and there is no
IfNotExists condition in
+ * {@link #createPipeRequest}, throw {@link PipeException}. If there is an
IfNotExists condition,
+ * return false. If there is no Pipe with the same name, return true.
+ *
+ * @param env
+ * @return
Review Comment:
Please add a description for the "@return" tag.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java:
##########
@@ -53,6 +54,9 @@ public abstract class AbstractOperateSubscriptionProcedure
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractOperateSubscriptionProcedure.class);
+ private static final String SKIP_Subscription_PROCEDURE_MESSAGE =
Review Comment:
Better to unify the letter case in this constant name ("Subscription" is mixed-case inside an otherwise upper-case name).
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java:
##########
@@ -149,11 +151,23 @@ public TSStatus createTopic(TCreateTopicReq req) {
return status;
}
- public TSStatus dropTopic(String topicName) {
- final TSStatus status =
configManager.getProcedureManager().dropTopic(topicName);
+ public TSStatus dropTopic(TDropTopicReq req) {
+ final boolean isTopicExistedBeforeDrop =
subscriptionInfo.isTopicExisted(req.getTopicName());
+ final TSStatus status =
configManager.getProcedureManager().dropTopic(req.getTopicName());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn("Failed to drop topic {}. Result status: {}.", topicName,
status);
+ LOGGER.warn("Failed to drop topic {}. Result status: {}.",
req.getTopicName(), status);
}
+
+ if (!isTopicExistedBeforeDrop && req.isIfExistsCondition()) {
Review Comment:
Seemingly this will have the opposite effect on the user's "ifExists" condition.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java:
##########
@@ -125,14 +128,18 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) {
env.getConfigManager().getPipeManager().getPipePluginCoordinator();
pipePluginCoordinator.lock();
-
+ boolean notExists = true;
try {
- pipePluginCoordinator
- .getPipePluginInfo()
- .validateBeforeCreatingPipePlugin(
- pipePluginMeta.getPluginName(),
- pipePluginMeta.getJarName(),
- pipePluginMeta.getJarMD5());
+ // If the result is ture, the procedure will continue; if it is false,
the procedure will exit
Review Comment:
Typo: "ture" should be "true".
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java:
##########
@@ -149,11 +151,23 @@ public TSStatus createTopic(TCreateTopicReq req) {
return status;
}
- public TSStatus dropTopic(String topicName) {
- final TSStatus status =
configManager.getProcedureManager().dropTopic(topicName);
+ public TSStatus dropTopic(TDropTopicReq req) {
+ final boolean isTopicExistedBeforeDrop =
subscriptionInfo.isTopicExisted(req.getTopicName());
Review Comment:
Same as above: req.getTopicName() would be better extracted into a local variable ;-)
##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeConditionalOperationsIT extends AbstractPipeDualAutoIT {
+
+ @Test
+ public void testBasicCreatePipeIfNotExists() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // Create pipe
+ String sql =
+ String.format(
+ "create pipe If Not Exists a2b with source
('source'='iotdb-source', 'source.pattern'='root.test1',
'source.realtime.mode'='stream') with processor
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ long creationTime;
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // Check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // Check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // Record last creation time
+ creationTime = showPipeResult.get(0).creationTime;
+ }
+
+ // Create pipe If Not Exists
+ sql =
+ String.format(
+ "create pipe If Not Exists a2b with source
('source'='iotdb-source', 'source.path'='root.test2.**') with sink
('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // Check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // Check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test2.**"));
+ Assert.assertEquals(creationTime, showPipeResult.get(0).creationTime);
+ }
+ }
+
+ @Test
+ public void testBasicDropPipeIfExists() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // Create pipe
+ final String sql =
+ String.format(
+ "create pipe If Not Exists a2b with source
('source'='iotdb-source', 'source.path'='root.test1.**') with processor
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // Drop pipe If Exists
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("Drop pipe If Exists a2b");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(0, showPipeResult.size());
+ }
+
+ // Drop pipe If Exists
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("Drop pipe If Exists a2b");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testBasicAlterPipeIfExists() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // Create pipe If Not Exists
Review Comment:
...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]