This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 754a1502bac [To dev/1.3] Subscription: support encrypted password auth 
in consumer builder (#17552)
754a1502bac is described below

commit 754a1502bacd2178088c9fca2fc191215cf953ba
Author: VGalaxies <[email protected]>
AuthorDate: Wed Apr 29 10:49:40 2026 +0800

    [To dev/1.3] Subscription: support encrypted password auth in consumer 
builder (#17552)
---
 .../IoTDBEncryptedPasswordPullConsumerIT.java      | 168 +++++++++++++++++++++
 .../rpc/subscription/config/ConsumerConfig.java    |  12 ++
 .../rpc/subscription/config/ConsumerConstant.java  |   1 +
 .../consumer/SubscriptionConsumer.java             |  22 +++
 .../consumer/SubscriptionProvider.java             |  12 ++
 .../consumer/SubscriptionPullConsumer.java         |   6 +
 .../consumer/SubscriptionPushConsumer.java         |   6 +
 .../confignode/manager/PermissionManager.java      |   9 ++
 .../iotdb/confignode/persistence/AuthorInfo.java   |  52 +++++++
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  67 ++++++++
 .../subscription/CreateSubscriptionProcedure.java  |   9 +-
 .../impl/pipe/task/CreatePipeProcedureV2Test.java  |  61 ++++++++
 .../pipe/config/constant/PipeSourceConstant.java   |   7 +
 .../meta/consumer/ConsumerGroupMeta.java           |   4 +
 .../subscription/meta/consumer/ConsumerMeta.java   |  20 +++
 .../commons/subscription/meta/topic/TopicMeta.java |  11 ++
 .../commons/subscription/topic/TopicDeSerTest.java |  11 ++
 17 files changed, 477 insertions(+), 1 deletion(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java
new file mode 100644
index 00000000000..9987ebe9732
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.triple.regression.param;
+
+import org.apache.iotdb.commons.utils.AuthUtils;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import 
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionMisc;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import 
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2SubscriptionRegressionMisc.class})
+public class IoTDBEncryptedPasswordPullConsumerIT extends 
AbstractSubscriptionRegressionIT {
+
+  private static final String DATABASE = 
"root.TestEncryptedPasswordPullConsumer";
+  private static final String DEVICE = DATABASE + ".d_0";
+  private static final String TOPIC_NAME = 
"TestEncryptedPasswordPullConsumerTopic";
+  private static final String USERNAME = "encrypted_user";
+  private static final String PASSWORD = "EncryptedUser@123";
+  private static final String ENCRYPTED_PASSWORD = 
AuthUtils.encryptPassword(PASSWORD);
+  private static final String WRONG_ENCRYPTED_PASSWORD =
+      AuthUtils.encryptPassword("WrongEncryptedUser@123");
+
+  private static final List<MeasurementSchema> SCHEMA_LIST = new ArrayList<>();
+
+  static {
+    SCHEMA_LIST.add(new MeasurementSchema("s_0", TSDataType.INT64));
+    SCHEMA_LIST.add(new MeasurementSchema("s_1", TSDataType.DOUBLE));
+  }
+
+  private SubscriptionPullConsumer consumer;
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    createDB(DATABASE);
+    createTopic_s(TOPIC_NAME, "root.**", null, null, false);
+    session_src.createTimeseries(
+        DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, 
CompressionType.LZ4);
+    session_src.createTimeseries(
+        DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, 
CompressionType.LZ4);
+    session_dest.createTimeseries(
+        DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, 
CompressionType.LZ4);
+    session_dest.createTimeseries(
+        DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, 
CompressionType.LZ4);
+    session_src.executeNonQueryStatement("create user " + USERNAME + " '" + 
PASSWORD + "'");
+    session_src.executeNonQueryStatement("grant read,write on root.** to user 
" + USERNAME);
+    Assert.assertTrue(subs.getTopic(TOPIC_NAME).isPresent());
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (consumer != null) {
+        consumer.close();
+      }
+    } catch (final Exception ignored) {
+    }
+    try {
+      subs.dropTopic(TOPIC_NAME);
+    } catch (final Exception ignored) {
+    }
+    try {
+      session_src.executeNonQueryStatement("drop user " + USERNAME);
+    } catch (final Exception ignored) {
+    }
+    dropDB(DATABASE);
+    super.tearDown();
+  }
+
+  @Test
+  public void testSubscribeWithEncryptedPassword()
+      throws TException,
+          IoTDBConnectionException,
+          IOException,
+          StatementExecutionException,
+          InterruptedException {
+    consumer = createConsumer("encrypted-password-group", ENCRYPTED_PASSWORD);
+
+    consumer.open();
+    consumer.subscribe(TOPIC_NAME);
+    Assert.assertEquals(1, subs.getSubscriptions().size());
+
+    insertData(1706659200000L);
+    consume_data(consumer, session_dest);
+    check_count(
+        4,
+        "select count(s_0) from " + DEVICE + " where time >= 1706659200000",
+        "encrypted password consumption");
+  }
+
+  @Test
+  public void testSubscribeFailsWithWrongEncryptedPassword()
+      throws IoTDBConnectionException, StatementExecutionException {
+    consumer = createConsumer("wrong-encrypted-password-group", 
WRONG_ENCRYPTED_PASSWORD);
+
+    try {
+      consumer.open();
+      consumer.subscribe(TOPIC_NAME);
+      Assert.fail("subscribe should fail when encrypted password mismatches");
+    } catch (final Exception ignored) {
+      Assert.assertTrue(subs.getSubscriptions().isEmpty());
+    }
+  }
+
+  private SubscriptionPullConsumer createConsumer(
+      final String consumerGroupId, final String encryptedPassword) {
+    return new SubscriptionPullConsumer.Builder()
+        .host(SRC_HOST)
+        .port(SRC_PORT)
+        .username(USERNAME)
+        .encryptedPassword(encryptedPassword)
+        .consumerId("consumer_" + consumerGroupId)
+        .consumerGroupId(consumerGroupId)
+        .buildPullConsumer();
+  }
+
+  private void insertData(long timestamp)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final Tablet tablet = new Tablet(DEVICE, SCHEMA_LIST, 10);
+    for (int row = 0; row < 5; row++) {
+      final int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      tablet.addValue("s_0", rowIndex, row * 20L + row);
+      tablet.addValue("s_1", rowIndex, row + 2.45);
+      timestamp += row * 2000;
+    }
+    session_src.insertTablet(tablet);
+  }
+}
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 5094ae7eea4..97c35baf8f4 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -68,6 +68,18 @@ public class ConsumerConfig extends PipeParameters {
     return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY);
   }
 
+  public String getUsername() {
+    return getString(ConsumerConstant.USERNAME_KEY);
+  }
+
+  public String getPassword() {
+    return getString(ConsumerConstant.PASSWORD_KEY);
+  }
+
+  public String getEncryptedPassword() {
+    return getString(ConsumerConstant.ENCRYPTED_PASSWORD_KEY);
+  }
+
   public long getHeartbeatIntervalMs() {
     return getLongOrDefault(
         ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY,
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index 504893b80ed..21b13e5d3f1 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -31,6 +31,7 @@ public class ConsumerConstant {
 
   public static final String USERNAME_KEY = "username";
   public static final String PASSWORD_KEY = "password";
+  public static final String ENCRYPTED_PASSWORD_KEY = "encrypted-password";
 
   public static final String CONSUMER_ID_KEY = "consumer-id";
   public static final String CONSUMER_GROUP_ID_KEY = "group-id";
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 92c6b2c9a3e..56f1c2e9b9b 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -100,6 +100,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
 
   private final String username;
   private final String password;
+  private final String encryptedPassword;
 
   protected String consumerId;
   protected String consumerGroupId;
@@ -177,6 +178,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
 
     this.username = builder.username;
     this.password = builder.password;
+    this.encryptedPassword = builder.encryptedPassword;
 
     this.consumerId = builder.consumerId;
     this.consumerGroupId = builder.consumerGroupId;
@@ -206,6 +208,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
                 (String)
                     properties.getOrDefault(
                         ConsumerConstant.PASSWORD_KEY, 
SessionConfig.DEFAULT_PASSWORD))
+            .encryptedPassword((String) 
properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY))
             .consumerId((String) 
properties.get(ConsumerConstant.CONSUMER_ID_KEY))
             .consumerGroupId((String) 
properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY))
             .heartbeatIntervalMs(
@@ -386,6 +389,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
             endPoint,
             this.username,
             this.password,
+            this.encryptedPassword,
             this.consumerId,
             this.consumerGroupId,
             this.thriftMaxFrameSize,
@@ -1401,6 +1405,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
 
     protected String username = SessionConfig.DEFAULT_USER;
     protected String password = SessionConfig.DEFAULT_PASSWORD;
+    protected String encryptedPassword;
 
     protected String consumerId;
     protected String consumerGroupId;
@@ -1437,10 +1442,27 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     }
 
     public Builder password(final String password) {
+      if (!Objects.equals(password, SessionConfig.DEFAULT_PASSWORD)
+          && Objects.nonNull(this.encryptedPassword)) {
+        throw new IllegalStateException(
+            "password and encryptedPassword are mutually exclusive; 
encryptedPassword is already set");
+      }
       this.password = password;
       return this;
     }
 
+    public Builder encryptedPassword(final String encryptedPassword) {
+      if (Objects.isNull(encryptedPassword)) {
+        return this;
+      }
+      if (!Objects.equals(this.password, SessionConfig.DEFAULT_PASSWORD)) {
+        throw new IllegalStateException(
+            "password and encryptedPassword are mutually exclusive; password 
is already set");
+      }
+      this.encryptedPassword = encryptedPassword;
+      return this;
+    }
+
     public Builder consumerId(@Nullable final String consumerId) {
       if (Objects.isNull(consumerId)) {
         return this;
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index 9f153efeb10..25d9c5da09e 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -86,11 +86,15 @@ final class SubscriptionProvider extends 
SubscriptionSession {
   private final long heartbeatIntervalMs;
   private final int connectionTimeoutInMs;
   private int dataNodeId;
+  private final String username;
+  private final String password;
+  private final String encryptedPassword;
 
   SubscriptionProvider(
       final TEndPoint endPoint,
       final String username,
       final String password,
+      final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
       final int thriftMaxFrameSize,
@@ -101,6 +105,9 @@ final class SubscriptionProvider extends 
SubscriptionSession {
     this.endPoint = endPoint;
     this.consumerId = consumerId;
     this.consumerGroupId = consumerGroupId;
+    this.username = username;
+    this.password = password;
+    this.encryptedPassword = encryptedPassword;
     this.heartbeatIntervalMs = heartbeatIntervalMs;
     this.connectionTimeoutInMs = connectionTimeoutInMs;
   }
@@ -149,6 +156,11 @@ final class SubscriptionProvider extends 
SubscriptionSession {
     final Map<String, String> consumerAttributes = new HashMap<>();
     consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, 
consumerGroupId);
     consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
+    consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
+    consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
+    if (encryptedPassword != null) {
+      consumerAttributes.put(ConsumerConstant.ENCRYPTED_PASSWORD_KEY, 
encryptedPassword);
+    }
     consumerAttributes.put(
         ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, 
String.valueOf(heartbeatIntervalMs));
     consumerAttributes.put(
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index a77716fe02c..227843811b1 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -329,6 +329,12 @@ public class SubscriptionPullConsumer extends 
SubscriptionConsumer {
       return this;
     }
 
+    @Override
+    public Builder encryptedPassword(final String encryptedPassword) {
+      super.encryptedPassword(encryptedPassword);
+      return this;
+    }
+
     @Override
     public Builder consumerId(final String consumerId) {
       super.consumerId(consumerId);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 2a413278090..b70633bef35 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -257,6 +257,12 @@ public class SubscriptionPushConsumer extends 
SubscriptionConsumer {
       return this;
     }
 
+    @Override
+    public Builder encryptedPassword(final String encryptedPassword) {
+      super.encryptedPassword(encryptedPassword);
+      return this;
+    }
+
     @Override
     public Builder consumerId(final String consumerId) {
       super.consumerId(consumerId);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 9592692d1bc..b4f31d84023 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -110,6 +110,15 @@ public class PermissionManager {
     return authorInfo.login(username, password);
   }
 
+  public TPermissionInfoResp login(
+      final String username, final String password, final boolean 
useEncryptedPassword) {
+    return authorInfo.login(username, password, useEncryptedPassword);
+  }
+
+  public String login4Pipe(final String userName, final String password) {
+    return authorInfo.login4Pipe(userName, password);
+  }
+
   public TPermissionInfoResp checkUserPrivileges(
       String username, List<PartialPath> paths, int permission) {
     return authorInfo.checkUserPrivileges(username, paths, permission);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
index e9e1058d79b..3bd17b5b5b7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
@@ -113,6 +113,58 @@ public class AuthorInfo implements SnapshotProcessor {
     return result;
   }
 
+  public TPermissionInfoResp login(
+      final String username, final String password, final boolean 
useEncryptedPassword) {
+    if (!useEncryptedPassword) {
+      return login(username, password);
+    }
+
+    boolean status = false;
+    String loginMessage = null;
+    TSStatus tsStatus = new TSStatus();
+    TPermissionInfoResp result = new TPermissionInfoResp();
+    try {
+      final User user = authorizer.getUser(username);
+      status = user != null && password != null && 
password.equals(user.getPassword());
+      if (status) {
+        result = getUserPermissionInfo(username);
+        result.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, 
"Login successfully"));
+      } else {
+        result = AuthUtils.generateEmptyPermissionInfoResp();
+      }
+    } catch (AuthException e) {
+      LOGGER.error("meet error while logging in.", e);
+      loginMessage = e.getMessage();
+    }
+    if (!status) {
+      tsStatus.setMessage(loginMessage != null ? loginMessage : 
"Authentication failed.");
+      tsStatus.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode());
+      result.setStatus(tsStatus);
+    }
+    return result;
+  }
+
+  public String login4Pipe(final String username, final String password) {
+    try {
+      final User user = authorizer.getUser(username);
+      if (user == null) {
+        return null;
+      }
+      if (password == null) {
+        return user.getPassword();
+      }
+      final TPermissionInfoResp loginResp = login(username, password);
+      if (loginResp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && loginResp.isSetUserInfo()) {
+        return loginResp.getUserInfo().getPassword();
+      }
+      return null;
+    } catch (final AuthException e) {
+      LOGGER.error("meet error while logging in for pipe.", e);
+      return null;
+    }
+  }
+
   // if All paths fail, return No permission
   // if some paths fail, return SUCCESS and failed index list
   // if all path success, return success and empty index list
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 26e9eb9bba1..ad18cc95bb0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -25,11 +25,13 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
@@ -41,8 +43,10 @@ import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProced
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.PipePlugin;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -53,6 +57,8 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -136,9 +142,70 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
             createPipeRequest.getProcessorAttributes(),
             createPipeRequest.getConnectorAttributes());
 
+    checkAndEnrichSourceAuthentication(env, 
createPipeRequest.getExtractorAttributes());
+
     return pipeTaskInfo.get().checkBeforeCreatePipe(createPipeRequest);
   }
 
+  public static void checkAndEnrichSourceAuthentication(
+      final ConfigNodeProcedureEnv env, final Map<String, String> 
sourceAttributes) {
+    if (Objects.isNull(sourceAttributes)) {
+      return;
+    }
+    final PipeParameters sourceParameters = new 
PipeParameters(sourceAttributes);
+
+    final String pluginName =
+        sourceParameters
+            .getStringOrDefault(
+                Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, 
PipeSourceConstant.SOURCE_KEY),
+                BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+            .toLowerCase();
+
+    if 
(!pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+        && 
!pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) {
+      return;
+    }
+
+    if 
(sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY)
+        || 
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USER_KEY)
+        || 
sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY)
+        || 
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)
+        || 
sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY)
+        || 
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)) {
+      final String username =
+          sourceParameters.getStringByKeys(
+              PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
+              PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
+              PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
+              PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY);
+      final String password =
+          sourceParameters.getStringByKeys(
+              PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
+              PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
+      String hashedPassword = null;
+      if (Objects.nonNull(password)) {
+        final TPermissionInfoResp loginResp =
+            env.getConfigManager().getPermissionManager().login(username, 
password, true);
+        if (Objects.nonNull(loginResp)
+            && Objects.nonNull(loginResp.getStatus())
+            && loginResp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          hashedPassword = password;
+        }
+      }
+      if (Objects.isNull(hashedPassword)) {
+        hashedPassword =
+            env.getConfigManager().getPermissionManager().login4Pipe(username, 
password);
+      }
+      if (Objects.isNull(hashedPassword)) {
+        throw new PipeException("Authentication failed.");
+      }
+      sourceParameters.addOrReplaceEquivalentAttributes(
+          new PipeParameters(
+              Collections.singletonMap(
+                  PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, 
hashedPassword)));
+    }
+  }
+
   @Override
   public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
     LOGGER.info(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 4a48ebdd35d..80e4e511554 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.confignode.procedure.impl.subscription.subscription;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
+import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta;
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
@@ -89,7 +90,11 @@ public class CreateSubscriptionProcedure extends 
AbstractOperateSubscriptionAndP
     subscriptionInfo.get().validateBeforeSubscribe(subscribeReq);
 
     // Construct AlterConsumerGroupProcedure
+    final String consumerId = subscribeReq.getConsumerId();
     final String consumerGroupId = subscribeReq.getConsumerGroupId();
+    final ConsumerGroupMeta consumerGroupMeta =
+        subscriptionInfo.get().getConsumerGroupMeta(consumerGroupId);
+    final ConsumerMeta consumerMeta = 
consumerGroupMeta.getConsumerMeta(consumerId);
     final ConsumerGroupMeta updatedConsumerGroupMeta =
         subscriptionInfo.get().deepCopyConsumerGroupMeta(consumerGroupId);
     updatedConsumerGroupMeta.addSubscription(
@@ -110,7 +115,9 @@ public class CreateSubscriptionProcedure extends 
AbstractOperateSubscriptionAndP
             new CreatePipeProcedureV2(
                 new TCreatePipeReq()
                     .setPipeName(pipeName)
-                    
.setExtractorAttributes(topicMeta.generateExtractorAttributes())
+                    .setExtractorAttributes(
+                        topicMeta.generateExtractorAttributes(
+                            consumerMeta.getUsername(), 
consumerMeta.getSubscriptionAuthPassword()))
                     
.setProcessorAttributes(topicMeta.generateProcessorAttributes())
                     
.setConnectorAttributes(topicMeta.generateConnectorAttributes(consumerGroupId)),
                 pipeTaskInfo));
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
index 6c095de9852..ac6ca8c740c 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
@@ -19,11 +19,19 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.PermissionManager;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.utils.PublicBAOS;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
@@ -68,4 +76,57 @@ public class CreatePipeProcedureV2Test {
       fail();
     }
   }
+
+  @Test
+  public void testCheckAndEnrichSourceAuthenticationWithEncryptedPassword() {
+    final ConfigNodeProcedureEnv env = 
Mockito.mock(ConfigNodeProcedureEnv.class);
+    final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    final PermissionManager permissionManager = 
Mockito.mock(PermissionManager.class);
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    
Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager);
+
+    final TPermissionInfoResp loginResp = new TPermissionInfoResp();
+    loginResp.setStatus(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    Mockito.when(permissionManager.login("user", "encrypted-password", 
true)).thenReturn(loginResp);
+
+    final Map<String, String> sourceAttributes = new HashMap<>();
+    sourceAttributes.put("extractor", "iotdb-source");
+    sourceAttributes.put("username", "user");
+    sourceAttributes.put("password", "encrypted-password");
+
+    CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env, 
sourceAttributes);
+
+    assertEquals(
+        "encrypted-password", 
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+    Mockito.verify(permissionManager).login("user", "encrypted-password", 
true);
+    Mockito.verify(permissionManager, Mockito.never())
+        .login4Pipe(Mockito.anyString(), Mockito.any());
+  }
+
+  @Test
+  public void testCheckAndEnrichSourceAuthenticationFallsBackToRawPassword() {
+    final ConfigNodeProcedureEnv env = 
Mockito.mock(ConfigNodeProcedureEnv.class);
+    final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    final PermissionManager permissionManager = 
Mockito.mock(PermissionManager.class);
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    
Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager);
+
+    final TPermissionInfoResp loginResp = new TPermissionInfoResp();
+    loginResp.setStatus(new 
TSStatus(TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode()));
+    Mockito.when(permissionManager.login("user", "raw-password", 
true)).thenReturn(loginResp);
+    Mockito.when(permissionManager.login4Pipe("user", "raw-password"))
+        .thenReturn("hashed-password");
+
+    final Map<String, String> sourceAttributes = new HashMap<>();
+    sourceAttributes.put(PipeSourceConstant.SOURCE_KEY, "iotdb-source");
+    sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "user");
+    sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, 
"raw-password");
+
+    CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env, 
sourceAttributes);
+
+    assertEquals(
+        "hashed-password", 
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+    Mockito.verify(permissionManager).login("user", "raw-password", true);
+    Mockito.verify(permissionManager).login4Pipe("user", "raw-password");
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
index e9adf64bc03..aba780fb0d1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
@@ -24,6 +24,13 @@ public class PipeSourceConstant {
   public static final String EXTRACTOR_KEY = "extractor";
   public static final String SOURCE_KEY = "source";
 
+  public static final String EXTRACTOR_IOTDB_USER_KEY = "extractor.user";
+  public static final String SOURCE_IOTDB_USER_KEY = "source.user";
+  public static final String EXTRACTOR_IOTDB_USERNAME_KEY = 
"extractor.username";
+  public static final String SOURCE_IOTDB_USERNAME_KEY = "source.username";
+  public static final String EXTRACTOR_IOTDB_PASSWORD_KEY = 
"extractor.password";
+  public static final String SOURCE_IOTDB_PASSWORD_KEY = "source.password";
+
   public static final String EXTRACTOR_INCLUSION_KEY = "extractor.inclusion";
   public static final String SOURCE_INCLUSION_KEY = "source.inclusion";
   public static final String EXTRACTOR_INCLUSION_DEFAULT_VALUE = "data.insert";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index b316c5f155d..498f3427690 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -136,6 +136,10 @@ public class ConsumerGroupMeta {
     return consumerIdToConsumerMeta.containsKey(consumerId);
   }
 
+  public ConsumerMeta getConsumerMeta(final String consumerId) {
+    return consumerIdToConsumerMeta.get(consumerId);
+  }
+
   public boolean isEmpty() {
     // When there are no consumers in a consumer group, it means that the 
ConsumerGroupMeta is
     // empty, and at this time, the topicNameToSubscribedConsumerIdSet is also 
empty.
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
index f1bb9b46085..75cca9ccbcd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
@@ -54,6 +54,26 @@ public class ConsumerMeta {
     return consumerId;
   }
 
+  public String getConsumerGroupId() {
+    return config.getConsumerGroupId();
+  }
+
+  public String getUsername() {
+    return config.getUsername();
+  }
+
+  public String getPassword() {
+    return config.getPassword();
+  }
+
+  public String getEncryptedPassword() {
+    return config.getEncryptedPassword();
+  }
+
+  public String getSubscriptionAuthPassword() {
+    return Objects.nonNull(getEncryptedPassword()) ? getEncryptedPassword() : 
getPassword();
+  }
+
   public ByteBuffer serialize() throws IOException {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 93cd87e4ae3..ee9177112e7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -181,11 +181,22 @@ public class TopicMeta {
   /////////////////////////////// utilities ///////////////////////////////
 
   public Map<String, String> generateExtractorAttributes() {
+    return generateExtractorAttributes(null, null);
+  }
+
+  public Map<String, String> generateExtractorAttributes(
+      final String username, final String password) {
     final Map<String, String> extractorAttributes = new HashMap<>();
     // disable meta sync
     extractorAttributes.put("source", "iotdb-source");
     extractorAttributes.put("inclusion", "data.insert");
     extractorAttributes.put("inclusion.exclusion", "data.delete");
+    if (Objects.nonNull(username)) {
+      extractorAttributes.put("username", username);
+    }
+    if (Objects.nonNull(password)) {
+      extractorAttributes.put("password", password);
+    }
     // path
     extractorAttributes.putAll(config.getAttributesWithPathOrPattern());
     // time
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
index 4973f06d27c..d9c280e1493 100644
--- 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
@@ -54,4 +54,15 @@ public class TopicDeSerTest {
     Assert.assertEquals(
         topicMeta.getSubscribedConsumerGroupIds(), 
topicMeta2.getSubscribedConsumerGroupIds());
   }
+
+  @Test
+  public void testGenerateExtractorAttributesWithEncryptedPassword() {
+    final TopicMeta topicMeta = new TopicMeta("test_topic", 1, new 
HashMap<>());
+
+    final Map<String, String> extractorAttributes =
+        topicMeta.generateExtractorAttributes("test_user", 
"encrypted-password");
+
+    Assert.assertEquals("test_user", extractorAttributes.get("username"));
+    Assert.assertEquals("encrypted-password", 
extractorAttributes.get("password"));
+  }
 }


Reply via email to