Caideyipi commented on code in PR #12969:
URL: https://github.com/apache/iotdb/pull/12969#discussion_r1687340566
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java:
##########
@@ -160,18 +161,32 @@ public TSStatus stopPipe(String pipeName) {
}
/** Caller should ensure that the method is called in the lock {@link
#lock()}. */
- public TSStatus dropPipe(String pipeName) {
- final boolean isPipeExistedBeforeDrop =
pipeTaskInfo.isPipeExisted(pipeName);
- final TSStatus status =
configManager.getProcedureManager().dropPipe(pipeName);
+ public TSStatus dropPipe(TDropPipeReq req) {
+ final boolean isPipeExistedBeforeDrop =
pipeTaskInfo.isPipeExisted(req.getPipeName());
+ final TSStatus status =
configManager.getProcedureManager().dropPipe(req.getPipeName());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn("Failed to drop pipe {}. Result status: {}.", pipeName,
status);
+ LOGGER.warn("Failed to drop pipe {}. Result status: {}.",
req.getPipeName(), status);
}
- return isPipeExistedBeforeDrop
- ? status
- : RpcUtils.getStatus(
- TSStatusCode.PIPE_NOT_EXIST_ERROR,
- String.format(
- "Failed to drop pipe %s. Failures: %s does not exist.",
pipeName, pipeName));
+
+ // After the deletion operation is completed, handle the situation where
the pipe does not
+ // exist
+ if (!isPipeExistedBeforeDrop) {
+ // If the IF EXISTS condition is set in the request, return the current
status.
+ if (req.isIfExistsCondition()) {
+ return status;
Review Comment:
The conditions can somehow be compacted, since there are 2 "return status"es.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java:
##########
@@ -149,11 +151,31 @@ public TSStatus createTopic(TCreateTopicReq req) {
return status;
}
- public TSStatus dropTopic(String topicName) {
- final TSStatus status =
configManager.getProcedureManager().dropTopic(topicName);
+ public TSStatus dropTopic(TDropTopicReq req) {
+ final boolean isTopicExistedBeforeDrop =
subscriptionInfo.isTopicExisted(req.getTopicName());
+ final TSStatus status =
configManager.getProcedureManager().dropTopic(req.getTopicName());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn("Failed to drop topic {}. Result status: {}.", topicName,
status);
+ LOGGER.warn("Failed to drop topic {}. Result status: {}.",
req.getTopicName(), status);
}
+
+ // After the deletion operation is completed, handle the situation where
the topic does not
+ // exist
+ if (!isTopicExistedBeforeDrop) {
+ // If the IF EXISTS condition is set in the request, return the current
status.
+ if (req.isIfExistsCondition()) {
+ return status;
Review Comment:
Same as above.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java:
##########
@@ -179,7 +182,10 @@ protected Flow executeFromState(ConfigNodeProcedureEnv
env, OperateSubscriptionS
try {
switch (state) {
case VALIDATE:
- executeFromValidate(env);
+ if (!executeFromValidate(env)) {
Review Comment:
If the procedure shall be skipped, AbstractOperatePipeProcedureV2 returns
true, however AbstractSubscriptionProcedure returns false here. Better change
one of them.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,17 @@ protected PipeTaskOperation getOperation() {
return PipeTaskOperation.CREATE_PIPE;
}
+ /**
+ * Check the {@link PipePlugin} configuration in the Pipe, if there is an
error then throw a
+ * {@link PipeException}, if there is the same Pipe name and there is no
IfNotExists condition in
+ * the {@link #createPipeRequest} then throw a {@link PipeException}, if
there is an IfNotExists
+ * condition then end normally. {@link CreatePipeProcedureV2} process, if
there is no Pipe with
Review Comment:
What does "{@link CreatePipeProcedureV2} process," mean?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java:
##########
@@ -91,19 +91,28 @@ public void releasePipePluginInfoLock() {
/////////////////////////////// Validator ///////////////////////////////
- public void validateBeforeCreatingPipePlugin(
- final String pluginName, final String jarName, final String jarMD5) {
+ public boolean validateBeforeCreatingPipePlugin(
+ final String pluginName,
+ final String jarName,
+ final String jarMD5,
+ final boolean ifNotExistsCondition) {
// both build-in and user defined pipe plugin should be unique
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+ if (ifNotExistsCondition) {
+ return false;
+ }
throw new PipeException(
String.format(
"Failed to create PipePlugin [%s], the same name PipePlugin has
been created",
pluginName));
}
+ return true;
}
- public void validateBeforeDroppingPipePlugin(final String pluginName) {
+ public boolean validateBeforeDroppingPipePlugin(
+ final String pluginName, boolean ifExistsCondition) {
if (!pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+ if (ifExistsCondition) return false;
Review Comment:
Use {} like other codes....
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,17 @@ protected PipeTaskOperation getOperation() {
return PipeTaskOperation.CREATE_PIPE;
}
+ /**
+ * Check the {@link PipePlugin} configuration in the Pipe, if there is an
error then throw a
+ * {@link PipeException}, if there is the same Pipe name and there is no
IfNotExists condition in
+ * the {@link #createPipeRequest} then throw a {@link PipeException}, if
there is an IfNotExists
+ * condition then end normally. {@link CreatePipeProcedureV2} process, if
there is no Pipe with
Review Comment:
What does "{@link CreatePipeProcedureV2} process," mean?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,17 @@ protected PipeTaskOperation getOperation() {
return PipeTaskOperation.CREATE_PIPE;
}
+ /**
+ * Check the {@link PipePlugin} configuration in the Pipe, if there is an
error then throw a
+ * {@link PipeException}, if there is the same Pipe name and there is no
IfNotExists condition in
+ * the {@link #createPipeRequest} then throw a {@link PipeException}, if
there is an IfNotExists
+ * condition then end normally. {@link CreatePipeProcedureV2} process, if
there is no Pipe with
Review Comment:
What does "{@link CreatePipeProcedureV2} process," mean?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java:
##########
@@ -128,6 +134,18 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) {
return Flow.NO_MORE_STATE;
}
+ // If there is a Plugin with the same name and the IFExists condition
exists, the process ends
+ // normally.
+ if (!notExists) {
+ LOGGER.info(
+ "Pipe plugin {} is already exist, end the
DropPipePluginProcedure({})",
Review Comment:
....
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java:
##########
@@ -53,6 +53,9 @@ public abstract class AbstractOperateSubscriptionProcedure
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractOperateSubscriptionProcedure.class);
+ private static final String SKIP_PROCEDURE_MESSAGE =
+ "Skip the following Procedure execution steps.";
Review Comment:
Better add some possible reasons here.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,17 @@ protected PipeTaskOperation getOperation() {
return PipeTaskOperation.CREATE_PIPE;
}
+ /**
+ * Check the {@link PipePlugin} configuration in the Pipe, if there is an
error then throw a
+ * {@link PipeException}, if there is the same Pipe name and there is no
IfNotExists condition in
+ * the {@link #createPipeRequest} then throw a {@link PipeException}, if
there is an IfNotExists
+ * condition then end normally. {@link CreatePipeProcedureV2} process, if
there is no Pipe with
Review Comment:
What does "{@link CreatePipeProcedureV2} process," mean?
##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeCreateAndDropIT.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeCreateAndDropIT extends AbstractPipeDualAutoIT {
+
+ @Test
+ public void testBasicCreatePipe() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // Create pipe
+ String sql =
+ String.format(
+ "create pipe a2b with source ('source'='iotdb-source',
'source.path'='root.test1.**') with processor
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ long creationTime;
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // Check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // Check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // Record last creation time
+ creationTime = showPipeResult.get(0).creationTime;
+ }
+
+ // Create pipe If Not Exists
+ sql =
Review Comment:
Better test the "if not exists" / "if exists" only, because the normal
processes are already tested... We shall save time for github IT...
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java:
##########
@@ -116,8 +118,12 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
pipeTaskCoordinator.lock();
pipePluginCoordinator.lock();
+ boolean notExists = true;
try {
-
pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
+ notExists =
Review Comment:
This return values means "exists", not "not exists"....
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java:
##########
@@ -155,19 +155,24 @@ public boolean canSkipNextSync() {
/////////////////////////////// Validator ///////////////////////////////
- public void checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest)
throws PipeException {
+ public boolean checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest)
+ throws PipeException {
acquireReadLock();
try {
- checkBeforeCreatePipeInternal(createPipeRequest);
+ return checkBeforeCreatePipeInternal(createPipeRequest);
} finally {
releaseReadLock();
}
}
- private void checkBeforeCreatePipeInternal(final TCreatePipeReq
createPipeRequest)
+ private boolean checkBeforeCreatePipeInternal(final TCreatePipeReq
createPipeRequest)
throws PipeException {
if (!isPipeExisted(createPipeRequest.getPipeName())) {
- return;
+ return true;
+ } else {
+ if (createPipeRequest.ifNotExistsCondition) {
Review Comment:
Can use “else if” directly.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,17 @@ protected PipeTaskOperation getOperation() {
return PipeTaskOperation.CREATE_PIPE;
}
+ /**
+ * Check the {@link PipePlugin} configuration in the Pipe, if there is an
error then throw a
+ * {@link PipeException}, if there is the same Pipe name and there is no
IfNotExists condition in
+ * the {@link #createPipeRequest} then throw a {@link PipeException}, if
there is an IfNotExists
+ * condition then end normally. {@link CreatePipeProcedureV2} process, if
there is no Pipe with
Review Comment:
What does "{@link CreatePipeProcedureV2} process," mean?
##########
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java:
##########
@@ -250,6 +250,7 @@ public enum TSStatusCode {
ALTER_TOPIC_ERROR(2002),
SHOW_TOPIC_ERROR(2003),
TOPIC_PUSH_META_ERROR(2004),
+ TOPIC_NOT_EXIST_ERROR(2005),
Review Comment:
Is it used?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]