This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b05c257615 Subscription: support encrypted password auth in consumer 
builder (#17480)
4b05c257615 is described below

commit 4b05c257615acdbb77cf559aabb128135f0e2240
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 27 10:32:23 2026 +0800

    Subscription: support encrypted password auth in consumer builder (#17480)
    
    * feat(subscription): support encrypted password auth
    
    * test(confignode): assert canonical source password key
    
    * test(subscription): use strong passwords in encrypted pull IT
    
    * feat(subscription): reject setting both password and encryptedPassword
    
    Make `password` and `encryptedPassword` mutually exclusive on the
    subscription consumer builder and enable the encrypted-password IT.
---
 .../IoTDBEncryptedPasswordPullConsumerIT.java      | 171 +++++++++++++++++++++
 .../rpc/subscription/config/ConsumerConfig.java    |   4 +
 .../rpc/subscription/config/ConsumerConstant.java  |   1 +
 .../base/AbstractSubscriptionConsumer.java         |   5 +
 .../base/AbstractSubscriptionConsumerBuilder.java  |  18 +++
 .../base/AbstractSubscriptionProvider.java         |   6 +
 .../AbstractSubscriptionPullConsumerBuilder.java   |   6 +
 .../AbstractSubscriptionPushConsumerBuilder.java   |   6 +
 .../consumer/table/SubscriptionTableProvider.java  |   2 +
 .../table/SubscriptionTablePullConsumer.java       |   2 +
 .../SubscriptionTablePullConsumerBuilder.java      |   6 +
 .../table/SubscriptionTablePushConsumer.java       |   2 +
 .../SubscriptionTablePushConsumerBuilder.java      |   6 +
 .../consumer/tree/SubscriptionTreeProvider.java    |   2 +
 .../tree/SubscriptionTreePullConsumer.java         |   9 ++
 .../tree/SubscriptionTreePullConsumerBuilder.java  |   6 +
 .../tree/SubscriptionTreePushConsumer.java         |   9 ++
 .../tree/SubscriptionTreePushConsumerBuilder.java  |   6 +
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  37 +++--
 .../subscription/CreateSubscriptionProcedure.java  |   3 +-
 .../impl/pipe/task/CreatePipeProcedureV2Test.java  |  61 ++++++++
 .../subscription/meta/consumer/ConsumerMeta.java   |   8 +
 .../commons/subscription/meta/topic/TopicMeta.java |   8 +
 .../commons/subscription/topic/TopicDeSerTest.java |  11 ++
 24 files changed, 382 insertions(+), 13 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java
new file mode 100644
index 00000000000..8f0eceeafab
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.triple.treemodel.regression.param;
+
+import org.apache.iotdb.commons.utils.AuthUtils;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import 
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTreeRegressionMisc;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+import 
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Regression IT for authenticating a tree-model pull consumer with an already-encrypted password
+ * instead of a plaintext one: a matching encrypted password must allow subscribing and consuming,
+ * a mismatching one must leave no subscription behind.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2SubscriptionTreeRegressionMisc.class})
+public class IoTDBEncryptedPasswordPullConsumerIT extends AbstractSubscriptionTreeRegressionIT {
+
+  private static final String DATABASE = "root.TestEncryptedPasswordPullConsumer";
+  private static final String DEVICE = DATABASE + ".d_0";
+  private static final String TOPIC_NAME = "TestEncryptedPasswordPullConsumerTopic";
+  private static final String USERNAME = "encrypted_user";
+  private static final String PASSWORD = "EncryptedUser@123";
+  // Encrypted form of the password the test user is actually created with.
+  private static final String ENCRYPTED_PASSWORD = AuthUtils.encryptPassword(PASSWORD);
+  // Encrypted form of a password that was never assigned to the test user.
+  private static final String WRONG_ENCRYPTED_PASSWORD =
+      AuthUtils.encryptPassword("WrongEncryptedUser@123");
+
+  private static final List<IMeasurementSchema> SCHEMA_LIST = new ArrayList<>();
+
+  static {
+    SCHEMA_LIST.add(new MeasurementSchema("s_0", TSDataType.INT64));
+    SCHEMA_LIST.add(new MeasurementSchema("s_1", TSDataType.DOUBLE));
+  }
+
+  private SubscriptionTreePullConsumer consumer;
+
+  /**
+   * Creates the database, the topic, both timeseries on the source and destination sessions, and
+   * the dedicated user (with read/write privileges on {@code root.**}) on the source side.
+   */
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    createDB(DATABASE);
+    createTopic_s(TOPIC_NAME, "root.**", null, null, false);
+    session_src.createTimeseries(
+        DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4);
+    session_src.createTimeseries(
+        DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4);
+    session_dest.createTimeseries(
+        DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4);
+    session_dest.createTimeseries(
+        DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4);
+    session_src.executeNonQueryStatement("create user " + USERNAME + " '" + PASSWORD + "'");
+    session_src.executeNonQueryStatement("grant read,write on root.** to user " + USERNAME);
+    assertTrue(subs.getTopic(TOPIC_NAME).isPresent());
+  }
+
+  /**
+   * Releases everything {@link #setUp()} and the tests created. The consumer, topic and user
+   * cleanups are best-effort because a failed test may not have created them; the base-class
+   * teardown always runs, even when dropping the database fails.
+   */
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (consumer != null) {
+        consumer.close();
+      }
+    } catch (final Exception ignored) {
+      // Best-effort: the consumer may never have been opened (e.g. the wrong-password test).
+    }
+    try {
+      subs.dropTopic(TOPIC_NAME);
+    } catch (final Exception ignored) {
+      // Best-effort: keep cleaning up the remaining resources.
+    }
+    try {
+      session_src.executeNonQueryStatement("drop user " + USERNAME);
+    } catch (final Exception ignored) {
+      // Best-effort: keep cleaning up the remaining resources.
+    }
+    try {
+      dropDB(DATABASE);
+    } finally {
+      // Must not be skipped when dropDB throws, otherwise the base-class state is left behind.
+      super.tearDown();
+    }
+  }
+
+  /** A consumer built with the matching encrypted password can subscribe and consume data. */
+  @Test
+  public void testSubscribeWithEncryptedPassword()
+      throws TException,
+          IoTDBConnectionException,
+          IOException,
+          StatementExecutionException,
+          InterruptedException {
+    consumer = createConsumer("encrypted-password-group", ENCRYPTED_PASSWORD);
+
+    consumer.open();
+    consumer.subscribe(TOPIC_NAME);
+    assertEquals(1, subs.getSubscriptions().size(), "subscribe with encrypted password");
+
+    insertData(1706659200000L);
+    consume_data(consumer, session_dest);
+    // insertData writes 5 rows but only 4 distinct timestamps, hence the expected count of 4.
+    check_count(
+        4,
+        "select count(s_0) from " + DEVICE + " where time >= 1706659200000",
+        "encrypted password consumption");
+  }
+
+  /** A consumer built with a mismatching encrypted password must not end up subscribed. */
+  @Test
+  public void testSubscribeFailsWithWrongEncryptedPassword()
+      throws IoTDBConnectionException, StatementExecutionException {
+    consumer = createConsumer("wrong-encrypted-password-group", WRONG_ENCRYPTED_PASSWORD);
+
+    try {
+      consumer.open();
+      consumer.subscribe(TOPIC_NAME);
+      // fail() throws AssertionError, which is not an Exception, so the catch below cannot hide it.
+      fail("subscribe should fail when encrypted password mismatches");
+    } catch (final Exception expected) {
+      assertTrue(subs.getSubscriptions().isEmpty());
+    }
+  }
+
+  /**
+   * Builds (without opening) a pull consumer against the source cluster that authenticates as
+   * {@link #USERNAME} with the given encrypted password; the consumer id is derived from the
+   * group id.
+   */
+  private SubscriptionTreePullConsumer createConsumer(
+      final String consumerGroupId, final String encryptedPassword) {
+    return new SubscriptionTreePullConsumer.Builder()
+        .host(SRC_HOST)
+        .port(SRC_PORT)
+        .username(USERNAME)
+        .encryptedPassword(encryptedPassword)
+        .consumerId("consumer_" + consumerGroupId)
+        .consumerGroupId(consumerGroupId)
+        .buildPullConsumer();
+  }
+
+  /**
+   * Inserts one 5-row tablet for {@link #DEVICE} into the source session, starting at {@code
+   * timestamp}.
+   */
+  private void insertData(long timestamp)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final Tablet tablet = new Tablet(DEVICE, SCHEMA_LIST, 10);
+    for (int row = 0; row < 5; row++) {
+      final int rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, timestamp);
+      tablet.addValue("s_0", rowIndex, row * 20L + row);
+      tablet.addValue("s_1", rowIndex, row + 2.45);
+      // The step is row * 2000, i.e. 0 for the first row, so rows 0 and 1 share one timestamp.
+      // NOTE(review): presumably intentional, since the caller expects a count of 4 - confirm.
+      timestamp += row * 2000;
+    }
+    session_src.insertTablet(tablet);
+  }
+}
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 0b5c5a55477..3cb0087d682 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -76,6 +76,10 @@ public class ConsumerConfig extends PipeParameters {
     return getString(ConsumerConstant.PASSWORD_KEY);
   }
 
+  public String getEncryptedPassword() { // counterpart of getPassword() for the encrypted form
+    return getString(ConsumerConstant.ENCRYPTED_PASSWORD_KEY); // "encrypted-password" attribute
+  }
+
   public String getSqlDialect() {
     return getString(ConsumerConstant.SQL_DIALECT_KEY);
   }
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index a0e6f9ed228..9c52c8dd7da 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -34,6 +34,7 @@ public class ConsumerConstant {
 
   public static final String USERNAME_KEY = "username";
   public static final String PASSWORD_KEY = "password";
+  public static final String ENCRYPTED_PASSWORD_KEY = "encrypted-password";
 
   public static final String CONSUMER_ID_KEY = "consumer-id";
   public static final String CONSUMER_GROUP_ID_KEY = "group-id";
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index e9fbb1672e5..62c8d20e389 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -100,6 +100,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
 
   private final String username;
   private final String password;
+  private final String encryptedPassword;
 
   protected String consumerId;
   protected String consumerGroupId;
@@ -177,6 +178,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
 
     this.username = builder.username;
     this.password = builder.password;
+    this.encryptedPassword = builder.encryptedPassword;
 
     this.consumerId = builder.consumerId;
     this.consumerGroupId = builder.consumerGroupId;
@@ -207,6 +209,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
                 (String)
                     properties.getOrDefault(
                         ConsumerConstant.PASSWORD_KEY, 
SessionConfig.DEFAULT_PASSWORD))
+            .encryptedPassword((String) 
properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY))
             .consumerId((String) 
properties.get(ConsumerConstant.CONSUMER_ID_KEY))
             .consumerGroupId((String) 
properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY))
             .heartbeatIntervalMs(
@@ -387,6 +390,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
       final TEndPoint endPoint,
       final String username,
       final String password,
+      final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
       final int thriftMaxFrameSize,
@@ -400,6 +404,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
             endPoint,
             this.username,
             this.password,
+            this.encryptedPassword,
             this.consumerId,
             this.consumerGroupId,
             this.thriftMaxFrameSize,
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
index 81bfb6241c9..991d237ed2b 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
@@ -36,6 +36,7 @@ public class AbstractSubscriptionConsumerBuilder {
 
   protected String username = SessionConfig.DEFAULT_USER;
   protected String password = SessionConfig.DEFAULT_PASSWORD;
+  protected String encryptedPassword;
 
   protected String consumerId;
   protected String consumerGroupId;
@@ -72,10 +73,27 @@ public class AbstractSubscriptionConsumerBuilder {
   }
 
   public AbstractSubscriptionConsumerBuilder password(final String password) {
+    // Re-applying the default password is not treated as an explicit choice, so it never
+    // conflicts with an encryptedPassword that has already been set.
+    final boolean nonDefaultPassword = !Objects.equals(password, SessionConfig.DEFAULT_PASSWORD);
+    if (nonDefaultPassword && this.encryptedPassword != null) {
+      throw new IllegalStateException(
+          "password and encryptedPassword are mutually exclusive; encryptedPassword is already set");
+    }
     this.password = password;
     return this;
   }
 
+  /**
+   * Sets the already-encrypted password to authenticate with, as an alternative to {@link
+   * #password(String)}.
+   *
+   * @param encryptedPassword the encrypted password; {@code null} is ignored and leaves the
+   *     builder unchanged
+   * @return this builder
+   * @throws IllegalStateException if a password other than the default one has already been set
+   */
+  public AbstractSubscriptionConsumerBuilder encryptedPassword(
+      @Nullable final String encryptedPassword) {
+    if (Objects.isNull(encryptedPassword)) {
+      return this;
+    }
+    // A builder that still carries the default password counts as "no password chosen".
+    if (!Objects.equals(this.password, SessionConfig.DEFAULT_PASSWORD)) {
+      throw new IllegalStateException(
+          "password and encryptedPassword are mutually exclusive; password is already set");
+    }
+    this.encryptedPassword = encryptedPassword;
+    return this;
+  }
+
   public AbstractSubscriptionConsumerBuilder consumerId(@Nullable final String 
consumerId) {
     if (Objects.isNull(consumerId)) {
       return this;
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
index 7f3582d195d..413c609abbf 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
@@ -90,6 +90,7 @@ public abstract class AbstractSubscriptionProvider {
 
   private final String username;
   private final String password;
+  private final String encryptedPassword;
   private final long heartbeatIntervalMs;
   private final int connectionTimeoutInMs;
 
@@ -105,6 +106,7 @@ public abstract class AbstractSubscriptionProvider {
       final TEndPoint endPoint,
       final String username,
       final String password,
+      final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
       final int thriftMaxFrameSize,
@@ -124,6 +126,7 @@ public abstract class AbstractSubscriptionProvider {
     this.consumerGroupId = consumerGroupId;
     this.username = username;
     this.password = password;
+    this.encryptedPassword = encryptedPassword;
     this.heartbeatIntervalMs = heartbeatIntervalMs;
     this.connectionTimeoutInMs = connectionTimeoutInMs;
   }
@@ -175,6 +178,9 @@ public abstract class AbstractSubscriptionProvider {
     consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
     consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
     consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
+    if (encryptedPassword != null) {
+      consumerAttributes.put(ConsumerConstant.ENCRYPTED_PASSWORD_KEY, 
encryptedPassword);
+    }
     consumerAttributes.put(ConsumerConstant.SQL_DIALECT_KEY, 
session.getSqlDialect());
     consumerAttributes.put(
         ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, 
String.valueOf(heartbeatIntervalMs));
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
index 7083a7dc4af..2fac3d500eb 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
@@ -58,6 +58,12 @@ public class AbstractSubscriptionPullConsumerBuilder extends 
AbstractSubscriptio
     return this;
   }
 
+  @Override
+  public AbstractSubscriptionPullConsumerBuilder encryptedPassword(final 
String encryptedPassword) {
+    super.encryptedPassword(encryptedPassword); // the parent builder handles the value
+    return this; // narrowed return type keeps fluent chains on this builder type
+  }
+
   @Override
   public AbstractSubscriptionPullConsumerBuilder consumerId(final String 
consumerId) {
     super.consumerId(consumerId);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
index f013b98dd19..bcd33812c8e 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
@@ -64,6 +64,12 @@ public class AbstractSubscriptionPushConsumerBuilder extends 
AbstractSubscriptio
     return this;
   }
 
+  @Override
+  public AbstractSubscriptionPushConsumerBuilder encryptedPassword(final 
String encryptedPassword) {
+    super.encryptedPassword(encryptedPassword); // the parent builder handles the value
+    return this; // narrowed return type keeps fluent chains on this builder type
+  }
+
   @Override
   public AbstractSubscriptionPushConsumerBuilder consumerId(final String 
consumerId) {
     super.consumerId(consumerId);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
index 1b90866db9e..84470d283c2 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
@@ -30,6 +30,7 @@ final class SubscriptionTableProvider extends 
AbstractSubscriptionProvider {
       final TEndPoint endPoint,
       final String username,
       final String password,
+      final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
       final int thriftMaxFrameSize,
@@ -39,6 +40,7 @@ final class SubscriptionTableProvider extends 
AbstractSubscriptionProvider {
         endPoint,
         username,
         password,
+        encryptedPassword,
         consumerId,
         consumerGroupId,
         thriftMaxFrameSize,
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
index 83dd39aebbf..8f712782fb5 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
@@ -42,6 +42,7 @@ public class SubscriptionTablePullConsumer extends 
AbstractSubscriptionPullConsu
       final TEndPoint endPoint,
       final String username,
       final String password,
+      final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
       final int thriftMaxFrameSize,
@@ -51,6 +52,7 @@ public class SubscriptionTablePullConsumer extends 
AbstractSubscriptionPullConsu
         endPoint,
         username,
         password,
+        encryptedPassword,
         consumerId,
         consumerGroupId,
         thriftMaxFrameSize,
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
index 6d8437ac95f..939228a7f49 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
@@ -56,6 +56,12 @@ public class SubscriptionTablePullConsumerBuilder extends 
AbstractSubscriptionPu
     return this;
   }
 
+  @Override
+  public SubscriptionTablePullConsumerBuilder encryptedPassword(final String 
encryptedPassword) {
+    super.encryptedPassword(encryptedPassword); // the parent builder handles the value
+    return this; // narrowed return type keeps fluent chains on this builder type
+  }
+
   @Override
   public SubscriptionTablePullConsumerBuilder consumerId(final String 
consumerId) {
     super.consumerId(consumerId);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
index ac44e421dac..e90afc1d8d1 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
@@ -38,6 +38,7 @@ public class SubscriptionTablePushConsumer extends 
AbstractSubscriptionPushConsu
       final TEndPoint endPoint,
       final String username,
       final String password,
+      final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
       final int thriftMaxFrameSize,
@@ -47,6 +48,7 @@ public class SubscriptionTablePushConsumer extends 
AbstractSubscriptionPushConsu
         endPoint,
         username,
         password,
+        encryptedPassword,
         consumerId,
         consumerGroupId,
         thriftMaxFrameSize,
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
index c372c586db3..27bf328fea9 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
@@ -58,6 +58,12 @@ public class SubscriptionTablePushConsumerBuilder extends 
AbstractSubscriptionPu
     return this;
   }
 
+  @Override
+  public SubscriptionTablePushConsumerBuilder encryptedPassword(final String 
encryptedPassword) {
+    super.encryptedPassword(encryptedPassword); // the parent builder handles the value
+    return this; // narrowed return type keeps fluent chains on this builder type
+  }
+
   @Override
   public SubscriptionTablePushConsumerBuilder consumerId(final String 
consumerId) {
     super.consumerId(consumerId);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
index c79b64e8c84..3589fbbcf74 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
@@ -30,6 +30,7 @@ final class SubscriptionTreeProvider extends 
AbstractSubscriptionProvider {
       final TEndPoint endPoint,
       final String username,
       final String password,
+      final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
       final int thriftMaxFrameSize,
@@ -39,6 +40,7 @@ final class SubscriptionTreeProvider extends 
AbstractSubscriptionProvider {
         endPoint,
         username,
         password,
+        encryptedPassword,
         consumerId,
         consumerGroupId,
         thriftMaxFrameSize,
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
index 23050893f66..7225036aaa4 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
@@ -49,6 +49,7 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
       final TEndPoint endPoint,
       final String username,
       final String password,
+      final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
       final int thriftMaxFrameSize,
@@ -58,6 +59,7 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
         endPoint,
         username,
         password,
+        encryptedPassword,
         consumerId,
         consumerGroupId,
         thriftMaxFrameSize,
@@ -80,6 +82,7 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
             .nodeUrls(builder.nodeUrls)
             .username(builder.username)
             .password(builder.password)
+            .encryptedPassword(builder.encryptedPassword)
             .consumerId(builder.consumerId)
             .consumerGroupId(builder.consumerGroupId)
             .heartbeatIntervalMs(builder.heartbeatIntervalMs)
@@ -231,6 +234,7 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
 
     private String username = SessionConfig.DEFAULT_USER;
     private String password = SessionConfig.DEFAULT_PASSWORD;
+    private String encryptedPassword;
 
     private String consumerId;
     private String consumerGroupId;
@@ -274,6 +278,11 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
       return this;
     }
 
+    /**
+     * Sets the already-encrypted password to authenticate with. The value is only recorded here
+     * and is handed on unchanged when the consumer is constructed from this builder.
+     *
+     * <p>NOTE(review): no password/encryptedPassword conflict check happens in this setter;
+     * presumably it is enforced by the builder the value is forwarded to - confirm.
+     *
+     * @param encryptedPassword the encrypted password, or {@code null} for none
+     * @return this builder
+     */
+    public Builder encryptedPassword(@Nullable final String encryptedPassword) {
+      this.encryptedPassword = encryptedPassword;
+      return this;
+    }
+
     public Builder consumerId(@Nullable final String consumerId) {
       if (Objects.isNull(consumerId)) {
         return this;
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
index 8623a492087..cbceb95d77f 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
@@ -56,6 +56,12 @@ public class SubscriptionTreePullConsumerBuilder extends 
AbstractSubscriptionPul
     return this;
   }
 
+  @Override
+  public SubscriptionTreePullConsumerBuilder encryptedPassword(final String 
encryptedPassword) {
+    super.encryptedPassword(encryptedPassword); // the parent builder handles the value
+    return this; // narrowed return type keeps fluent chains on this builder type
+  }
+
   @Override
   public SubscriptionTreePullConsumerBuilder consumerId(final String 
consumerId) {
     super.consumerId(consumerId);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
index d56e89d47c8..4d8a5ef3e16 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
@@ -48,6 +48,7 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
       final TEndPoint endPoint,
       final String username,
       final String password,
+      final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
       final int thriftMaxFrameSize,
@@ -57,6 +58,7 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
         endPoint,
         username,
         password,
+        encryptedPassword,
         consumerId,
         consumerGroupId,
         thriftMaxFrameSize,
@@ -79,6 +81,7 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
             .nodeUrls(builder.nodeUrls)
             .username(builder.username)
             .password(builder.password)
+            .encryptedPassword(builder.encryptedPassword)
             .consumerId(builder.consumerId)
             .consumerGroupId(builder.consumerGroupId)
             .heartbeatIntervalMs(builder.heartbeatIntervalMs)
@@ -185,6 +188,7 @@ public class SubscriptionTreePushConsumer extends AbstractSubscriptionPushConsum
 
     private String username = SessionConfig.DEFAULT_USER;
     private String password = SessionConfig.DEFAULT_PASSWORD;
+    private String encryptedPassword;
 
     private String consumerId;
     private String consumerGroupId;
@@ -231,6 +235,11 @@ public class SubscriptionTreePushConsumer extends AbstractSubscriptionPushConsum
       return this;
     }
 
+    public Builder encryptedPassword(final String encryptedPassword) {
+      this.encryptedPassword = encryptedPassword;
+      return this;
+    }
+
     public Builder consumerId(@Nullable final String consumerId) {
       if (Objects.isNull(consumerId)) {
         return this;
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
index dd0cb017637..86594433e77 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
@@ -58,6 +58,12 @@ public class SubscriptionTreePushConsumerBuilder extends AbstractSubscriptionPus
     return this;
   }
 
+  @Override
+  public SubscriptionTreePushConsumerBuilder encryptedPassword(final String encryptedPassword) {
+    super.encryptedPassword(encryptedPassword);
+    return this;
+  }
+
   @Override
   public SubscriptionTreePushConsumerBuilder consumerId(final String consumerId) {
     super.consumerId(consumerId);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 867d20e078a..98f6756db2d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.impl.pipe.util.PipeExternalSourceLoadBalancer;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -178,18 +179,30 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
         || sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)
         || sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY)
         || sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)) {
-      final String hashedPassword =
-          env.getConfigManager()
-              .getPermissionManager()
-              .login4Pipe(
-                  sourceParameters.getStringByKeys(
-                      PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
-                      PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
-                      PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
-                      PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY),
-                  sourceParameters.getStringByKeys(
-                      PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
-                      PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+      final String username =
+          sourceParameters.getStringByKeys(
+              PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
+              PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
+              PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
+              PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY);
+      final String password =
+          sourceParameters.getStringByKeys(
+              PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
+              PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
+      String hashedPassword = null;
+      if (Objects.nonNull(password)) {
+        final TPermissionInfoResp loginResp =
+            env.getConfigManager().getPermissionManager().login(username, password, true);
+        if (Objects.nonNull(loginResp)
+            && Objects.nonNull(loginResp.getStatus())
+            && loginResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          hashedPassword = password;
+        }
+      }
+      if (Objects.isNull(hashedPassword)) {
+        hashedPassword =
+            env.getConfigManager().getPermissionManager().login4Pipe(username, password);
+      }
       if (Objects.isNull(hashedPassword)) {
         throw new PipeException("Authentication failed.");
       }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index cb5edd8cd91..867f04d02ea 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -117,7 +117,8 @@ public class CreateSubscriptionProcedure extends AbstractOperateSubscriptionAndP
                 new TCreatePipeReq()
                     .setPipeName(pipeName)
                     .setExtractorAttributes(
-                        topicMeta.generateExtractorAttributes(consumerMeta.getUsername()))
+                        topicMeta.generateExtractorAttributes(
+                            consumerMeta.getUsername(), consumerMeta.getSubscriptionAuthPassword()))
                     .setProcessorAttributes(topicMeta.generateProcessorAttributes())
                     .setConnectorAttributes(topicMeta.generateConnectorAttributes(consumerGroupId)),
                 pipeTaskInfo));
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
index 6c095de9852..ac6ca8c740c 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
@@ -19,11 +19,19 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.PermissionManager;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.utils.PublicBAOS;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
@@ -68,4 +76,57 @@ public class CreatePipeProcedureV2Test {
       fail();
     }
   }
+
+  @Test
+  public void testCheckAndEnrichSourceAuthenticationWithEncryptedPassword() {
+    final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+    final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    final PermissionManager permissionManager = Mockito.mock(PermissionManager.class);
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager);
+
+    final TPermissionInfoResp loginResp = new TPermissionInfoResp();
+    loginResp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    Mockito.when(permissionManager.login("user", "encrypted-password", true)).thenReturn(loginResp);
+
+    final Map<String, String> sourceAttributes = new HashMap<>();
+    sourceAttributes.put("extractor", "iotdb-source");
+    sourceAttributes.put("username", "user");
+    sourceAttributes.put("password", "encrypted-password");
+
+    CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env, sourceAttributes);
+
+    assertEquals(
+        "encrypted-password", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+    Mockito.verify(permissionManager).login("user", "encrypted-password", true);
+    Mockito.verify(permissionManager, Mockito.never())
+        .login4Pipe(Mockito.anyString(), Mockito.any());
+  }
+
+  @Test
+  public void testCheckAndEnrichSourceAuthenticationFallsBackToRawPassword() {
+    final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+    final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    final PermissionManager permissionManager = Mockito.mock(PermissionManager.class);
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager);
+
+    final TPermissionInfoResp loginResp = new TPermissionInfoResp();
+    loginResp.setStatus(new TSStatus(TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode()));
+    Mockito.when(permissionManager.login("user", "raw-password", true)).thenReturn(loginResp);
+    Mockito.when(permissionManager.login4Pipe("user", "raw-password"))
+        .thenReturn("hashed-password");
+
+    final Map<String, String> sourceAttributes = new HashMap<>();
+    sourceAttributes.put(PipeSourceConstant.SOURCE_KEY, "iotdb-source");
+    sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "user");
+    sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, "raw-password");
+
+    CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env, sourceAttributes);
+
+    assertEquals(
+        "hashed-password", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+    Mockito.verify(permissionManager).login("user", "raw-password", true);
+    Mockito.verify(permissionManager).login4Pipe("user", "raw-password");
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
index 152f0b111df..75cca9ccbcd 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
@@ -66,6 +66,14 @@ public class ConsumerMeta {
     return config.getPassword();
   }
 
+  public String getEncryptedPassword() {
+    return config.getEncryptedPassword();
+  }
+
+  public String getSubscriptionAuthPassword() {
+    return Objects.nonNull(getEncryptedPassword()) ? getEncryptedPassword() : getPassword();
+  }
+
   public ByteBuffer serialize() throws IOException {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index d3836743578..badb77d6f48 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -183,6 +183,11 @@ public class TopicMeta {
   /////////////////////////////// utilities ///////////////////////////////
 
   public Map<String, String> generateExtractorAttributes(final String username) {
+    return generateExtractorAttributes(username, null);
+  }
+
+  public Map<String, String> generateExtractorAttributes(
+      final String username, final String password) {
     final Map<String, String> extractorAttributes = new HashMap<>();
     // disable meta sync
     extractorAttributes.put("source", "iotdb-source");
@@ -190,6 +195,9 @@ public class TopicMeta {
     extractorAttributes.put("inclusion.exclusion", "data.delete");
     // user
     extractorAttributes.put("username", username);
+    if (Objects.nonNull(password)) {
+      extractorAttributes.put("password", password);
+    }
     // TODO: currently set skipif to no-privileges
     extractorAttributes.put("skipif", "no-privileges");
     // sql dialect
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
index 4973f06d27c..d9c280e1493 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
@@ -54,4 +54,15 @@ public class TopicDeSerTest {
     Assert.assertEquals(
         topicMeta.getSubscribedConsumerGroupIds(), topicMeta2.getSubscribedConsumerGroupIds());
   }
+
+  @Test
+  public void testGenerateExtractorAttributesWithEncryptedPassword() {
+    final TopicMeta topicMeta = new TopicMeta("test_topic", 1, new HashMap<>());
+
+    final Map<String, String> extractorAttributes =
+        topicMeta.generateExtractorAttributes("test_user", "encrypted-password");
+
+    Assert.assertEquals("test_user", extractorAttributes.get("username"));
+    Assert.assertEquals("encrypted-password", extractorAttributes.get("password"));
+  }
 }


Reply via email to