This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2d5f2b344367006167b459c80e9dedb0f6f14d3f Author: Zhenyu Luo <[email protected]> AuthorDate: Thu Aug 1 14:33:44 2024 +0800 Pipe/Subscription: Add 'Create If Not Exists' and 'Drop If Exists' Support for Pipes, Plugins, and Topics (#12969) Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit 43e562a2ac1f032d3d3dd9de3d8acd95742e79b6) --- .../IoTDBPipeConditionalOperationsIT.java | 228 +++++++++++++++++++++ .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 14 +- .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 9 + .../iotdb/confignode/manager/ConfigManager.java | 15 +- .../apache/iotdb/confignode/manager/IManager.java | 11 +- .../iotdb/confignode/manager/ProcedureManager.java | 13 +- .../coordinator/plugin/PipePluginCoordinator.java | 12 +- .../pipe/coordinator/task/PipeTaskCoordinator.java | 11 +- .../subscription/SubscriptionCoordinator.java | 18 +- .../persistence/pipe/PipePluginInfo.java | 26 ++- .../confignode/persistence/pipe/PipeTaskInfo.java | 26 ++- .../persistence/subscription/SubscriptionInfo.java | 12 +- .../impl/pipe/AbstractOperatePipeProcedureV2.java | 6 +- .../pipe/plugin/CreatePipePluginProcedure.java | 28 ++- .../impl/pipe/plugin/DropPipePluginProcedure.java | 22 +- .../runtime/PipeHandleLeaderChangeProcedure.java | 2 +- .../runtime/PipeHandleMetaChangeProcedure.java | 2 +- .../impl/pipe/runtime/PipeMetaSyncProcedure.java | 2 +- .../impl/pipe/task/AlterPipeProcedureV2.java | 6 +- .../impl/pipe/task/CreatePipeProcedureV2.java | 15 +- .../impl/pipe/task/DropPipeProcedureV2.java | 2 +- .../impl/pipe/task/StartPipeProcedureV2.java | 4 +- .../impl/pipe/task/StopPipeProcedureV2.java | 2 +- .../AbstractOperateSubscriptionProcedure.java | 15 +- .../consumer/AlterConsumerGroupProcedure.java | 3 +- .../runtime/ConsumerGroupMetaSyncProcedure.java | 3 +- .../subscription/CreateSubscriptionProcedure.java | 3 +- .../subscription/DropSubscriptionProcedure.java | 3 +- .../subscription/topic/AlterTopicProcedure.java | 4 +- .../subscription/topic/CreateTopicProcedure.java | 7 +- .../subscription/topic/DropTopicProcedure.java | 3 +- .../topic/runtime/TopicMetaSyncProcedure.java | 3 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 20 +- .../iotdb/confignode/persistence/PipeInfoTest.java | 2 +- .../pipe/plugin/CreatePipePluginProcedureTest.java | 2 +- .../pipe/plugin/DropPipePluginProcedureTest.java | 2 +- .../iotdb/db/protocol/client/ConfigNodeClient.java | 14 ++ .../config/executor/ClusterConfigTaskExecutor.java | 37 +++- .../config/executor/IConfigTaskExecutor.java | 3 +- .../config/metadata/DropPipePluginTask.java | 6 +- .../db/queryengine/plan/parser/ASTVisitor.java | 20 +- .../metadata/pipe/AlterPipeStatement.java | 9 + .../metadata/pipe/CreatePipePluginStatement.java | 9 +- .../metadata/pipe/CreatePipeStatement.java | 9 + .../metadata/pipe/DropPipePluginStatement.java | 18 +- .../statement/metadata/pipe/DropPipeStatement.java | 9 + .../subscription/CreateTopicStatement.java | 10 +- .../metadata/subscription/DropTopicStatement.java | 9 + .../src/main/thrift/confignode.thrift | 21 ++ 50 files changed, 632 insertions(+), 99 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java new file mode 100644 index 00000000000..e6dfd37a307 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.autocreate; + +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2AutoCreateSchema.class}) +public class IoTDBPipeConditionalOperationsIT extends AbstractPipeDualAutoIT { + + @Test + public void testBasicCreatePipeIfNotExists() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + // Create pipe + String sql = + String.format( + "create pipe If Not Exists a2b with source ('source'='iotdb-source', 'source.pattern'='root.test1', 'source.realtime.mode'='stream') with processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // show pipe + long creationTime; + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(1, showPipeResult.size()); + // Check status + Assert.assertEquals("RUNNING", showPipeResult.get(0).state); + // Check configurations + Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source")); + Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1")); + Assert.assertTrue( + showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream")); + Assert.assertTrue( + showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor")); + Assert.assertTrue( + showPipeResult + .get(0) + .pipeConnector + .contains(String.format("node-urls=%s", receiverDataNode.getIpAndPortString()))); + // Record last creation time + creationTime = showPipeResult.get(0).creationTime; + } + + // Create pipe If Not Exists + sql = + String.format( + "create pipe If Not Exists a2b with source ('source'='iotdb-source', 'source.path'='root.test2.**') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // show pipe + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(1, showPipeResult.size()); + // Check status + Assert.assertEquals("RUNNING", showPipeResult.get(0).state); + // Check configurations + Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source")); + Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1")); + Assert.assertTrue( + showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream")); + Assert.assertTrue( + showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor")); + Assert.assertTrue( + showPipeResult + .get(0) + .pipeConnector + .contains(String.format("node-urls=%s", receiverDataNode.getIpAndPortString()))); + Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test2.**")); + Assert.assertEquals(creationTime, showPipeResult.get(0).creationTime); + } + } + + @Test + public void testBasicDropPipeIfExists() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + // Create pipe + final String sql = + String.format( + "create pipe If Not Exists a2b with source ('source'='iotdb-source', 'source.path'='root.test1.**') with processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // Drop pipe If Exists + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute("Drop pipe If Exists a2b"); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // show pipe + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(0, showPipeResult.size()); + } + + // Drop pipe If Exists + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute("Drop pipe If Exists a2b"); + } catch (SQLException e) { + fail(e.getMessage()); + } + } + + public void testBasicAlterPipeIfExists() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + // Alter pipe If Exists + String sql = + String.format( + "Alter pipe If Exists a2b replace sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // show pipe + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(0, showPipeResult.size()); + } + + // Create pipe + sql = + String.format( + "create pipe If Not Exists a2b with source ('source'='iotdb-source', 'source.pattern'='root.test1', 'source.realtime.mode'='stream') with processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // Alter pipe If Exists + sql = + String.format( + "Alter pipe If Exists a2b replace source () replace processor () replace sink ('node-urls'='%s')", + receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // show pipe + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList; + Assert.assertEquals(1, showPipeResult.size()); + // Check status + Assert.assertEquals("RUNNING", showPipeResult.get(0).state); + // Check configurations + Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source")); + Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1")); + Assert.assertFalse( + showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream")); + Assert.assertFalse( + showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor")); + Assert.assertTrue( + showPipeResult + .get(0) + .pipeConnector + .contains(String.format("node-urls=%s", receiverDataNode.getIpAndPortString()))); + } + } +} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index abbe1425a22..0f7ab2f3f3d 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -263,6 +263,7 @@ public enum TSStatusCode { ALTER_TOPIC_ERROR(2002), SHOW_TOPIC_ERROR(2003), TOPIC_PUSH_META_ERROR(2004), + TOPIC_NOT_EXIST_ERROR(2005), // Consumer CREATE_CONSUMER_ERROR(2100), diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index e66c875c9a7..8986fce18e1 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -535,7 +535,7 @@ verifyConnection // Pipe Task ========================================================================================= createPipe - : CREATE PIPE pipeName=identifier + : CREATE PIPE (IF NOT EXISTS)? pipeName=identifier extractorAttributesClause? processorAttributesClause? connectorAttributesClause @@ -575,7 +575,7 @@ connectorAttributeClause ; alterPipe - : ALTER PIPE pipeName=identifier + : ALTER PIPE (IF EXISTS)? pipeName=identifier alterExtractorAttributesClause? alterProcessorAttributesClause? alterConnectorAttributesClause? @@ -603,7 +603,7 @@ alterConnectorAttributesClause ; dropPipe - : DROP PIPE pipeName=identifier + : DROP PIPE (IF EXISTS)? pipeName=identifier ; startPipe @@ -620,11 +620,11 @@ showPipes // Pipe Plugin ========================================================================================= createPipePlugin - : CREATE PIPEPLUGIN pluginName=identifier AS className=STRING_LITERAL uriClause + : CREATE PIPEPLUGIN (IF NOT EXISTS)? pluginName=identifier AS className=STRING_LITERAL uriClause ; dropPipePlugin - : DROP PIPEPLUGIN pluginName=identifier + : DROP PIPEPLUGIN (IF EXISTS)? pluginName=identifier ; showPipePlugins @@ -633,7 +633,7 @@ showPipePlugins // Topic ========================================================================================= createTopic - : CREATE TOPIC topicName=identifier topicAttributesClause? + : CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause? ; topicAttributesClause @@ -645,7 +645,7 @@ topicAttributeClause ; dropTopic - : DROP TOPIC topicName=identifier + : DROP TOPIC (IF EXISTS)? topicName=identifier ; showTopics diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index d1865a1e204..dc9c12c7c8c 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -286,6 +286,10 @@ EVERY : E V E R Y ; +EXISTS + : E X I S T S + ; + EXPLAIN : E X P L A I N ; @@ -942,10 +946,15 @@ ELSE : E L S E ; +IF + : I F + ; + INF : I N F ; + // Privileges Keywords PRIVILEGE_VALUE diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 4a7a6144b5d..68e605976b8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -150,6 +150,9 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; @@ -1495,10 +1498,10 @@ public class ConfigManager implements IManager { } @Override - public TSStatus dropPipePlugin(String pipePluginName) { + public TSStatus dropPipePlugin(TDropPipePluginReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? pipeManager.getPipePluginCoordinator().dropPipePlugin(pipePluginName) + ? pipeManager.getPipePluginCoordinator().dropPipePlugin(req) : status; } @@ -2080,10 +2083,10 @@ public class ConfigManager implements IManager { } @Override - public TSStatus dropPipe(String pipeName) { + public TSStatus dropPipe(TDropPipeReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? pipeManager.getPipeTaskCoordinator().dropPipe(pipeName) + ? pipeManager.getPipeTaskCoordinator().dropPipe(req) : status; } @@ -2112,10 +2115,10 @@ public class ConfigManager implements IManager { } @Override - public TSStatus dropTopic(String topicName) { + public TSStatus dropTopic(TDropTopicReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? subscriptionManager.getSubscriptionCoordinator().dropTopic(topicName) + ? subscriptionManager.getSubscriptionCoordinator().dropTopic(req) : status; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 5e9dd2f57c2..5f3d66044b5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -79,6 +79,9 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; @@ -484,7 +487,7 @@ public interface IManager { TSStatus createPipePlugin(TCreatePipePluginReq req); /** Drop pipe plugin. */ - TSStatus dropPipePlugin(String pluginName); + TSStatus dropPipePlugin(TDropPipePluginReq req); /** Show pipe plugins. */ TGetPipePluginTableResp getPipePluginTable(); @@ -651,11 +654,11 @@ public interface IManager { /** * Drop Pipe. * - * @param pipeName name of Pipe + * @param req Info about Pipe * @return {@link TSStatusCode#SUCCESS_STATUS} if dropped the pipe successfully, {@link * TSStatusCode#PIPE_ERROR} if encountered failure. */ - TSStatus dropPipe(String pipeName); + TSStatus dropPipe(TDropPipeReq req); /** * Get Pipe by name. If pipeName is empty, get all Pipe. @@ -690,7 +693,7 @@ public interface IManager { TSStatus createTopic(TCreateTopicReq topic); /** Drop Topic. */ - TSStatus dropTopic(String topicName); + TSStatus dropTopic(TDropTopicReq req); /** Show Topic. */ TShowTopicResp showTopic(TShowTopicReq req); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 9f0f7765f87..71f363facc0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -112,6 +112,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; @@ -856,9 +857,10 @@ public class ProcedureManager { return statusList.get(0); } - public TSStatus createPipePlugin(PipePluginMeta pipePluginMeta, byte[] jarFile) { + public TSStatus createPipePlugin( + PipePluginMeta pipePluginMeta, byte[] jarFile, boolean isSetIfNotExistsCondition) { final CreatePipePluginProcedure createPipePluginProcedure = - new CreatePipePluginProcedure(pipePluginMeta, jarFile); + new CreatePipePluginProcedure(pipePluginMeta, jarFile, isSetIfNotExistsCondition); try { if (jarFile != null && new UpdateProcedurePlan(createPipePluginProcedure).getSerializedSize() @@ -886,8 +888,11 @@ public class ProcedureManager { } } - public TSStatus dropPipePlugin(String pluginName) { - final long procedureId = executor.submitProcedure(new DropPipePluginProcedure(pluginName)); + public TSStatus dropPipePlugin(TDropPipePluginReq req) { + final long procedureId = + executor.submitProcedure( + new DropPipePluginProcedure( + req.getPluginName(), req.isSetIfExistsCondition() && req.isIfExistsCondition())); final List<TSStatus> statusList = new ArrayList<>(); final boolean isSucceed = waitingProcedureFinished(Collections.singletonList(procedureId), statusList); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java index 5c0ad6492e6..07259f842fb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java @@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTabl import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp; @@ -72,11 +73,16 @@ public class PipePluginCoordinator { final PipePluginMeta pipePluginMeta = new PipePluginMeta(pluginName, className, false, jarName, jarMD5); - return configManager.getProcedureManager().createPipePlugin(pipePluginMeta, req.getJarFile()); + return configManager + .getProcedureManager() + .createPipePlugin( + pipePluginMeta, + req.getJarFile(), + req.isSetIfNotExistsCondition() && req.isIfNotExistsCondition()); } - public TSStatus dropPipePlugin(String pluginName) { - return configManager.getProcedureManager().dropPipePlugin(pluginName); + public TSStatus dropPipePlugin(TDropPipePluginReq req) { + return configManager.getProcedureManager().dropPipePlugin(req); } public TGetPipePluginTableResp getPipePluginTable() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java index b07203e43ed..100da334d35 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java @@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; @@ -160,13 +161,19 @@ public class PipeTaskCoordinator { } /** Caller should ensure that the method is called in the lock {@link #lock()}. */ - public TSStatus dropPipe(String pipeName) { + public TSStatus dropPipe(TDropPipeReq req) { + final String pipeName = req.getPipeName(); final boolean isPipeExistedBeforeDrop = pipeTaskInfo.isPipeExisted(pipeName); final TSStatus status = configManager.getProcedureManager().dropPipe(pipeName); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn("Failed to drop pipe {}. Result status: {}.", pipeName, status); } - return isPipeExistedBeforeDrop + + final boolean isSetIfExistsCondition = + req.isSetIfExistsCondition() && req.isIfExistsCondition(); + // If the `IF EXISTS` condition is not set and the pipe does not exist before the delete + // operation, return an error status indicating that the pipe does not exist. + return isPipeExistedBeforeDrop || isSetIfExistsCondition ? status : RpcUtils.getStatus( TSStatusCode.PIPE_NOT_EXIST_ERROR, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java index c48f3e0f078..20dfce44bf2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java @@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo; import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllTopicInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; @@ -38,6 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp; import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -149,12 +151,24 @@ public class SubscriptionCoordinator { return status; } - public TSStatus dropTopic(String topicName) { + public TSStatus dropTopic(TDropTopicReq req) { + final String topicName = req.getTopicName(); + final boolean isTopicExistedBeforeDrop = subscriptionInfo.isTopicExisted(topicName); final TSStatus status = configManager.getProcedureManager().dropTopic(topicName); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn("Failed to drop topic {}. Result status: {}.", topicName, status); } - return status; + + // If the `IF EXISTS` condition is not set and the topic does not exist before the drop + // operation, return an error status indicating that the topic does not exist. + final boolean isIfExistedConditionSet = + req.isSetIfExistsCondition() && req.isIfExistsCondition(); + return isTopicExistedBeforeDrop || isIfExistedConditionSet + ? status + : RpcUtils.getStatus( + TSStatusCode.TOPIC_NOT_EXIST_ERROR, + String.format( + "Failed to drop topic %s. Failures: %s does not exist.", topicName, topicName)); } public TShowTopicResp showTopic(TShowTopicReq req) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 1e4a81cfe44..64d235e7f23 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -93,19 +93,38 @@ public class PipePluginInfo implements SnapshotProcessor { /////////////////////////////// Validator /////////////////////////////// - public void validateBeforeCreatingPipePlugin( - final String pluginName, final String jarName, final String jarMD5) { + /** + * @return true if the pipe plugin is already created and the isSetIfNotExistsCondition is true, + * false otherwise + * @throws PipeException if the pipe plugin is already created and the isSetIfNotExistsCondition + * is false + */ + public boolean validateBeforeCreatingPipePlugin( + final String pluginName, final boolean isSetIfNotExistsCondition) { // both build-in and user defined pipe plugin should be unique if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) { + if (isSetIfNotExistsCondition) { + return true; + } throw new PipeException( String.format( "Failed to create PipePlugin [%s], the same name PipePlugin has been created", pluginName)); } + return false; } - public void validateBeforeDroppingPipePlugin(final String pluginName) { + /** + * @return true if the pipe plugin is not created and the isSetIfExistsCondition is true, false + * otherwise + * @throws PipeException if the pipe plugin is not created and the isSetIfExistsCondition is false + */ + public boolean validateBeforeDroppingPipePlugin( + final String pluginName, final boolean isSetIfExistsCondition) { if (!pipePluginMetaKeeper.containsPipePlugin(pluginName)) { + if (isSetIfExistsCondition) { + return true; + } throw new PipeException( String.format( "Failed to drop PipePlugin [%s], this PipePlugin has not been created", pluginName)); @@ -116,6 +135,7 @@ public class PipePluginInfo implements SnapshotProcessor { "Failed to drop PipePlugin [%s], the PipePlugin is a built-in PipePlugin", pluginName)); } + return false; } public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(final String jarName) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index c5a264c047a..ddc6ae6ac60 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -155,19 +155,25 @@ public class PipeTaskInfo implements SnapshotProcessor { /////////////////////////////// Validator /////////////////////////////// - public void checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest) throws PipeException { + public boolean checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest) + throws PipeException { acquireReadLock(); try { - checkBeforeCreatePipeInternal(createPipeRequest); + return checkBeforeCreatePipeInternal(createPipeRequest); } finally { releaseReadLock(); } } - private void checkBeforeCreatePipeInternal(final TCreatePipeReq createPipeRequest) + private boolean checkBeforeCreatePipeInternal(final TCreatePipeReq createPipeRequest) throws PipeException { if (!isPipeExisted(createPipeRequest.getPipeName())) { - return; + return true; + } + + if (createPipeRequest.isSetIfNotExistsCondition() + && createPipeRequest.isIfNotExistsCondition()) { + return false; } final String exceptionMessage = @@ -178,19 +184,23 @@ public class PipeTaskInfo implements SnapshotProcessor { throw new PipeException(exceptionMessage); } - public void checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq alterPipeRequest) + public boolean checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq alterPipeRequest) throws PipeException { acquireReadLock(); try { - checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest); + return checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest); } finally { releaseReadLock(); } } - private void checkAndUpdateRequestBeforeAlterPipeInternal(final TAlterPipeReq alterPipeRequest) + private boolean checkAndUpdateRequestBeforeAlterPipeInternal(final TAlterPipeReq alterPipeRequest) throws PipeException { if (!isPipeExisted(alterPipeRequest.getPipeName())) { + if (alterPipeRequest.isSetIfExistsCondition() && alterPipeRequest.isIfExistsCondition()) { + return false; + } + final String exceptionMessage = String.format( "Failed to alter pipe %s, the pipe does not exist", alterPipeRequest.getPipeName()); @@ -254,6 +264,8 @@ public class PipeTaskInfo implements SnapshotProcessor { .getAttribute()); } } + + return true; } public void checkBeforeStartPipe(final String pipeName) throws PipeException { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java index b020ffc045d..6b64331422b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java @@ -142,20 +142,24 @@ public class SubscriptionInfo implements SnapshotProcessor { /////////////////////////////// Topic /////////////////////////////// - public void validateBeforeCreatingTopic(TCreateTopicReq createTopicReq) + public boolean validateBeforeCreatingTopic(TCreateTopicReq createTopicReq) throws SubscriptionException { acquireReadLock(); try { - checkBeforeCreateTopicInternal(createTopicReq); + return checkBeforeCreateTopicInternal(createTopicReq); } finally { releaseReadLock(); } } - private void checkBeforeCreateTopicInternal(TCreateTopicReq createTopicReq) + private boolean checkBeforeCreateTopicInternal(TCreateTopicReq createTopicReq) throws SubscriptionException { if (!isTopicExisted(createTopicReq.getTopicName())) { - return; + return true; + } + + if (createTopicReq.isSetIfNotExistsCondition() && createTopicReq.isIfNotExistsCondition()) { + return false; } final String exceptionMessage = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java index 8decdd0b4da..325832a3869 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java @@ -185,7 +185,7 @@ public abstract class AbstractOperatePipeProcedureV2 /** * Execute at state {@link OperatePipeTaskState#VALIDATE_TASK}. * - * @return true if this procedure can skip subsequent stages (start RUNNING pipe or stop STOPPED + * @return false if this procedure can skip subsequent stages (start RUNNING pipe or stop STOPPED * pipe without runtime exception) * @throws PipeException if validation for pipe parameters failed */ @@ -224,8 +224,8 @@ public abstract class AbstractOperatePipeProcedureV2 try { switch (state) { case VALIDATE_TASK: - if (executeFromValidateTask(env)) { - LOGGER.warn("ProcedureId {}: {}", getProcId(), SKIP_PIPE_PROCEDURE_MESSAGE); + if (!executeFromValidateTask(env)) { + LOGGER.info("ProcedureId {}: {}", getProcId(), SKIP_PIPE_PROCEDURE_MESSAGE); // On client side, the message returned after the successful execution of the pipe // command corresponding to this procedure is "Msg: The statement is executed // successfully." diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java index 53fc302c663..fd485079890 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java @@ -66,14 +66,21 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP private PipePluginMeta pipePluginMeta; private byte[] jarFile; + // This field will not be serialized. It may cause some problems + // when the procedure fails on one node and recovers on another node. + // Though it is not a good practice, it is acceptable here. + private boolean isSetIfNotExistsCondition; + public CreatePipePluginProcedure() { super(); } - public CreatePipePluginProcedure(PipePluginMeta pipePluginMeta, byte[] jarFile) { + public CreatePipePluginProcedure( + PipePluginMeta pipePluginMeta, byte[] jarFile, boolean isSetIfNotExistsCondition) { super(); this.pipePluginMeta = pipePluginMeta; this.jarFile = jarFile; + this.isSetIfNotExistsCondition = isSetIfNotExistsCondition; } @Override @@ -125,20 +132,25 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP env.getConfigManager().getPipeManager().getPipePluginCoordinator(); pipePluginCoordinator.lock(); + final String pluginName = pipePluginMeta.getPluginName(); try { - pipePluginCoordinator + if (pipePluginCoordinator .getPipePluginInfo() - .validateBeforeCreatingPipePlugin( - pipePluginMeta.getPluginName(), - pipePluginMeta.getJarName(), - pipePluginMeta.getJarMD5()); + .validateBeforeCreatingPipePlugin(pluginName, isSetIfNotExistsCondition)) { + LOGGER.info( + "Pipe plugin {} is already created and isSetIfNotExistsCondition is true, end the CreatePipePluginProcedure({})", + pluginName, + pluginName); + pipePluginCoordinator.unlock(); + return Flow.NO_MORE_STATE; + } } catch (PipeException e) { // The pipe plugin has already created, we should end the procedure LOGGER.warn( "Pipe plugin {} is already created, end the CreatePipePluginProcedure({})", - pipePluginMeta.getPluginName(), - pipePluginMeta.getPluginName()); + pluginName, + pluginName); setFailure(new ProcedureException(e.getMessage())); pipePluginCoordinator.unlock(); return Flow.NO_MORE_STATE; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index 2502a56ab47..dc9d4ce4f87 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -62,13 +62,20 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi private String pluginName; + // If the plugin does not exist and the If Exists condition is met, the process ends. + // This field will not be serialized. It may cause some problems + // when the procedure fails on one node and recovers on another node. + // Though it is not a good practice, it is acceptable here. + private boolean isSetIfExistsCondition; + public DropPipePluginProcedure() { super(); } - public DropPipePluginProcedure(String pluginName) { + public DropPipePluginProcedure(String pluginName, boolean isSetIfExistsCondition) { super(); this.pluginName = pluginName; + this.isSetIfExistsCondition = isSetIfExistsCondition; } @Override @@ -117,7 +124,18 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi pipePluginCoordinator.lock(); try { - pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName); + if (pipePluginCoordinator + .getPipePluginInfo() + .validateBeforeDroppingPipePlugin(pluginName, isSetIfExistsCondition)) { + LOGGER.info( + "Pipe plugin {} is not exist, end the DropPipePluginProcedure({})", + pluginName, + pluginName); + pipePluginCoordinator.unlock(); + pipeTaskCoordinator.unlock(); + return Flow.NO_MORE_STATE; + } + pipeTaskInfo.get().validatePipePluginUsageByPipe(pluginName); } catch (PipeException e) { // if the pipe plugin is a built-in plugin, we should not drop it diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index 0803aa29ca8..f18737e8b7a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -71,7 +71,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur LOGGER.info("PipeHandleLeaderChangeProcedure: executeFromValidateTask"); // Nothing needs to be checked - return false; + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index 0919894f0d5..e91a6ba974d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -81,7 +81,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV LOGGER.info("PipeHandleMetaChangeProcedure: executeFromValidateTask"); // Do nothing - return false; + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java index 8b1991aeb88..c4907d1f9e7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java @@ -94,7 +94,7 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 { LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask"); LAST_EXECUTION_TIME.set(System.currentTimeMillis()); - return false; + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java index bfd35a3bc96..59a04f9a806 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java @@ -96,7 +96,9 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { // We should execute checkBeforeAlterPipe before checking the pipe plugin. This method will // update the alterPipeRequest based on the alterPipeRequest and existing pipe metadata. - pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest); + if (!pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest)) { + return false; + } final PipeManager pipeManager = env.getConfigManager().getPipeManager(); pipeManager @@ -107,7 +109,7 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { alterPipeRequest.getProcessorAttributes(), alterPipeRequest.getConnectorAttributes()); - return false; + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index 68075b5911d..cd581d01332 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -42,6 +42,7 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.pipe.api.PipePlugin; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; @@ -110,6 +111,17 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { return PipeTaskOperation.CREATE_PIPE; } + /** + * Check the {@link PipePlugin} configuration in Pipe. If there is an error, throw {@link + * PipeException}. If there is a Pipe with the same name and there is no IfNotExists condition in + * {@link #createPipeRequest}, throw {@link PipeException}. If there is an IfNotExists condition, + * return {@code false}. If there is no Pipe with the same name, return {@code true}. + * + * @param env the environment for the procedure + * @return {@code true} The pipeline does not exist {@code false} The pipeline already exists and + * satisfies the IfNotExists condition + * @throws PipeException + */ @Override public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeException { LOGGER.info( @@ -123,9 +135,8 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { createPipeRequest.getExtractorAttributes(), createPipeRequest.getProcessorAttributes(), createPipeRequest.getConnectorAttributes()); - pipeTaskInfo.get().checkBeforeCreatePipe(createPipeRequest); - return false; + return pipeTaskInfo.get().checkBeforeCreatePipe(createPipeRequest); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java index 60d851a66e3..0c7042caf3f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java @@ -78,7 +78,7 @@ public class DropPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { pipeTaskInfo.get().checkBeforeDropPipe(pipeName); - return false; + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java index 628935a2116..58254dc1ab0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java @@ -65,8 +65,8 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { pipeTaskInfo.get().checkBeforeStartPipe(pipeName); - return pipeTaskInfo.get().isPipeRunning(pipeName) - && !pipeTaskInfo.get().isStoppedByRuntimeException(pipeName); + return !pipeTaskInfo.get().isPipeRunning(pipeName) + || pipeTaskInfo.get().isStoppedByRuntimeException(pipeName); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java index 81a70733363..a817e16c7ae 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java @@ -65,7 +65,7 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { pipeTaskInfo.get().checkBeforeStopPipe(pipeName); - return pipeTaskInfo.get().isPipeStoppedByUser(pipeName); + return !pipeTaskInfo.get().isPipeStoppedByUser(pipeName); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java index 7b0ca8a3d25..4566738cd8d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -53,6 +54,9 @@ public abstract class AbstractOperateSubscriptionProcedure private static final Logger LOGGER = LoggerFactory.getLogger(AbstractOperateSubscriptionProcedure.class); + private static final String SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE = + "Skip subscription-related operations and do nothing"; + private static final int RETRY_THRESHOLD = 1; protected AtomicReference<SubscriptionInfo> subscriptionInfo; @@ -156,7 +160,7 @@ public abstract class AbstractOperateSubscriptionProcedure protected abstract SubscriptionOperation getOperation(); - protected abstract void executeFromValidate(ConfigNodeProcedureEnv env) + protected abstract boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException; protected abstract void executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) @@ -179,7 +183,14 @@ public abstract class AbstractOperateSubscriptionProcedure try { switch (state) { case VALIDATE: - executeFromValidate(env); + if (!executeFromValidate(env)) { + LOGGER.info("ProcedureId {}: {}", getProcId(), SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE); + // On client side, the message returned after the successful execution of the + // subscription command corresponding to this procedure is "Msg: The statement is + // executed successfully." + this.setResult(SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE.getBytes(StandardCharsets.UTF_8)); + return Flow.NO_MORE_STATE; + } setNextState(OperateSubscriptionState.OPERATE_ON_CONFIG_NODES); break; case OPERATE_ON_CONFIG_NODES: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java index 981e0513d04..6e6031b8102 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java @@ -75,10 +75,11 @@ public class AlterConsumerGroupProcedure extends AbstractOperateSubscriptionProc } @Override - public void executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { + public boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("AlterConsumerGroupProcedure: executeFromValidate, try to validate"); validateAndGetOldAndNewMeta(env); + return true; } protected void validateAndGetOldAndNewMeta(ConfigNodeProcedureEnv env) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java index 6e3a5387479..bb010eaca40 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java @@ -91,10 +91,11 @@ public class ConsumerGroupMetaSyncProcedure extends AbstractOperateSubscriptionP } @Override - public void executeFromValidate(ConfigNodeProcedureEnv env) { + public boolean executeFromValidate(ConfigNodeProcedureEnv env) { LOGGER.info("ConsumerGroupMetaSyncProcedure: executeFromValidate"); LAST_EXECUTION_TIME.set(System.currentTimeMillis()); + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java index 0894feb62ad..166f7b3da5e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java @@ -86,7 +86,7 @@ public class CreateSubscriptionProcedure extends AbstractOperateSubscriptionAndP } @Override - protected void executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { + protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("CreateSubscriptionProcedure: executeFromValidate"); subscriptionInfo.get().validateBeforeSubscribe(subscribeReq); @@ -131,6 +131,7 @@ public class CreateSubscriptionProcedure extends AbstractOperateSubscriptionAndP createPipeProcedure.executeFromValidateTask(env); createPipeProcedure.executeFromCalculateInfoForTask(env); } + return true; } // TODO: check periodically if the subscription is still valid but no working pipe? diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java index 89da284ac0d..7d0c1e09cbd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java @@ -87,7 +87,7 @@ public class DropSubscriptionProcedure extends AbstractOperateSubscriptionAndPip } @Override - protected void executeFromValidate(final ConfigNodeProcedureEnv env) + protected boolean executeFromValidate(final ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("DropSubscriptionProcedure: executeFromValidate"); @@ -133,6 +133,7 @@ public class DropSubscriptionProcedure extends AbstractOperateSubscriptionAndPip // Validate AlterConsumerGroupProcedure alterConsumerGroupProcedure.executeFromValidate(env); + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java index 93380ed10be..f8fbdab72e0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java @@ -84,12 +84,14 @@ public class AlterTopicProcedure extends AbstractOperateSubscriptionProcedure { } @Override - public void executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { + public boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("AlterTopicProcedure: executeFromValidate"); subscriptionInfo.get().validateBeforeAlteringTopic(updatedTopicMeta); existedTopicMeta = subscriptionInfo.get().getTopicMeta(updatedTopicMeta.getTopicName()); + + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java index ed3d59bd3d4..b712861cb54 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java @@ -66,11 +66,13 @@ public class CreateTopicProcedure extends AbstractOperateSubscriptionProcedure { } @Override - protected void executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { + protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("CreateTopicProcedure: executeFromValidate"); // 1. check if the topic exists - subscriptionInfo.get().validateBeforeCreatingTopic(createTopicReq); + if (!subscriptionInfo.get().validateBeforeCreatingTopic(createTopicReq)) { + return false; + } // 2. create the topic meta topicMeta = @@ -78,6 +80,7 @@ public class CreateTopicProcedure extends AbstractOperateSubscriptionProcedure { createTopicReq.getTopicName(), System.currentTimeMillis(), createTopicReq.getTopicAttributes()); + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java index 54c6e3ed469..f1cfbb59d10 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java @@ -61,10 +61,11 @@ public class DropTopicProcedure extends AbstractOperateSubscriptionProcedure { } @Override - protected void executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { + protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("DropTopicProcedure: executeFromValidate({})", topicName); subscriptionInfo.get().validateBeforeDroppingTopic(topicName); + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java index 3d49a766102..40920e43936 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java @@ -90,10 +90,11 @@ public class TopicMetaSyncProcedure extends AbstractOperateSubscriptionProcedure } @Override - public void executeFromValidate(ConfigNodeProcedureEnv env) { + public boolean executeFromValidate(ConfigNodeProcedureEnv env) { LOGGER.info("TopicMetaSyncProcedure: executeFromValidate"); LAST_EXECUTION_TIME.set(System.currentTimeMillis()); + return true; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index fd7db9ce3ca..54c1a55e212 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -117,6 +117,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; @@ -824,7 +826,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac @Override public TSStatus dropPipePlugin(TDropPipePluginReq req) { - return configManager.dropPipePlugin(req.getPluginName()); + return configManager.dropPipePlugin(req); } @Override @@ -1054,7 +1056,13 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac @Override public TSStatus dropPipe(String pipeName) { - return configManager.dropPipe(pipeName); + return configManager.dropPipe( + new TDropPipeReq().setPipeName(pipeName).setIfExistsCondition(false)); + } + + @Override + public TSStatus dropPipeExtended(TDropPipeReq req) { + return configManager.dropPipe(req); } @Override @@ -1084,7 +1092,13 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac @Override public TSStatus dropTopic(String topicName) { - return configManager.dropTopic(topicName); + return configManager.dropTopic( + new TDropTopicReq().setTopicName(topicName).setIfExistsCondition(false)); + } + + @Override + public TSStatus dropTopicExtended(TDropTopicReq req) throws TException { + return configManager.dropTopic(req); } @Override diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java index a697adcdec8..f3571bed57e 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java @@ -152,7 +152,7 @@ public class PipeInfoTest { pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan); // Drop pipe plugin test plugin - pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName); + pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName, false); DropPipePluginPlan dropPipePluginPlan = new DropPipePluginPlan(pluginName); pipeInfo.getPipePluginInfo().dropPipePlugin(dropPipePluginPlan); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java index 8405f2d4675..93b95a3f336 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java @@ -41,7 +41,7 @@ public class CreatePipePluginProcedureTest { PipePluginMeta pipePluginMeta = new PipePluginMeta("test", "test.class", false, "test.jar", "testMD5test"); CreatePipePluginProcedure proc = - new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3}); + new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3}, false); try { proc.serialize(outputStream); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java index c29f5bd7fb5..232a5fbee59 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java @@ -36,7 +36,7 @@ public class DropPipePluginProcedureTest { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); - DropPipePluginProcedure proc = new DropPipePluginProcedure("test"); + DropPipePluginProcedure proc = new DropPipePluginProcedure("test", false); try { proc.serialize(outputStream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 1fb3e907b05..04057ef3def 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -85,6 +85,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp; @@ -994,6 +996,12 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie () -> client.dropPipe(pipeName), status -> !updateConfigNodeLeader(status)); } + @Override + public TSStatus dropPipeExtended(TDropPipeReq req) throws TException { + return executeRemoteCallWithRetry( + () -> client.dropPipeExtended(req), status -> !updateConfigNodeLeader(status)); + } + @Override public TShowPipeResp showPipe(TShowPipeReq req) throws TException { return executeRemoteCallWithRetry( @@ -1018,6 +1026,12 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie () -> client.dropTopic(topicName), status -> !updateConfigNodeLeader(status)); } + @Override + public TSStatus dropTopicExtended(TDropTopicReq req) throws TException { + return executeRemoteCallWithRetry( + () -> client.dropTopicExtended(req), status -> !updateConfigNodeLeader(status)); + } + @Override public TShowTopicResp showTopic(TShowTopicReq req) throws TException { return executeRemoteCallWithRetry( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 0bba6836171..1bc8667cc94 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -81,6 +81,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp; @@ -201,6 +203,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipePluginStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement; @@ -881,6 +884,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { client.createPipePlugin( new TCreatePipePluginReq() .setPluginName(pluginName) + .setIfNotExistsCondition(createPipePluginStatement.hasIfNotExistsCondition()) .setClassName(className) .setJarFile(jarFile) .setJarMD5(jarMd5) @@ -907,13 +911,21 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } @Override - public SettableFuture<ConfigTaskResult> dropPipePlugin(String pluginName) { + public SettableFuture<ConfigTaskResult> dropPipePlugin( + DropPipePluginStatement dropPipePluginStatement) { final SettableFuture<ConfigTaskResult> future = SettableFuture.create(); try (final ConfigNodeClient client = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TSStatus executionStatus = client.dropPipePlugin(new TDropPipePluginReq(pluginName)); + final TSStatus executionStatus = + client.dropPipePlugin( + new TDropPipePluginReq() + .setPluginName(dropPipePluginStatement.getPluginName()) + .setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition())); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { - LOGGER.warn("[{}] Failed to drop pipe plugin {}.", executionStatus, pluginName); + LOGGER.warn( + "[{}] Failed to drop pipe plugin {}.", + executionStatus, + dropPipePluginStatement.getPluginName()); future.setException(new IoTDBException(executionStatus.message, executionStatus.code)); } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); @@ -1777,6 +1789,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { TCreatePipeReq req = new TCreatePipeReq() .setPipeName(createPipeStatement.getPipeName()) + .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition()) .setExtractorAttributes(createPipeStatement.getExtractorAttributes()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) .setConnectorAttributes(createPipeStatement.getConnectorAttributes()); @@ -1846,6 +1859,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { alterPipeStatement.isReplaceAllConnectorAttributes()); req.setExtractorAttributes(alterPipeStatement.getExtractorAttributes()); req.setIsReplaceAllExtractorAttributes(alterPipeStatement.isReplaceAllExtractorAttributes()); + req.setIfExistsCondition(alterPipeStatement.hasIfExistsCondition()); final TSStatus tsStatus = configNodeClient.alterPipe(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { LOGGER.warn("Failed to alter pipe {} in config node, status is {}.", pipeName, tsStatus); @@ -1909,7 +1923,11 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { try (ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TSStatus tsStatus = configNodeClient.dropPipe(dropPipeStatement.getPipeName()); + final TSStatus tsStatus = + configNodeClient.dropPipeExtended( + new TDropPipeReq() + .setPipeName(dropPipeStatement.getPipeName()) + .setIfExistsCondition(dropPipeStatement.hasIfExistsCondition())); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { LOGGER.warn( "Failed to drop pipe {}, status is {}.", dropPipeStatement.getPipeName(), tsStatus); @@ -2057,7 +2075,10 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { try (final ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final TCreateTopicReq req = - new TCreateTopicReq().setTopicName(topicName).setTopicAttributes(topicAttributes); + new TCreateTopicReq() + .setTopicName(topicName) + .setIfNotExistsCondition(createTopicStatement.hasIfNotExistsCondition()) + .setTopicAttributes(topicAttributes); final TSStatus tsStatus = configNodeClient.createTopic(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { LOGGER.warn("Failed to create topic {} in config node, status is {}.", topicName, tsStatus); @@ -2076,7 +2097,11 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { final SettableFuture<ConfigTaskResult> future = SettableFuture.create(); try (ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TSStatus tsStatus = configNodeClient.dropTopic(dropTopicStatement.getTopicName()); + final TSStatus tsStatus = + configNodeClient.dropTopicExtended( + new TDropTopicReq() + .setIfExistsCondition(dropTopicStatement.hasIfExistsCondition()) + .setTopicName(dropTopicStatement.getTopicName())); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { LOGGER.warn( "Failed to drop topic {}, status is {}.", dropTopicStatement.getTopicName(), tsStatus); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index 954f74f3897..8a1e1f416c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -60,6 +60,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipePluginStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement; @@ -118,7 +119,7 @@ public interface IConfigTaskExecutor { SettableFuture<ConfigTaskResult> createPipePlugin(CreatePipePluginStatement createPipeStatement); - SettableFuture<ConfigTaskResult> dropPipePlugin(String pluginName); + SettableFuture<ConfigTaskResult> dropPipePlugin(DropPipePluginStatement dropPipePluginStatement); SettableFuture<ConfigTaskResult> showPipePlugins(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java index 82f9a325706..eb57f958430 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java @@ -28,15 +28,15 @@ import com.google.common.util.concurrent.ListenableFuture; public class DropPipePluginTask implements IConfigTask { - private final String pluginName; + private final DropPipePluginStatement dropPipePluginStatement; public DropPipePluginTask(DropPipePluginStatement dropPipePluginStatement) { - this.pluginName = dropPipePluginStatement.getPluginName(); + this.dropPipePluginStatement = dropPipePluginStatement; } @Override public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) throws InterruptedException { - return configTaskExecutor.dropPipePlugin(pluginName); + return configTaskExecutor.dropPipePlugin(dropPipePluginStatement); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 60302379b0c..6115218d125 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -960,6 +960,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { public Statement visitCreatePipePlugin(IoTDBSqlParser.CreatePipePluginContext ctx) { return new CreatePipePluginStatement( parseIdentifier(ctx.pluginName.getText()), + ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null, parseStringLiteral(ctx.className.getText()), parseAndValidateURI(ctx.uriClause())); } @@ -967,7 +968,10 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { // Drop PipePlugin ===================================================================== @Override public Statement visitDropPipePlugin(IoTDBSqlParser.DropPipePluginContext ctx) { - return new DropPipePluginStatement(parseIdentifier(ctx.pluginName.getText())); + final DropPipePluginStatement dropPipePluginStatement = new DropPipePluginStatement(); + dropPipePluginStatement.setPluginName(parseIdentifier(ctx.pluginName.getText())); + dropPipePluginStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null); + return dropPipePluginStatement; } // Show PipePlugins ===================================================================== @@ -3719,6 +3723,10 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { throw new SemanticException( "Not support for this sql in CREATE PIPE, please enter pipe name."); } + + createPipeStatement.setIfNotExists( + ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null); + if (ctx.extractorAttributesClause() != null) { createPipeStatement.setExtractorAttributes( parseExtractorAttributesClause( @@ -3749,6 +3757,8 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { "Not support for this sql in ALTER PIPE, please enter pipe name."); } + alterPipeStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null); + if (ctx.alterExtractorAttributesClause() != null) { alterPipeStatement.setExtractorAttributes( parseExtractorAttributesClause( @@ -3781,6 +3791,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { alterPipeStatement.setConnectorAttributes(new HashMap<>()); alterPipeStatement.setReplaceAllConnectorAttributes(false); } + return alterPipeStatement; } @@ -3827,6 +3838,8 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { throw new SemanticException("Not support for this sql in DROP PIPE, please enter pipename."); } + dropPipeStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null); + return dropPipeStatement; } @@ -3879,6 +3892,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { "Not support for this sql in CREATE TOPIC, please enter topicName."); } + createTopicStatement.setIfNotExists( + ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null); + if (ctx.topicAttributesClause() != null) { createTopicStatement.setTopicAttributes( parseTopicAttributesClause(ctx.topicAttributesClause().topicAttributeClause())); @@ -3911,6 +3927,8 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { "Not support for this sql in DROP TOPIC, please enter topicName."); } + dropTopicStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null); + return dropTopicStatement; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java index 0c0bef6c274..de5dc3c1d59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java @@ -37,6 +37,7 @@ import java.util.Map; public class AlterPipeStatement extends Statement implements IConfigStatement { private String pipeName; + private boolean ifExistsCondition; private Map<String, String> extractorAttributes; private Map<String, String> processorAttributes; private Map<String, String> connectorAttributes; @@ -52,6 +53,10 @@ public class AlterPipeStatement extends Statement implements IConfigStatement { return pipeName; } + public boolean hasIfExistsCondition() { + return ifExistsCondition; + } + public Map<String, String> getExtractorAttributes() { return extractorAttributes; } @@ -80,6 +85,10 @@ public class AlterPipeStatement extends Statement implements IConfigStatement { this.pipeName = pipeName; } + public void setIfExists(boolean ifExistsCondition) { + this.ifExistsCondition = ifExistsCondition; + } + public void setExtractorAttributes(Map<String, String> extractorAttributes) { this.extractorAttributes = extractorAttributes; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java index f954cc347dc..f0975b56012 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java @@ -36,13 +36,16 @@ import java.util.List; public class CreatePipePluginStatement extends Statement implements IConfigStatement { private final String pluginName; + private final boolean ifNotExistsCondition; private final String className; private final String uriString; - public CreatePipePluginStatement(String pluginName, String className, String uriString) { + public CreatePipePluginStatement( + String pluginName, boolean ifNotExistsCondition, String className, String uriString) { super(); statementType = StatementType.CREATE_PIPEPLUGIN; this.pluginName = pluginName; + this.ifNotExistsCondition = ifNotExistsCondition; this.className = className; this.uriString = uriString; } @@ -51,6 +54,10 @@ public class CreatePipePluginStatement extends Statement implements IConfigState return pluginName; } + public boolean hasIfNotExistsCondition() { + return ifNotExistsCondition; + } + public String getClassName() { return className; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java index 45a1a4664cb..a7b7471ffd0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java @@ -37,6 +37,7 @@ import java.util.Map; public class CreatePipeStatement extends Statement implements IConfigStatement { private String pipeName; + private boolean ifNotExistsCondition; private Map<String, String> extractorAttributes; private Map<String, String> processorAttributes; private Map<String, String> connectorAttributes; @@ -49,6 +50,10 @@ public class CreatePipeStatement extends Statement implements IConfigStatement { return pipeName; } + public boolean hasIfNotExistsCondition() { + return ifNotExistsCondition; + } + public Map<String, String> getExtractorAttributes() { return extractorAttributes; } @@ -65,6 +70,10 @@ public class CreatePipeStatement extends Statement implements IConfigStatement { this.pipeName = pipeName; } + public void setIfNotExists(boolean ifNotExistsCondition) { + this.ifNotExistsCondition = ifNotExistsCondition; + } + public void setExtractorAttributes(Map<String, String> extractorAttributes) { this.extractorAttributes = extractorAttributes; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java index 49807d21a24..c7f4ebfd5df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java @@ -35,18 +35,30 @@ import java.util.List; public class DropPipePluginStatement extends Statement implements IConfigStatement { - private final String pluginName; + private String pluginName; + private boolean ifExistsCondition; - public DropPipePluginStatement(String pluginName) { + public DropPipePluginStatement() { super(); statementType = StatementType.DROP_PIPEPLUGIN; - this.pluginName = pluginName; } public String getPluginName() { return pluginName; } + public boolean hasIfExistsCondition() { + return ifExistsCondition; + } + + public void setPluginName(String pluginName) { + this.pluginName = pluginName; + } + + public void setIfExists(boolean ifExistsCondition) { + this.ifExistsCondition = ifExistsCondition; + } + @Override public QueryType getQueryType() { return QueryType.WRITE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java index a7d54a3086f..a3403e00e68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java @@ -36,11 +36,16 @@ import java.util.List; public class DropPipeStatement extends Statement implements IConfigStatement { private String pipeName; + private boolean ifExistsCondition; public DropPipeStatement(StatementType dropPipeStatement) { this.statementType = dropPipeStatement; } + public boolean hasIfExistsCondition() { + return ifExistsCondition; + } + public String getPipeName() { return pipeName; } @@ -49,6 +54,10 @@ public class DropPipeStatement extends Statement implements IConfigStatement { this.pipeName = pipeName; } + public void setIfExists(boolean ifExistsCondition) { + this.ifExistsCondition = ifExistsCondition; + } + @Override public QueryType getQueryType() { return QueryType.WRITE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java index 6482975b038..a98a1d5d273 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java @@ -37,7 +37,7 @@ import java.util.Map; public class CreateTopicStatement extends Statement implements IConfigStatement { private String topicName; - + private boolean ifNotExistsCondition; private Map<String, String> topicAttributes; public CreateTopicStatement() { @@ -49,6 +49,10 @@ public class CreateTopicStatement extends Statement implements IConfigStatement return topicName; } + public boolean hasIfNotExistsCondition() { + return ifNotExistsCondition; + } + public Map<String, String> getTopicAttributes() { return topicAttributes; } @@ -57,6 +61,10 @@ public class CreateTopicStatement extends Statement implements IConfigStatement this.topicName = topicName; } + public void setIfNotExists(boolean ifNotExistsCondition) { + this.ifNotExistsCondition = ifNotExistsCondition; + } + public void setTopicAttributes(Map<String, String> topicAttributes) { this.topicAttributes = topicAttributes; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java index e366b1c5eb3..36525b1846e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java @@ -35,6 +35,7 @@ import java.util.List; public class DropTopicStatement extends Statement implements IConfigStatement { private String topicName; + private boolean ifExistsCondition; public DropTopicStatement() { super(); @@ -45,10 +46,18 @@ public class DropTopicStatement extends Statement implements IConfigStatement { return topicName; } + public boolean hasIfExistsCondition() { + return ifExistsCondition; + } + public void setTopicName(String topicName) { this.topicName = topicName; } + public void setIfExists(boolean ifExistsCondition) { + this.ifExistsCondition = ifExistsCondition; + } + @Override public QueryType getQueryType() { return QueryType.WRITE; diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 17618206b43..47e8cf553bf 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -674,10 +674,12 @@ struct TCreatePipePluginReq { 3: required string jarName 4: required binary jarFile 5: required string jarMD5 + 6: optional bool ifNotExistsCondition } struct TDropPipePluginReq { 1: required string pluginName + 2: optional bool ifExistsCondition } // Get PipePlugin table from config node @@ -710,6 +712,7 @@ struct TCreatePipeReq { 2: optional map<string, string> extractorAttributes 3: optional map<string, string> processorAttributes 4: required map<string, string> connectorAttributes + 5: optional bool ifNotExistsCondition } struct TAlterPipeReq { @@ -720,6 +723,12 @@ struct TAlterPipeReq { 5: required bool isReplaceAllConnectorAttributes 6: optional map<string, string> extractorAttributes 7: optional bool isReplaceAllExtractorAttributes + 8: optional bool ifExistsCondition +} + +struct TDropPipeReq { + 1: required string pipeName + 2: optional bool ifExistsCondition } // Deprecated, restored for compatibility @@ -774,6 +783,12 @@ struct TAlterLogicalViewReq { struct TCreateTopicReq { 1: required string topicName 2: optional map<string, string> topicAttributes + 3: optional bool ifNotExistsCondition +} + +struct TDropTopicReq { + 1: required string topicName + 2: optional bool ifExistsCondition } struct TShowTopicReq { @@ -1519,6 +1534,9 @@ service IConfigNodeRPCService { /** Drop Pipe */ common.TSStatus dropPipe(string pipeName) + /** Drop Pipe */ + common.TSStatus dropPipeExtended(TDropPipeReq req) + /** Show Pipe by name, if name is empty, show all Pipe */ TShowPipeResp showPipe(TShowPipeReq req) @@ -1540,6 +1558,9 @@ service IConfigNodeRPCService { /** Drop Topic */ common.TSStatus dropTopic(string topicName) + /** Drop Topic */ + common.TSStatus dropTopicExtended(TDropTopicReq req) + /** Show Topic by name, if name is empty, show all Topic */ TShowTopicResp showTopic(TShowTopicReq req)
