This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-table-model
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 133b28060a41300827f0cc098131adaa1a606aab Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Sep 24 17:12:40 2024 +0800 pipe plugin sql --- .../plan/relational/sql/ast/CreatePipe.java | 11 ++-- .../ast/{CreatePipe.java => CreatePipePlugin.java} | 65 +++++++++------------- .../plan/relational/sql/ast/DropPipe.java | 2 +- .../sql/ast/{DropPipe.java => DropPipePlugin.java} | 20 +++---- .../ast/{StopPipe.java => ShowPipePlugins.java} | 26 ++------- .../plan/relational/sql/ast/StartPipe.java | 2 +- .../plan/relational/sql/ast/StopPipe.java | 2 +- .../plan/relational/sql/parser/AstBuilder.java | 31 ++++++++++- 8 files changed, 78 insertions(+), 81 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java index 2e2917ed04b..e997c02a239 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java @@ -43,11 +43,14 @@ public class CreatePipe extends Statement { final Map<String, String> processorAttributes, final Map<String, String> connectorAttributes) { super(null); - this.pipeName = requireNonNull(pipeName); + this.pipeName = requireNonNull(pipeName, "pipe name can not be null"); this.ifNotExistsCondition = ifNotExistsCondition; - this.extractorAttributes = requireNonNull(extractorAttributes); - this.processorAttributes = requireNonNull(processorAttributes); - this.connectorAttributes = requireNonNull(connectorAttributes); + this.extractorAttributes = + requireNonNull(extractorAttributes, "extractor/source attributes can not be null"); + this.processorAttributes = + requireNonNull(processorAttributes, "processor attributes can not be null"); + this.connectorAttributes = + requireNonNull(connectorAttributes, 
"connector attributes can not be null"); } public String getPipeName() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipePlugin.java similarity index 54% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipePlugin.java index 2e2917ed04b..cc541ece980 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipePlugin.java @@ -22,52 +22,44 @@ package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; import com.google.common.collect.ImmutableList; import java.util.List; -import java.util.Map; import java.util.Objects; import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; -public class CreatePipe extends Statement { +public class CreatePipePlugin extends Statement { - private final String pipeName; + private final String pluginName; private final boolean ifNotExistsCondition; - private final Map<String, String> extractorAttributes; - private final Map<String, String> processorAttributes; - private final Map<String, String> connectorAttributes; + private final String className; + private final String uriString; - public CreatePipe( - final String pipeName, + public CreatePipePlugin( + final String pluginName, final boolean ifNotExistsCondition, - final Map<String, String> extractorAttributes, - final Map<String, String> processorAttributes, - final Map<String, String> connectorAttributes) { + final String className, + final String uriString) { super(null); - this.pipeName = requireNonNull(pipeName); + 
this.pluginName = requireNonNull(pluginName, "plugin name can not be null"); this.ifNotExistsCondition = ifNotExistsCondition; - this.extractorAttributes = requireNonNull(extractorAttributes); - this.processorAttributes = requireNonNull(processorAttributes); - this.connectorAttributes = requireNonNull(connectorAttributes); + this.className = requireNonNull(className, "class name can not be null"); + this.uriString = requireNonNull(uriString, "uri can not be null"); } - public String getPipeName() { - return pipeName; + public String getPluginName() { + return pluginName; } public boolean hasIfNotExistsCondition() { return ifNotExistsCondition; } - public Map<String, String> getExtractorAttributes() { - return extractorAttributes; + public String getClassName() { + return className; } - public Map<String, String> getProcessorAttributes() { - return processorAttributes; - } - - public Map<String, String> getConnectorAttributes() { - return connectorAttributes; + public String getUriString() { + return uriString; } @Override @@ -83,12 +75,7 @@ public class CreatePipe extends Statement { @Override public int hashCode() { - return Objects.hash( - pipeName, - ifNotExistsCondition, - extractorAttributes, - processorAttributes, - connectorAttributes); + return Objects.hash(pluginName, ifNotExistsCondition, className, uriString); } @Override @@ -99,22 +86,20 @@ public class CreatePipe extends Statement { if (obj == null || getClass() != obj.getClass()) { return false; } - CreatePipe other = (CreatePipe) obj; - return Objects.equals(pipeName, other.pipeName) + CreatePipePlugin other = (CreatePipePlugin) obj; + return Objects.equals(pluginName, other.pluginName) && Objects.equals(ifNotExistsCondition, other.ifNotExistsCondition) - && Objects.equals(extractorAttributes, other.extractorAttributes) - && Objects.equals(processorAttributes, other.processorAttributes) - && Objects.equals(connectorAttributes, other.connectorAttributes); + && Objects.equals(className, 
other.className) + && Objects.equals(uriString, other.uriString); } @Override public String toString() { return toStringHelper(this) - .add("pipeName", pipeName) + .add("pluginName", pluginName) .add("ifNotExistsCondition", ifNotExistsCondition) - .add("extractorAttributes", extractorAttributes) - .add("processorAttributes", processorAttributes) - .add("connectorAttributes", connectorAttributes) + .add("className", className) + .add("uriString", uriString) .toString(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java index 08889b512ea..053431c1f19 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java @@ -34,7 +34,7 @@ public class DropPipe extends Statement { public DropPipe(final String pipeName, final boolean ifExistsCondition) { super(null); - this.pipeName = requireNonNull(pipeName); + this.pipeName = requireNonNull(pipeName, "pipe name can not be null"); this.ifExistsCondition = ifExistsCondition; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipePlugin.java similarity index 79% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipePlugin.java index 08889b512ea..81cb32d4244 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipePlugin.java @@ -27,19 +27,19 @@ import java.util.Objects; import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; -public class DropPipe extends Statement { +public class DropPipePlugin extends Statement { - private final String pipeName; + private final String pluginName; private final boolean ifExistsCondition; - public DropPipe(final String pipeName, final boolean ifExistsCondition) { + public DropPipePlugin(final String pluginName, final boolean ifExistsCondition) { super(null); - this.pipeName = requireNonNull(pipeName); + this.pluginName = requireNonNull(pluginName, "plugin name can not be null"); this.ifExistsCondition = ifExistsCondition; } - public String getPipeName() { - return pipeName; + public String getPluginName() { + return pluginName; } public boolean hasIfExistsCondition() { @@ -59,7 +59,7 @@ public class DropPipe extends Statement { @Override public int hashCode() { - return Objects.hash(pipeName, ifExistsCondition); + return Objects.hash(pluginName, ifExistsCondition); } @Override @@ -70,15 +70,15 @@ public class DropPipe extends Statement { if (obj == null || getClass() != obj.getClass()) { return false; } - DropPipe other = (DropPipe) obj; - return Objects.equals(pipeName, other.pipeName) + DropPipePlugin other = (DropPipePlugin) obj; + return Objects.equals(pluginName, other.pluginName) && Objects.equals(ifExistsCondition, other.ifExistsCondition); } @Override public String toString() { return toStringHelper(this) - .add("pipeName", pipeName) + .add("pluginName", pluginName) .add("ifExistsCondition", ifExistsCondition) .toString(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipePlugins.java similarity index 69% copy 
from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipePlugins.java index be6ac6d80f8..de290de3cdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipePlugins.java @@ -22,22 +22,13 @@ package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; import com.google.common.collect.ImmutableList; import java.util.List; -import java.util.Objects; import static com.google.common.base.MoreObjects.toStringHelper; -import static java.util.Objects.requireNonNull; -public class StopPipe extends Statement { +public class ShowPipePlugins extends Statement { - private final String pipeName; - - public StopPipe(final String pipeName) { + public ShowPipePlugins() { super(null); - this.pipeName = requireNonNull(pipeName); - } - - public String getPipeName() { - return pipeName; } @Override @@ -53,23 +44,16 @@ public class StopPipe extends Statement { @Override public int hashCode() { - return Objects.hash(pipeName); + return 0; } @Override public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - StopPipe other = (StopPipe) obj; - return Objects.equals(pipeName, other.pipeName); + return obj instanceof ShowPipePlugins; } @Override public String toString() { - return toStringHelper(this).add("pipeName", pipeName).toString(); + return toStringHelper(this).toString(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java index 46a3395030f..5f6cbffee38 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java @@ -33,7 +33,7 @@ public class StartPipe extends Statement { public StartPipe(final String pipeName) { super(null); - this.pipeName = requireNonNull(pipeName); + this.pipeName = requireNonNull(pipeName, "pipe name can not be null"); } public String getPipeName() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java index be6ac6d80f8..1e1eb242d0a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java @@ -33,7 +33,7 @@ public class StopPipe extends Statement { public StopPipe(final String pipeName) { super(null); - this.pipeName = requireNonNull(pipeName); + this.pipeName = requireNonNull(pipeName, "pipe name can not be null"); } public String getPipeName() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 0e306c0d684..52c3c770224 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateIndex; import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentDatabase; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentTime; @@ -58,6 +59,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropIndex; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExistsPredicate; @@ -118,6 +120,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowIndex; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables; @@ -165,6 +168,8 @@ import org.apache.tsfile.utils.TimeDuration; import javax.annotation.Nullable; +import java.net.URI; +import java.net.URISyntaxException; import java.time.ZoneId; import java.util.ArrayDeque; import java.util.ArrayList; @@ -700,18 +705,38 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { @Override public Node 
visitCreatePipePluginStatement( RelationalSqlParser.CreatePipePluginStatementContext ctx) { - return super.visitCreatePipePluginStatement(ctx); + final String pluginName = ((Identifier) visit(ctx.identifier())).getValue(); + final boolean hasIfNotExistsCondition = + ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null; + final String className = ((StringLiteral) visit(ctx.className)).getValue(); + final String uriString = parseAndValidateURI(ctx.uriClause()); + return new CreatePipePlugin(pluginName, hasIfNotExistsCondition, className, uriString); + } + + private String parseAndValidateURI(RelationalSqlParser.UriClauseContext ctx) { + final String uriString = + ctx.uri.identifier() != null + ? ((Identifier) visit(ctx.uri.identifier())).getValue() + : ((StringLiteral) visit(ctx.uri.string())).getValue(); + try { + new URI(uriString); + } catch (URISyntaxException e) { + throw new SemanticException(String.format("Invalid URI: %s", uriString)); + } + return uriString; } @Override public Node visitDropPipePluginStatement(RelationalSqlParser.DropPipePluginStatementContext ctx) { - return super.visitDropPipePluginStatement(ctx); + final String pluginName = ((Identifier) visit(ctx.identifier())).getValue(); + final boolean hasIfExistsCondition = ctx.IF() != null && ctx.EXISTS() != null; + return new DropPipePlugin(pluginName, hasIfExistsCondition); } @Override public Node visitShowPipePluginsStatement( RelationalSqlParser.ShowPipePluginsStatementContext ctx) { - return super.visitShowPipePluginsStatement(ctx); + return new ShowPipePlugins(); } @Override
