This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f28e331d29 Pipe: Optimized the error log when source/sink's username is without password in alter (#15241)
5f28e331d29 is described below

commit 5f28e331d29ff2869c57385c308c0609aff8dcff
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 1 18:56:48 2025 +0800

    Pipe: Optimized the error log when source/sink's username is without password in alter (#15241)
---
 .../execution/config/TableConfigTaskVisitor.java   | 32 ++++++-----
 .../execution/config/TreeConfigTaskVisitor.java    | 13 +++--
 .../config/executor/ClusterConfigTaskExecutor.java | 67 ++++++++++++++++++++++
 .../execution/config/sys/pipe/AlterPipeTask.java   |  3 +-
 .../metadata/pipe/AlterPipeStatement.java          |  9 +++
 5 files changed, 106 insertions(+), 18 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index e6f2546f389..5291fbd4b67 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -906,15 +906,18 @@ public class TableConfigTaskVisitor extends 
AstVisitor<IConfigTask, MPPQueryCont
 
     // Inject table model into the extractor attributes
     extractorAttributes.put(SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TABLE_VALUE);
-    checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName);
-    checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(), 
userName);
+    checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName, 
false);
+    checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(), 
userName, false);
 
     return new CreatePipeTask(node);
   }
 
   public static void checkAndEnrichSourceUserName(
-      final String pipeName, final Map<String, String> extractorAttributes, 
final String userName) {
-    final PipeParameters extractorParameters = new 
PipeParameters(extractorAttributes);
+      final String pipeName,
+      final Map<String, String> replacedExtractorAttributes,
+      final String userName,
+      final boolean isAlter) {
+    final PipeParameters extractorParameters = new 
PipeParameters(replacedExtractorAttributes);
     final String pluginName =
         extractorParameters
             .getStringOrDefault(
@@ -933,19 +936,22 @@ public class TableConfigTaskVisitor extends 
AstVisitor<IConfigTask, MPPQueryCont
         PipeExtractorConstant.SOURCE_IOTDB_USER_KEY,
         PipeExtractorConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
         PipeExtractorConstant.SOURCE_IOTDB_USERNAME_KEY)) {
-      extractorAttributes.put(PipeExtractorConstant.SOURCE_IOTDB_USERNAME_KEY, 
userName);
+      
replacedExtractorAttributes.put(PipeExtractorConstant.SOURCE_IOTDB_USERNAME_KEY,
 userName);
     } else if (!extractorParameters.hasAnyAttributes(
         PipeExtractorConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
         PipeExtractorConstant.SOURCE_IOTDB_PASSWORD_KEY)) {
       throw new SemanticException(
           String.format(
-              "Failed to create pipe %s, in iotdb-source, password must be set 
when the username is specified.",
-              pipeName));
+              "Failed to %s pipe %s, in iotdb-source, password must be set 
when the username is specified.",
+              isAlter ? "alter" : "create", pipeName));
     }
   }
 
   public static void checkAndEnrichSinkUserName(
-      final String pipeName, final Map<String, String> connectorAttributes, 
final String userName) {
+      final String pipeName,
+      final Map<String, String> connectorAttributes,
+      final String userName,
+      final boolean isAlter) {
     final PipeParameters connectorParameters = new 
PipeParameters(connectorAttributes);
     final String pluginName =
         connectorParameters
@@ -970,8 +976,8 @@ public class TableConfigTaskVisitor extends 
AstVisitor<IConfigTask, MPPQueryCont
         PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY)) {
       throw new SemanticException(
           String.format(
-              "Failed to create pipe %s, in write-back-sink, password must be 
set when the username is specified.",
-              pipeName));
+              "Failed to %s pipe %s, in write-back-sink, password must be set 
when the username is specified.",
+              isAlter ? "alter" : "create", pipeName));
     }
   }
 
@@ -997,14 +1003,14 @@ public class TableConfigTaskVisitor extends 
AstVisitor<IConfigTask, MPPQueryCont
     if (node.isReplaceAllExtractorAttributes()) {
       extractorAttributes.put(
           SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TABLE_VALUE);
-      checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName);
+      checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName, 
true);
     }
 
     if (node.isReplaceAllConnectorAttributes()) {
-      checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(), 
userName);
+      checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(), 
userName, true);
     }
 
-    return new AlterPipeTask(node);
+    return new AlterPipeTask(node, userName);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index 62aac3aa56c..564776c7534 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -535,11 +535,13 @@ public class TreeConfigTaskVisitor extends 
StatementVisitor<IConfigTask, MPPQuer
     checkAndEnrichSourceUserName(
         createPipeStatement.getPipeName(),
         createPipeStatement.getExtractorAttributes(),
-        context.getSession().getUserName());
+        context.getSession().getUserName(),
+        false);
     checkAndEnrichSinkUserName(
         createPipeStatement.getPipeName(),
         createPipeStatement.getConnectorAttributes(),
-        context.getSession().getUserName());
+        context.getSession().getUserName(),
+        false);
 
     return new CreatePipeTask(createPipeStatement);
   }
@@ -559,6 +561,8 @@ public class TreeConfigTaskVisitor extends 
StatementVisitor<IConfigTask, MPPQuer
     }
 
     final String userName = context.getSession().getUserName();
+    alterPipeStatement.setUserName(userName);
+
     final String pipeName = alterPipeStatement.getPipeName();
     final Map<String, String> extractorAttributes = 
alterPipeStatement.getExtractorAttributes();
 
@@ -567,11 +571,12 @@ public class TreeConfigTaskVisitor extends 
StatementVisitor<IConfigTask, MPPQuer
     if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
       extractorAttributes.put(
           SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE);
-      checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName);
+      checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName, 
true);
     }
 
     if (alterPipeStatement.isReplaceAllConnectorAttributes()) {
-      checkAndEnrichSinkUserName(pipeName, 
alterPipeStatement.getConnectorAttributes(), userName);
+      checkAndEnrichSinkUserName(
+          pipeName, alterPipeStatement.getConnectorAttributes(), userName, 
true);
     }
 
     return new AlterPipeTask(alterPipeStatement);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 1895db6bd70..59313cc4018 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -48,10 +48,13 @@ import 
org.apache.iotdb.commons.executable.ExecutableResource;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
 import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
@@ -156,6 +159,7 @@ import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
@@ -2073,6 +2077,8 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
           extractorAttributes = alterPipeStatement.getExtractorAttributes();
         } else {
+          final boolean onlyContainsUser =
+              onlyContainsUser(alterPipeStatement.getExtractorAttributes());
           pipeMetaFromCoordinator
               .getStaticMeta()
               .getExtractorParameters()
@@ -2080,6 +2086,9 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                   new 
PipeParameters(alterPipeStatement.getExtractorAttributes()));
           extractorAttributes =
               
pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters().getAttribute();
+          if (onlyContainsUser) {
+            checkSourceType(alterPipeStatement.getPipeName(), 
extractorAttributes);
+          }
         }
       } else {
         extractorAttributes =
@@ -2107,6 +2116,8 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         if (alterPipeStatement.isReplaceAllConnectorAttributes()) {
           connectorAttributes = alterPipeStatement.getConnectorAttributes();
         } else {
+          final boolean onlyContainsUser =
+              onlyContainsUser(alterPipeStatement.getConnectorAttributes());
           pipeMetaFromCoordinator
               .getStaticMeta()
               .getConnectorParameters()
@@ -2114,6 +2125,9 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                   new 
PipeParameters(alterPipeStatement.getConnectorAttributes()));
           connectorAttributes =
               
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute();
+          if (onlyContainsUser) {
+            checkSinkType(alterPipeStatement.getPipeName(), 
connectorAttributes);
+          }
         }
       } else {
         connectorAttributes =
@@ -2153,6 +2167,59 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     return future;
   }
 
+  private static void checkSourceType(
+      final String pipeName, final Map<String, String> 
replacedExtractorAttributes) {
+    final PipeParameters extractorParameters = new 
PipeParameters(replacedExtractorAttributes);
+    final String pluginName =
+        extractorParameters
+            .getStringOrDefault(
+                Arrays.asList(
+                    PipeExtractorConstant.EXTRACTOR_KEY, 
PipeExtractorConstant.SOURCE_KEY),
+                BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+            .toLowerCase();
+
+    if 
(pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+        || 
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) {
+      throw new SemanticException(
+          String.format(
+              "Failed to alter pipe %s, in iotdb-source, password must be set 
when the username is specified.",
+              pipeName));
+    }
+  }
+
+  private static boolean onlyContainsUser(
+      final Map<String, String> extractorOrConnectorAttributes) {
+    final PipeParameters extractorOrConnectorParameters =
+        new PipeParameters(extractorOrConnectorAttributes);
+    return extractorOrConnectorParameters.hasAnyAttributes(
+            PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY,
+            PipeConnectorConstant.SINK_IOTDB_USER_KEY,
+            PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY,
+            PipeConnectorConstant.SINK_IOTDB_USERNAME_KEY)
+        && !extractorOrConnectorParameters.hasAnyAttributes(
+            PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY,
+            PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY);
+  }
+
+  private static void checkSinkType(
+      final String pipeName, final Map<String, String> connectorAttributes) {
+    final PipeParameters connectorParameters = new 
PipeParameters(connectorAttributes);
+    final String pluginName =
+        connectorParameters
+            .getStringOrDefault(
+                Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY),
+                BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
+            .toLowerCase();
+
+    if 
(pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName())
+        || 
pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName())) {
+      throw new SemanticException(
+          String.format(
+              "Failed to alter pipe %s, in write-back-sink, password must be 
set when the username is specified.",
+              pipeName));
+    }
+  }
+
   @Override
   public SettableFuture<ConfigTaskResult> startPipe(final StartPipeStatement 
startPipeStatement) {
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
index 159c6941628..40e303a23ae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
@@ -43,7 +43,7 @@ public class AlterPipeTask implements IConfigTask {
     this.alterPipeStatement = alterPipeStatement;
   }
 
-  public AlterPipeTask(final AlterPipe node) {
+  public AlterPipeTask(final AlterPipe node, final String userName) {
     alterPipeStatement = new AlterPipeStatement(StatementType.ALTER_PIPE);
     alterPipeStatement.setPipeName(node.getPipeName());
     alterPipeStatement.setIfExists(node.hasIfExistsCondition());
@@ -57,6 +57,7 @@ public class AlterPipeTask implements IConfigTask {
     
alterPipeStatement.setReplaceAllExtractorAttributes(node.isReplaceAllExtractorAttributes());
     
alterPipeStatement.setReplaceAllProcessorAttributes(node.isReplaceAllProcessorAttributes());
     
alterPipeStatement.setReplaceAllConnectorAttributes(node.isReplaceAllConnectorAttributes());
+    alterPipeStatement.setUserName(userName);
 
     alterPipeStatement.setTableModel(true);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
index 5736d0c7570..603303b49ae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
@@ -37,6 +37,7 @@ import java.util.Map;
 public class AlterPipeStatement extends Statement implements IConfigStatement {
 
   private String pipeName;
+  private String userName;
   private boolean ifExistsCondition;
   private Map<String, String> extractorAttributes;
   private Map<String, String> processorAttributes;
@@ -86,6 +87,10 @@ public class AlterPipeStatement extends Statement implements 
IConfigStatement {
     return isTableModel;
   }
 
+  public String getUserName() {
+    return userName;
+  }
+
   public void setPipeName(final String pipeName) {
     this.pipeName = pipeName;
   }
@@ -122,6 +127,10 @@ public class AlterPipeStatement extends Statement 
implements IConfigStatement {
     this.isTableModel = tableModel;
   }
 
+  public void setUserName(String userName) {
+    this.userName = userName;
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.WRITE;

Reply via email to