This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5f28e331d29 Pipe: Optimized the error log when source/sink's username
is without password in alter (#15241)
5f28e331d29 is described below
commit 5f28e331d29ff2869c57385c308c0609aff8dcff
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 1 18:56:48 2025 +0800
Pipe: Optimized the error log when source/sink's username is without
password in alter (#15241)
---
.../execution/config/TableConfigTaskVisitor.java | 32 ++++++-----
.../execution/config/TreeConfigTaskVisitor.java | 13 +++--
.../config/executor/ClusterConfigTaskExecutor.java | 67 ++++++++++++++++++++++
.../execution/config/sys/pipe/AlterPipeTask.java | 3 +-
.../metadata/pipe/AlterPipeStatement.java | 9 +++
5 files changed, 106 insertions(+), 18 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index e6f2546f389..5291fbd4b67 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -906,15 +906,18 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
// Inject table model into the extractor attributes
extractorAttributes.put(SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TABLE_VALUE);
- checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName);
- checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(),
userName);
+ checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName,
false);
+ checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(),
userName, false);
return new CreatePipeTask(node);
}
public static void checkAndEnrichSourceUserName(
- final String pipeName, final Map<String, String> extractorAttributes,
final String userName) {
- final PipeParameters extractorParameters = new
PipeParameters(extractorAttributes);
+ final String pipeName,
+ final Map<String, String> replacedExtractorAttributes,
+ final String userName,
+ final boolean isAlter) {
+ final PipeParameters extractorParameters = new
PipeParameters(replacedExtractorAttributes);
final String pluginName =
extractorParameters
.getStringOrDefault(
@@ -933,19 +936,22 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
PipeExtractorConstant.SOURCE_IOTDB_USER_KEY,
PipeExtractorConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
PipeExtractorConstant.SOURCE_IOTDB_USERNAME_KEY)) {
- extractorAttributes.put(PipeExtractorConstant.SOURCE_IOTDB_USERNAME_KEY,
userName);
+
replacedExtractorAttributes.put(PipeExtractorConstant.SOURCE_IOTDB_USERNAME_KEY,
userName);
} else if (!extractorParameters.hasAnyAttributes(
PipeExtractorConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
PipeExtractorConstant.SOURCE_IOTDB_PASSWORD_KEY)) {
throw new SemanticException(
String.format(
- "Failed to create pipe %s, in iotdb-source, password must be set
when the username is specified.",
- pipeName));
+ "Failed to %s pipe %s, in iotdb-source, password must be set
when the username is specified.",
+ isAlter ? "alter" : "create", pipeName));
}
}
public static void checkAndEnrichSinkUserName(
- final String pipeName, final Map<String, String> connectorAttributes,
final String userName) {
+ final String pipeName,
+ final Map<String, String> connectorAttributes,
+ final String userName,
+ final boolean isAlter) {
final PipeParameters connectorParameters = new
PipeParameters(connectorAttributes);
final String pluginName =
connectorParameters
@@ -970,8 +976,8 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY)) {
throw new SemanticException(
String.format(
- "Failed to create pipe %s, in write-back-sink, password must be
set when the username is specified.",
- pipeName));
+ "Failed to %s pipe %s, in write-back-sink, password must be set
when the username is specified.",
+ isAlter ? "alter" : "create", pipeName));
}
}
@@ -997,14 +1003,14 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
if (node.isReplaceAllExtractorAttributes()) {
extractorAttributes.put(
SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TABLE_VALUE);
- checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName);
+ checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName,
true);
}
if (node.isReplaceAllConnectorAttributes()) {
- checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(),
userName);
+ checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(),
userName, true);
}
- return new AlterPipeTask(node);
+ return new AlterPipeTask(node, userName);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index 62aac3aa56c..564776c7534 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -535,11 +535,13 @@ public class TreeConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQuer
checkAndEnrichSourceUserName(
createPipeStatement.getPipeName(),
createPipeStatement.getExtractorAttributes(),
- context.getSession().getUserName());
+ context.getSession().getUserName(),
+ false);
checkAndEnrichSinkUserName(
createPipeStatement.getPipeName(),
createPipeStatement.getConnectorAttributes(),
- context.getSession().getUserName());
+ context.getSession().getUserName(),
+ false);
return new CreatePipeTask(createPipeStatement);
}
@@ -559,6 +561,8 @@ public class TreeConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQuer
}
final String userName = context.getSession().getUserName();
+ alterPipeStatement.setUserName(userName);
+
final String pipeName = alterPipeStatement.getPipeName();
final Map<String, String> extractorAttributes =
alterPipeStatement.getExtractorAttributes();
@@ -567,11 +571,12 @@ public class TreeConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQuer
if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
extractorAttributes.put(
SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE);
- checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName);
+ checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName,
true);
}
if (alterPipeStatement.isReplaceAllConnectorAttributes()) {
- checkAndEnrichSinkUserName(pipeName,
alterPipeStatement.getConnectorAttributes(), userName);
+ checkAndEnrichSinkUserName(
+ pipeName, alterPipeStatement.getConnectorAttributes(), userName,
true);
}
return new AlterPipeTask(alterPipeStatement);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 1895db6bd70..59313cc4018 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -48,10 +48,13 @@ import
org.apache.iotdb.commons.executable.ExecutableResource;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader;
import
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
@@ -156,6 +159,7 @@ import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
@@ -2073,6 +2077,8 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
extractorAttributes = alterPipeStatement.getExtractorAttributes();
} else {
+ final boolean onlyContainsUser =
+ onlyContainsUser(alterPipeStatement.getExtractorAttributes());
pipeMetaFromCoordinator
.getStaticMeta()
.getExtractorParameters()
@@ -2080,6 +2086,9 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new
PipeParameters(alterPipeStatement.getExtractorAttributes()));
extractorAttributes =
pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters().getAttribute();
+ if (onlyContainsUser) {
+ checkSourceType(alterPipeStatement.getPipeName(),
extractorAttributes);
+ }
}
} else {
extractorAttributes =
@@ -2107,6 +2116,8 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
if (alterPipeStatement.isReplaceAllConnectorAttributes()) {
connectorAttributes = alterPipeStatement.getConnectorAttributes();
} else {
+ final boolean onlyContainsUser =
+ onlyContainsUser(alterPipeStatement.getConnectorAttributes());
pipeMetaFromCoordinator
.getStaticMeta()
.getConnectorParameters()
@@ -2114,6 +2125,9 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new
PipeParameters(alterPipeStatement.getConnectorAttributes()));
connectorAttributes =
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute();
+ if (onlyContainsUser) {
+ checkSinkType(alterPipeStatement.getPipeName(),
connectorAttributes);
+ }
}
} else {
connectorAttributes =
@@ -2153,6 +2167,59 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+ private static void checkSourceType(
+ final String pipeName, final Map<String, String>
replacedExtractorAttributes) {
+ final PipeParameters extractorParameters = new
PipeParameters(replacedExtractorAttributes);
+ final String pluginName =
+ extractorParameters
+ .getStringOrDefault(
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_KEY,
PipeExtractorConstant.SOURCE_KEY),
+ BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ .toLowerCase();
+
+ if
(pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ ||
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) {
+ throw new SemanticException(
+ String.format(
+ "Failed to alter pipe %s, in iotdb-source, password must be set
when the username is specified.",
+ pipeName));
+ }
+ }
+
+ private static boolean onlyContainsUser(
+ final Map<String, String> extractorOrConnectorAttributes) {
+ final PipeParameters extractorOrConnectorParameters =
+ new PipeParameters(extractorOrConnectorAttributes);
+ return extractorOrConnectorParameters.hasAnyAttributes(
+ PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY,
+ PipeConnectorConstant.SINK_IOTDB_USER_KEY,
+ PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY,
+ PipeConnectorConstant.SINK_IOTDB_USERNAME_KEY)
+ && !extractorOrConnectorParameters.hasAnyAttributes(
+ PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY,
+ PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY);
+ }
+
+ private static void checkSinkType(
+ final String pipeName, final Map<String, String> connectorAttributes) {
+ final PipeParameters connectorParameters = new
PipeParameters(connectorAttributes);
+ final String pluginName =
+ connectorParameters
+ .getStringOrDefault(
+ Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY),
+ BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
+ .toLowerCase();
+
+ if
(pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName())
+ ||
pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName())) {
+ throw new SemanticException(
+ String.format(
+ "Failed to alter pipe %s, in write-back-sink, password must be
set when the username is specified.",
+ pipeName));
+ }
+ }
+
@Override
public SettableFuture<ConfigTaskResult> startPipe(final StartPipeStatement
startPipeStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
index 159c6941628..40e303a23ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
@@ -43,7 +43,7 @@ public class AlterPipeTask implements IConfigTask {
this.alterPipeStatement = alterPipeStatement;
}
- public AlterPipeTask(final AlterPipe node) {
+ public AlterPipeTask(final AlterPipe node, final String userName) {
alterPipeStatement = new AlterPipeStatement(StatementType.ALTER_PIPE);
alterPipeStatement.setPipeName(node.getPipeName());
alterPipeStatement.setIfExists(node.hasIfExistsCondition());
@@ -57,6 +57,7 @@ public class AlterPipeTask implements IConfigTask {
alterPipeStatement.setReplaceAllExtractorAttributes(node.isReplaceAllExtractorAttributes());
alterPipeStatement.setReplaceAllProcessorAttributes(node.isReplaceAllProcessorAttributes());
alterPipeStatement.setReplaceAllConnectorAttributes(node.isReplaceAllConnectorAttributes());
+ alterPipeStatement.setUserName(userName);
alterPipeStatement.setTableModel(true);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
index 5736d0c7570..603303b49ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
@@ -37,6 +37,7 @@ import java.util.Map;
public class AlterPipeStatement extends Statement implements IConfigStatement {
private String pipeName;
+ private String userName;
private boolean ifExistsCondition;
private Map<String, String> extractorAttributes;
private Map<String, String> processorAttributes;
@@ -86,6 +87,10 @@ public class AlterPipeStatement extends Statement implements
IConfigStatement {
return isTableModel;
}
+ public String getUserName() {
+ return userName;
+ }
+
public void setPipeName(final String pipeName) {
this.pipeName = pipeName;
}
@@ -122,6 +127,10 @@ public class AlterPipeStatement extends Statement
implements IConfigStatement {
this.isTableModel = tableModel;
}
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;