This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f971de235ec Fix old pipe root user compatibility (#17985)
f971de235ec is described below

commit f971de235eca05453c8b6fec27ffe2d5a7f9ed22
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 22 09:37:45 2026 +0800

    Fix old pipe root user compatibility (#17985)
    
    * Fix old pipe root user compatibility
    
    * Fix ConfigurationFileUtilsTest import
    
    * Update AbstractEnv.java
    
    * Address pipe compatibility review comments
---
 .../iotdb/confignode/i18n/ConfigNodeMessages.java  |   3 +
 .../iotdb/confignode/i18n/ConfigNodeMessages.java  |   3 +
 .../iotdb/confignode/manager/ConfigManager.java    |   4 +-
 .../persistence/executor/ConfigPlanExecutor.java   |   1 +
 .../confignode/persistence/pipe/PipeInfo.java      |   8 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  55 ++++++-
 .../pipe/PipeTaskInfoAutoRestartTest.java          | 169 ++++++++++++++++++++-
 .../pipe/agent/task/meta/PipeStaticMeta.java       |  60 ++++++++
 8 files changed, 292 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
 
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index f662fa5871d..69a69c76db7 100644
--- 
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++ 
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -216,6 +216,9 @@ public final class ConfigNodeMessages {
       "Failed to drop trigger [%s], this trigger has not been created";
   public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED =
       "Failed to drop UDF [%s], this UDF has not been created";
+  public static final String
+      FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST =
+          "Failed to enrich pipe %s with root user for compatibility because root user %s does not exist.";
   public static final String 
FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE =
       "Failed to fetch schemaengine black list on DataNode {}, {}";
   public static final String FAILED_TO_GET_FIELD = "Failed to get field {}";
diff --git 
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
 
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index 8bffa1b0831..6bf3da0e68b 100644
--- 
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++ 
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -212,6 +212,9 @@ public final class ConfigNodeMessages {
       "Failed to drop trigger [%s], this trigger has not been created";
   public static final String FAILED_TO_DROP_UDF_THIS_UDF_HAS_NOT_BEEN_CREATED =
       "Failed to drop UDF [%s], this UDF has not been created";
+  public static final String
+      FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST =
+          "Failed to enrich pipe %s with root user for compatibility because root user %s does not exist.";
   public static final String 
FAILED_TO_FETCH_SCHEMAENGINE_BLACK_LIST_ON_DATANODE =
       "Failed to fetch schemaengine black list on DataNode {}, {}";
   public static final String FAILED_TO_GET_FIELD = "Failed to get field {}";
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 92a22fe992f..da10bb22283 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -375,7 +375,8 @@ public class ConfigManager implements IManager {
     TriggerInfo triggerInfo = new TriggerInfo();
     CQInfo cqInfo = new CQInfo();
     ExternalServiceInfo externalServiceInfo = new ExternalServiceInfo();
-    PipeInfo pipeInfo = new PipeInfo();
+    this.permissionManager = createPermissionManager(authorInfo);
+    PipeInfo pipeInfo = new PipeInfo(userName -> 
this.permissionManager.login4Pipe(userName, null));
     QuotaInfo quotaInfo = new QuotaInfo();
     TTLInfo ttlInfo = new TTLInfo();
     SubscriptionInfo subscriptionInfo = new SubscriptionInfo();
@@ -409,7 +410,6 @@ public class ConfigManager implements IManager {
             new ClusterSchemaQuotaStatistics(
                 COMMON_CONF.getSeriesLimitThreshold(), 
COMMON_CONF.getDeviceLimitThreshold()));
     this.partitionManager = new PartitionManager(this, partitionInfo);
-    this.permissionManager = createPermissionManager(authorInfo);
     this.procedureManager = createProcedureManager(procedureInfo);
     this.externalServiceManager = new ExternalServiceManager(this);
     this.udfManager = new UDFManager(this, udfInfo);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index ed23fda8a6f..4a70ace8ca1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -801,6 +801,7 @@ public class ConfigPlanExecutor {
               }
             });
     if (result.get()) {
+      pipeInfo.getPipeTaskInfo().enrichPipeMetasWithRootUserForCompatibility();
       LOGGER.info(
           
ConfigNodeMessages.CONFIGNODESNAPSHOT_LOAD_SNAPSHOT_SUCCESS_LATESTSNAPSHOTROOTDIR,
           latestSnapshotRootDir);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index e353398cd9d..bef26386afd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Function;
 
 public class PipeInfo implements SnapshotProcessor {
 
@@ -58,8 +59,13 @@ public class PipeInfo implements SnapshotProcessor {
   private final PipeTaskInfo pipeTaskInfo;
 
   public PipeInfo() throws IOException {
+    this(null);
+  }
+
+  public PipeInfo(final Function<String, String> 
pipeUserCurrentPasswordProvider)
+      throws IOException {
     pipePluginInfo = new PipePluginInfo();
-    pipeTaskInfo = new PipeTaskInfo();
+    pipeTaskInfo = new PipeTaskInfo(pipeUserCurrentPasswordProvider);
   }
 
   public PipePluginInfo getPipePluginInfo() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 3c2e331b0a1..247f803152b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.persistence.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
@@ -56,7 +57,6 @@ import 
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceM
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
 import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
-import org.apache.iotdb.confignode.service.ConfigNode;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -76,9 +76,11 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -95,10 +97,17 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
   // Pure in-memory object, not involved in snapshot serialization and deserialization.
   private final PipeTaskInfoVersion pipeTaskInfoVersion;
+  // Accepts a username and returns its current stored password for pipe authentication.
+  private final Function<String, String> pipeUserCurrentPasswordProvider;
 
   public PipeTaskInfo() {
+    this(null);
+  }
+
+  public PipeTaskInfo(final Function<String, String> 
pipeUserCurrentPasswordProvider) {
     this.pipeMetaKeeper = new PipeMetaKeeper();
     this.pipeTaskInfoVersion = new PipeTaskInfoVersion();
+    this.pipeUserCurrentPasswordProvider = pipeUserCurrentPasswordProvider;
   }
 
   /////////////////////////////// Lock ///////////////////////////////
@@ -445,6 +454,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   public TSStatus createPipe(final CreatePipePlanV2 plan) {
     acquireWriteLock();
     try {
+      enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta());
       pipeMetaKeeper.addPipeMeta(new PipeMeta(plan.getPipeStaticMeta(), 
plan.getPipeRuntimeMeta()));
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } finally {
@@ -502,6 +512,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   public TSStatus alterPipe(final AlterPipePlanV2 plan) {
     acquireWriteLock();
     try {
+      enrichPipeMetaWithRootUserForCompatibility(plan.getPipeStaticMeta());
       final PipeTemporaryMeta temporaryMeta =
           
pipeMetaKeeper.getPipeMeta(plan.getPipeStaticMeta().getPipeName()).getTemporaryMeta();
       pipeMetaKeeper.removePipeMeta(plan.getPipeStaticMeta().getPipeName());
@@ -719,6 +730,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     plan.getPipeMetaList()
         .forEach(
             pipeMeta -> {
+              
enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta());
               pipeMetaKeeper.addPipeMeta(pipeMeta);
               logger.ifPresent(l -> 
l.debug(ConfigNodeMessages.RECORDING_PIPE_META, pipeMeta));
             });
@@ -998,6 +1010,47 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
+  public void enrichPipeMetasWithRootUserForCompatibility() {
+    acquireWriteLock();
+    try {
+      pipeMetaKeeper
+          .getPipeMetaList()
+          .forEach(
+              pipeMeta -> 
enrichPipeMetaWithRootUserForCompatibility(pipeMeta.getStaticMeta()));
+    } finally {
+      releaseWriteLock();
+    }
+  }
+
+  private void enrichPipeMetaWithRootUserForCompatibility(final PipeStaticMeta 
pipeStaticMeta) {
+    if (pipeUserCurrentPasswordProvider == null) {
+      return;
+    }
+    final boolean shouldEnrichSource = 
pipeStaticMeta.mayNeedCompatibleRootUserForIoTDBSource();
+    final boolean shouldEnrichSink = 
pipeStaticMeta.mayNeedCompatibleRootUserForWriteBackSink();
+    if (!shouldEnrichSource && !shouldEnrichSink) {
+      return;
+    }
+
+    final String rootUserName = 
CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
+    final String password = 
pipeUserCurrentPasswordProvider.apply(rootUserName);
+    if (Objects.isNull(password)) {
+      throw new PipeException(
+          String.format(
+              ConfigNodeMessages
+                  
.FAILED_TO_ENRICH_PIPE_WITH_ROOT_USER_FOR_COMPATIBILITY_BECAUSE_ROOT_USER_DOES_NOT_EXIST,
+              pipeStaticMeta.getPipeName(),
+              rootUserName));
+    }
+
+    if (shouldEnrichSource) {
+      pipeStaticMeta.enrichSourceWithRootUserForCompatibility(rootUserName, 
password);
+    }
+    if (shouldEnrichSink) {
+      
pipeStaticMeta.enrichWriteBackSinkWithRootUserForCompatibility(rootUserName, 
password);
+    }
+  }
+
   private void normalizeRecoveredConsensusPipeStatus() {
     final List<String> restartedConsensusPipes = new ArrayList<>();
 
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
index 762c3bf045c..7b78f59253d 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
@@ -20,11 +20,15 @@
 package org.apache.iotdb.confignode.persistence.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
@@ -88,6 +92,145 @@ public class PipeTaskInfoAutoRestartTest {
     Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get());
   }
 
+  @Test
+  public void testEnrichOldUserPipeWithRootUserForCompatibility() {
+    final String rootUserName = 
CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
+    final String rootPassword = "root-current-password";
+    pipeTaskInfo = new PipeTaskInfo(username -> rootPassword);
+
+    createPipe("oldPipe", PipeStatus.STOPPED);
+
+    final Map<String, String> sourceAttributes =
+        pipeTaskInfo
+            .getPipeMetaByPipeName("oldPipe")
+            .getStaticMeta()
+            .getSourceParameters()
+            .getAttribute();
+    Assert.assertEquals(
+        String.valueOf(IoTDBConstant.SUPER_USER_ID),
+        sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USER_ID));
+    Assert.assertEquals(
+        rootUserName, 
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY));
+    Assert.assertEquals(
+        rootPassword, 
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+    Assert.assertFalse(
+        pipeTaskInfo
+            .getPipeMetaByPipeName("oldPipe")
+            .getStaticMeta()
+            .getSinkParameters()
+            .getAttribute()
+            .containsKey(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY));
+  }
+
+  @Test
+  public void testDoNotOverwritePipeWithUserForCompatibility() {
+    pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password");
+
+    createPipeWithSourceAttributes(
+        "newPipe",
+        new HashMap<String, String>() {
+          {
+            put("extractor", "iotdb-source");
+            put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "user");
+            put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, "user-password");
+          }
+        });
+
+    final Map<String, String> sourceAttributes =
+        pipeTaskInfo
+            .getPipeMetaByPipeName("newPipe")
+            .getStaticMeta()
+            .getSourceParameters()
+            .getAttribute();
+    Assert.assertEquals("user", 
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY));
+    Assert.assertEquals(
+        "user-password", 
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+  }
+
+  @Test
+  public void testDoNotEnrichSystemPipeForCompatibility() {
+    pipeTaskInfo = new PipeTaskInfo(username -> "root-current-password");
+
+    createPipeWithSourceAttributes(
+        PipeStaticMeta.generateSubscriptionPipeName("topic", "group"),
+        new HashMap<String, String>() {
+          {
+            put("extractor", "iotdb-source");
+          }
+        });
+
+    final Map<String, String> sourceAttributes =
+        pipeTaskInfo
+            
.getPipeMetaByPipeName(PipeStaticMeta.generateSubscriptionPipeName("topic", 
"group"))
+            .getStaticMeta()
+            .getSourceParameters()
+            .getAttribute();
+    
Assert.assertFalse(sourceAttributes.containsKey(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY));
+    
Assert.assertFalse(sourceAttributes.containsKey(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+  }
+
+  @Test
+  public void testEnrichOldWriteBackSinkWithRootUserForCompatibility() {
+    final String rootUserName = 
CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
+    final String rootPassword = "root-current-password";
+    pipeTaskInfo = new PipeTaskInfo(username -> rootPassword);
+
+    createPipeWithAttributes(
+        "oldWriteBackPipe",
+        new HashMap<String, String>() {
+          {
+            put("extractor", "iotdb-source");
+            put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "source-user");
+            put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, 
"source-password");
+          }
+        },
+        new HashMap<String, String>() {
+          {
+            put("connector", "write-back-sink");
+          }
+        });
+
+    final Map<String, String> sinkAttributes =
+        pipeTaskInfo
+            .getPipeMetaByPipeName("oldWriteBackPipe")
+            .getStaticMeta()
+            .getSinkParameters()
+            .getAttribute();
+    Assert.assertEquals(
+        String.valueOf(IoTDBConstant.SUPER_USER_ID),
+        sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_USER_ID));
+    Assert.assertEquals(rootUserName, 
sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY));
+    Assert.assertEquals(rootPassword, 
sinkAttributes.get(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY));
+  }
+
+  @Test
+  public void testEnrichLoadedPipeMetasWithRootUserForCompatibility() {
+    final String rootPassword = "root-current-password";
+    pipeTaskInfo = new PipeTaskInfo(username -> rootPassword);
+
+    createPipeWithSourceAttributes(
+        "loadedPipe",
+        new HashMap<String, String>() {
+          {
+            put("extractor", "iotdb-source");
+          }
+        });
+    final Map<String, String> sourceAttributes =
+        pipeTaskInfo
+            .getPipeMetaByPipeName("loadedPipe")
+            .getStaticMeta()
+            .getSourceParameters()
+            .getAttribute();
+    sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_USER_ID);
+    sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY);
+    sourceAttributes.remove(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
+
+    pipeTaskInfo.enrichPipeMetasWithRootUserForCompatibility();
+
+    Assert.assertEquals(
+        rootPassword, 
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+  }
+
   private Map<Integer, TPushPipeMetaResp> createErrorRespMap(final String 
pipeName) {
     final TPushPipeMetaRespExceptionMessage exceptionMessage =
         new TPushPipeMetaRespExceptionMessage(
@@ -101,11 +244,27 @@ public class PipeTaskInfoAutoRestartTest {
 
   private void createPipe(final String pipeName, final PipeStatus 
initialStatus) {
     final Map<String, String> extractorAttributes = new HashMap<>();
-    final Map<String, String> processorAttributes = new HashMap<>();
-    final Map<String, String> connectorAttributes = new HashMap<>();
     extractorAttributes.put("extractor", "iotdb-source");
-    processorAttributes.put("processor", "do-nothing-processor");
+    createPipeWithSourceAttributes(pipeName, extractorAttributes);
+
+    if (PipeStatus.RUNNING.equals(initialStatus)) {
+      pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName, 
PipeStatus.RUNNING));
+    }
+  }
+
+  private void createPipeWithSourceAttributes(
+      final String pipeName, final Map<String, String> extractorAttributes) {
+    final Map<String, String> connectorAttributes = new HashMap<>();
     connectorAttributes.put("connector", "iotdb-thrift-sink");
+    createPipeWithAttributes(pipeName, extractorAttributes, 
connectorAttributes);
+  }
+
+  private void createPipeWithAttributes(
+      final String pipeName,
+      final Map<String, String> extractorAttributes,
+      final Map<String, String> connectorAttributes) {
+    final Map<String, String> processorAttributes = new HashMap<>();
+    processorAttributes.put("processor", "do-nothing-processor");
 
     final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID);
     final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new 
ConcurrentHashMap<>();
@@ -120,9 +279,5 @@ public class PipeTaskInfoAutoRestartTest {
             connectorAttributes);
     final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
     pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta, 
pipeRuntimeMeta));
-
-    if (PipeStatus.RUNNING.equals(initialStatus)) {
-      pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName, 
PipeStatus.RUNNING));
-    }
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
index 4b57cd64e43..9855552113c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.meta;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
 import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
@@ -97,6 +99,64 @@ public class PipeStaticMeta {
             .toLowerCase());
   }
 
+  public boolean mayNeedCompatibleRootUserForIoTDBSource() {
+    final String pluginName =
+        sourceParameters
+            .getStringOrDefault(
+                Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, 
PipeSourceConstant.SOURCE_KEY),
+                BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+            .toLowerCase();
+
+    return PipeType.USER.equals(getPipeType())
+        && 
(pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+            || 
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName()))
+        && !sourceParameters.hasAnyAttributes(
+            PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
+            PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
+            PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
+            PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY,
+            PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
+            PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
+  }
+
+  public boolean mayNeedCompatibleRootUserForWriteBackSink() {
+    final String pluginName =
+        sinkParameters
+            .getStringOrDefault(
+                Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
+                BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
+            .toLowerCase();
+
+    return PipeType.USER.equals(getPipeType())
+        && 
(pluginName.equals(BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName())
+            || 
pluginName.equals(BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName()))
+        && !sinkParameters.hasAnyAttributes(
+            PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY,
+            PipeSinkConstant.SINK_IOTDB_USER_KEY,
+            PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY,
+            PipeSinkConstant.SINK_IOTDB_USERNAME_KEY,
+            PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY,
+            PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY);
+  }
+
+  public void enrichSourceWithRootUserForCompatibility(
+      final String rootUserName, final String password) {
+    sourceParameters
+        .getAttribute()
+        .put(PipeSourceConstant.SOURCE_IOTDB_USER_ID, 
String.valueOf(IoTDBConstant.SUPER_USER_ID));
+    
sourceParameters.getAttribute().put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY,
 rootUserName);
+    
sourceParameters.getAttribute().put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY,
 password);
+  }
+
+  public void enrichWriteBackSinkWithRootUserForCompatibility(
+      final String rootUserName, final String password) {
+    sinkParameters
+        .getAttribute()
+        .put(PipeSinkConstant.SINK_IOTDB_USER_ID, 
String.valueOf(IoTDBConstant.SUPER_USER_ID));
+    
sinkParameters.getAttribute().put(PipeSinkConstant.SINK_IOTDB_USERNAME_KEY, 
rootUserName);
+    
sinkParameters.getAttribute().put(PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY, 
password);
+  }
+
   public ByteBuffer serialize() throws IOException {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);

Reply via email to