This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4b05c257615 Subscription: support encrypted password auth in consumer
builder (#17480)
4b05c257615 is described below
commit 4b05c257615acdbb77cf559aabb128135f0e2240
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 27 10:32:23 2026 +0800
Subscription: support encrypted password auth in consumer builder (#17480)
* feat(subscription): support encrypted password auth
* test(confignode): assert canonical source password key
* test(subscription): use strong passwords in encrypted pull IT
* feat(subscription): reject setting both password and encryptedPassword
Make `password` and `encryptedPassword` mutually exclusive on the
subscription consumer builder and enable the encrypted-password IT.
---
.../IoTDBEncryptedPasswordPullConsumerIT.java | 171 +++++++++++++++++++++
.../rpc/subscription/config/ConsumerConfig.java | 4 +
.../rpc/subscription/config/ConsumerConstant.java | 1 +
.../base/AbstractSubscriptionConsumer.java | 5 +
.../base/AbstractSubscriptionConsumerBuilder.java | 18 +++
.../base/AbstractSubscriptionProvider.java | 6 +
.../AbstractSubscriptionPullConsumerBuilder.java | 6 +
.../AbstractSubscriptionPushConsumerBuilder.java | 6 +
.../consumer/table/SubscriptionTableProvider.java | 2 +
.../table/SubscriptionTablePullConsumer.java | 2 +
.../SubscriptionTablePullConsumerBuilder.java | 6 +
.../table/SubscriptionTablePushConsumer.java | 2 +
.../SubscriptionTablePushConsumerBuilder.java | 6 +
.../consumer/tree/SubscriptionTreeProvider.java | 2 +
.../tree/SubscriptionTreePullConsumer.java | 9 ++
.../tree/SubscriptionTreePullConsumerBuilder.java | 6 +
.../tree/SubscriptionTreePushConsumer.java | 9 ++
.../tree/SubscriptionTreePushConsumerBuilder.java | 6 +
.../impl/pipe/task/CreatePipeProcedureV2.java | 37 +++--
.../subscription/CreateSubscriptionProcedure.java | 3 +-
.../impl/pipe/task/CreatePipeProcedureV2Test.java | 61 ++++++++
.../subscription/meta/consumer/ConsumerMeta.java | 8 +
.../commons/subscription/meta/topic/TopicMeta.java | 8 +
.../commons/subscription/topic/TopicDeSerTest.java | 11 ++
24 files changed, 382 insertions(+), 13 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java
new file mode 100644
index 00000000000..8f0eceeafab
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.triple.treemodel.regression.param;
+
+import org.apache.iotdb.commons.utils.AuthUtils;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTreeRegressionMisc;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+import
org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Regression IT for a tree-model pull consumer that authenticates with an encrypted password.
+ *
+ * <p>Every consumer in this class is built through {@code Builder#encryptedPassword} and never
+ * calls {@code password}. One test subscribes with the encrypted form of the test user's password
+ * and checks that data is consumed; the other uses the encrypted form of a different password and
+ * expects opening/subscribing to fail.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2SubscriptionTreeRegressionMisc.class})
+public class IoTDBEncryptedPasswordPullConsumerIT extends AbstractSubscriptionTreeRegressionIT {
+
+  private static final String DATABASE = "root.TestEncryptedPasswordPullConsumer";
+  private static final String DEVICE = DATABASE + ".d_0";
+  private static final String TOPIC_NAME = "TestEncryptedPasswordPullConsumerTopic";
+  private static final String USERNAME = "encrypted_user";
+  private static final String PASSWORD = "EncryptedUser@123";
+  // Encrypted form of the password the test user is created with.
+  private static final String ENCRYPTED_PASSWORD = AuthUtils.encryptPassword(PASSWORD);
+  // Encrypted form of a password that differs from the test user's password.
+  private static final String WRONG_ENCRYPTED_PASSWORD =
+      AuthUtils.encryptPassword("WrongEncryptedUser@123");
+
+  private static final List<IMeasurementSchema> SCHEMA_LIST = new ArrayList<>();
+
+  static {
+    SCHEMA_LIST.add(new MeasurementSchema("s_0", TSDataType.INT64));
+    SCHEMA_LIST.add(new MeasurementSchema("s_1", TSDataType.DOUBLE));
+  }
+
+  // Consumer created by the running test; closed in tearDown().
+  private SubscriptionTreePullConsumer consumer;
+
+  /**
+   * Creates the database, the topic and both timeseries (on the source and the destination
+   * session), then creates the test user on the source and grants it read/write on {@code
+   * root.**}.
+   */
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    createDB(DATABASE);
+    createTopic_s(TOPIC_NAME, "root.**", null, null, false);
+    session_src.createTimeseries(
+        DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4);
+    session_src.createTimeseries(
+        DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4);
+    session_dest.createTimeseries(
+        DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4);
+    session_dest.createTimeseries(
+        DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4);
+    session_src.executeNonQueryStatement("create user " + USERNAME + " '" + PASSWORD + "'");
+    session_src.executeNonQueryStatement("grant read,write on root.** to user " + USERNAME);
+    assertTrue(subs.getTopic(TOPIC_NAME).isPresent());
+  }
+
+  /**
+   * Releases everything {@link #setUp()} and the tests created.
+   *
+   * <p>Closing the consumer, dropping the topic and dropping the user are best-effort: a failure
+   * in one of them must not prevent the remaining cleanup. {@code super.tearDown()} runs in a
+   * {@code finally} block so the base fixture is torn down even when {@code dropDB} throws.
+   */
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (consumer != null) {
+        consumer.close();
+      }
+    } catch (final Exception ignored) {
+      // Best-effort: the consumer may never have opened (e.g. in the wrong-password test).
+    }
+    try {
+      subs.dropTopic(TOPIC_NAME);
+    } catch (final Exception ignored) {
+      // Best-effort: keep cleaning up even if the topic cannot be dropped.
+    }
+    try {
+      session_src.executeNonQueryStatement("drop user " + USERNAME);
+    } catch (final Exception ignored) {
+      // Best-effort: keep cleaning up even if the user cannot be dropped.
+    }
+    try {
+      dropDB(DATABASE);
+    } finally {
+      // Always tear down the base fixture, even if dropping the database failed.
+      super.tearDown();
+    }
+  }
+
+  /**
+   * Subscribes with the encrypted form of the test user's password, writes data on the source and
+   * checks the row count that reaches the destination.
+   */
+  @Test
+  public void testSubscribeWithEncryptedPassword()
+      throws TException,
+          IoTDBConnectionException,
+          IOException,
+          StatementExecutionException,
+          InterruptedException {
+    consumer = createConsumer("encrypted-password-group", ENCRYPTED_PASSWORD);
+
+    consumer.open();
+    consumer.subscribe(TOPIC_NAME);
+    // NOTE(review): assertEquals is not among this file's static imports; presumably it is
+    // inherited from AbstractSubscriptionTreeRegressionIT — confirm it resolves and check the
+    // argument order that helper expects.
+    assertEquals(1, subs.getSubscriptions().size(), "subscribe with encrypted password");
+
+    insertData(1706659200000L);
+    consume_data(consumer, session_dest);
+    // insertData() writes five rows but only four distinct timestamps; see its documentation.
+    check_count(
+        4,
+        "select count(s_0) from " + DEVICE + " where time >= 1706659200000",
+        "encrypted password consumption");
+  }
+
+  /**
+   * Builds a consumer with the encrypted form of a wrong password and expects {@code open()} or
+   * {@code subscribe()} to throw; afterwards no subscription may exist.
+   */
+  @Test
+  public void testSubscribeFailsWithWrongEncryptedPassword()
+      throws IoTDBConnectionException, StatementExecutionException {
+    consumer = createConsumer("wrong-encrypted-password-group", WRONG_ENCRYPTED_PASSWORD);
+
+    try {
+      consumer.open();
+      consumer.subscribe(TOPIC_NAME);
+      // fail() raises an AssertionError, which the catch (Exception) below does not intercept.
+      fail("subscribe should fail when encrypted password mismatches");
+    } catch (final Exception ignored) {
+      assertTrue(subs.getSubscriptions().isEmpty());
+    }
+  }
+
+  /**
+   * Builds (without opening) a pull consumer for the test user against the source node.
+   *
+   * @param consumerGroupId consumer group id; the consumer id is derived as {@code "consumer_" +
+   *     consumerGroupId}
+   * @param encryptedPassword value handed to {@code Builder#encryptedPassword}
+   * @return the newly built consumer
+   */
+  private SubscriptionTreePullConsumer createConsumer(
+      final String consumerGroupId, final String encryptedPassword) {
+    return new SubscriptionTreePullConsumer.Builder()
+        .host(SRC_HOST)
+        .port(SRC_PORT)
+        .username(USERNAME)
+        .encryptedPassword(encryptedPassword)
+        .consumerId("consumer_" + consumerGroupId)
+        .consumerGroupId(consumerGroupId)
+        .buildPullConsumer();
+  }
+
+  /**
+   * Inserts five rows for {@link #DEVICE} through the source session.
+   *
+   * <p>After each row the timestamp advances by {@code row * 2000}, which is 0 for the first row,
+   * so rows 0 and 1 carry the same timestamp and the tablet holds four distinct timestamps.
+   *
+   * @param timestamp timestamp of the first row
+   */
+  private void insertData(long timestamp)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final Tablet tablet = new Tablet(DEVICE, SCHEMA_LIST, 10);
+    for (int row = 0; row < 5; row++) {
+      final int rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, timestamp);
+      tablet.addValue("s_0", rowIndex, row * 20L + row);
+      tablet.addValue("s_1", rowIndex, row + 2.45);
+      timestamp += row * 2000;
+    }
+    session_src.insertTablet(tablet);
+  }
+}
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 0b5c5a55477..3cb0087d682 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -76,6 +76,10 @@ public class ConsumerConfig extends PipeParameters {
return getString(ConsumerConstant.PASSWORD_KEY);
}
+  /**
+   * Looks up the consumer's encrypted password.
+   *
+   * @return whatever {@code getString} yields for {@link ConsumerConstant#ENCRYPTED_PASSWORD_KEY}
+   */
+  public String getEncryptedPassword() {
+    final String encryptedPassword = getString(ConsumerConstant.ENCRYPTED_PASSWORD_KEY);
+    return encryptedPassword;
+  }
+
public String getSqlDialect() {
return getString(ConsumerConstant.SQL_DIALECT_KEY);
}
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index a0e6f9ed228..9c52c8dd7da 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -34,6 +34,7 @@ public class ConsumerConstant {
public static final String USERNAME_KEY = "username";
public static final String PASSWORD_KEY = "password";
+ public static final String ENCRYPTED_PASSWORD_KEY = "encrypted-password";
public static final String CONSUMER_ID_KEY = "consumer-id";
public static final String CONSUMER_GROUP_ID_KEY = "group-id";
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index e9fbb1672e5..62c8d20e389 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -100,6 +100,7 @@ abstract class AbstractSubscriptionConsumer implements
AutoCloseable {
private final String username;
private final String password;
+ private final String encryptedPassword;
protected String consumerId;
protected String consumerGroupId;
@@ -177,6 +178,7 @@ abstract class AbstractSubscriptionConsumer implements
AutoCloseable {
this.username = builder.username;
this.password = builder.password;
+ this.encryptedPassword = builder.encryptedPassword;
this.consumerId = builder.consumerId;
this.consumerGroupId = builder.consumerGroupId;
@@ -207,6 +209,7 @@ abstract class AbstractSubscriptionConsumer implements
AutoCloseable {
(String)
properties.getOrDefault(
ConsumerConstant.PASSWORD_KEY,
SessionConfig.DEFAULT_PASSWORD))
+ .encryptedPassword((String)
properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY))
.consumerId((String)
properties.get(ConsumerConstant.CONSUMER_ID_KEY))
.consumerGroupId((String)
properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY))
.heartbeatIntervalMs(
@@ -387,6 +390,7 @@ abstract class AbstractSubscriptionConsumer implements
AutoCloseable {
final TEndPoint endPoint,
final String username,
final String password,
+ final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize,
@@ -400,6 +404,7 @@ abstract class AbstractSubscriptionConsumer implements
AutoCloseable {
endPoint,
this.username,
this.password,
+ this.encryptedPassword,
this.consumerId,
this.consumerGroupId,
this.thriftMaxFrameSize,
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
index 81bfb6241c9..991d237ed2b 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
@@ -36,6 +36,7 @@ public class AbstractSubscriptionConsumerBuilder {
protected String username = SessionConfig.DEFAULT_USER;
protected String password = SessionConfig.DEFAULT_PASSWORD;
+ protected String encryptedPassword;
protected String consumerId;
protected String consumerGroupId;
@@ -72,10 +73,27 @@ public class AbstractSubscriptionConsumerBuilder {
}
+  /**
+   * Sets the plaintext password.
+   *
+   * <p>Passing {@code SessionConfig.DEFAULT_PASSWORD} is always accepted; any other value is
+   * rejected once an encrypted password has been set on this builder.
+   *
+   * @param password the plaintext password
+   * @return this builder
+   * @throws IllegalStateException if {@code password} is not the default and an encrypted
+   *     password is already set
+   */
public AbstractSubscriptionConsumerBuilder password(final String password) {
+    final boolean isDefaultPassword = Objects.equals(password, SessionConfig.DEFAULT_PASSWORD);
+    final boolean encryptedPasswordPresent = Objects.nonNull(this.encryptedPassword);
+    if (!isDefaultPassword && encryptedPasswordPresent) {
+      throw new IllegalStateException(
+          "password and encryptedPassword are mutually exclusive; encryptedPassword is already set");
+    }
this.password = password;
return this;
}
+  /**
+   * Sets the encrypted password.
+   *
+   * <p>A {@code null} argument is ignored and leaves the builder unchanged. A non-null value is
+   * rejected when the plaintext password currently differs from {@code
+   * SessionConfig.DEFAULT_PASSWORD}.
+   *
+   * @param encryptedPassword the encrypted password, or {@code null} for a no-op
+   * @return this builder
+   * @throws IllegalStateException if a non-default plaintext password is already set
+   */
+  public AbstractSubscriptionConsumerBuilder encryptedPassword(final String encryptedPassword) {
+    if (Objects.nonNull(encryptedPassword)) {
+      final boolean passwordCustomized =
+          !Objects.equals(this.password, SessionConfig.DEFAULT_PASSWORD);
+      if (passwordCustomized) {
+        throw new IllegalStateException(
+            "password and encryptedPassword are mutually exclusive; password is already set");
+      }
+      this.encryptedPassword = encryptedPassword;
+    }
+    return this;
+  }
+
public AbstractSubscriptionConsumerBuilder consumerId(@Nullable final String
consumerId) {
if (Objects.isNull(consumerId)) {
return this;
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
index 7f3582d195d..413c609abbf 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
@@ -90,6 +90,7 @@ public abstract class AbstractSubscriptionProvider {
private final String username;
private final String password;
+ private final String encryptedPassword;
private final long heartbeatIntervalMs;
private final int connectionTimeoutInMs;
@@ -105,6 +106,7 @@ public abstract class AbstractSubscriptionProvider {
final TEndPoint endPoint,
final String username,
final String password,
+ final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize,
@@ -124,6 +126,7 @@ public abstract class AbstractSubscriptionProvider {
this.consumerGroupId = consumerGroupId;
this.username = username;
this.password = password;
+ this.encryptedPassword = encryptedPassword;
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.connectionTimeoutInMs = connectionTimeoutInMs;
}
@@ -175,6 +178,9 @@ public abstract class AbstractSubscriptionProvider {
consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
+ if (encryptedPassword != null) {
+ consumerAttributes.put(ConsumerConstant.ENCRYPTED_PASSWORD_KEY,
encryptedPassword);
+ }
consumerAttributes.put(ConsumerConstant.SQL_DIALECT_KEY,
session.getSqlDialect());
consumerAttributes.put(
ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY,
String.valueOf(heartbeatIntervalMs));
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
index 7083a7dc4af..2fac3d500eb 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
@@ -58,6 +58,12 @@ public class AbstractSubscriptionPullConsumerBuilder extends
AbstractSubscriptio
return this;
}
+  /**
+   * Forwards to the parent builder's {@code encryptedPassword} and returns this builder, so a
+   * fluent chain keeps the {@code AbstractSubscriptionPullConsumerBuilder} type.
+   */
+  @Override
+  public AbstractSubscriptionPullConsumerBuilder encryptedPassword(
+      final String encryptedPassword) {
+    super.encryptedPassword(encryptedPassword);
+    return this;
+  }
+
@Override
public AbstractSubscriptionPullConsumerBuilder consumerId(final String
consumerId) {
super.consumerId(consumerId);
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
index f013b98dd19..bcd33812c8e 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
@@ -64,6 +64,12 @@ public class AbstractSubscriptionPushConsumerBuilder extends
AbstractSubscriptio
return this;
}
+  /**
+   * Forwards to the parent builder's {@code encryptedPassword} and returns this builder, so a
+   * fluent chain keeps the {@code AbstractSubscriptionPushConsumerBuilder} type.
+   */
+  @Override
+  public AbstractSubscriptionPushConsumerBuilder encryptedPassword(
+      final String encryptedPassword) {
+    super.encryptedPassword(encryptedPassword);
+    return this;
+  }
+
@Override
public AbstractSubscriptionPushConsumerBuilder consumerId(final String
consumerId) {
super.consumerId(consumerId);
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
index 1b90866db9e..84470d283c2 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
@@ -30,6 +30,7 @@ final class SubscriptionTableProvider extends
AbstractSubscriptionProvider {
final TEndPoint endPoint,
final String username,
final String password,
+ final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize,
@@ -39,6 +40,7 @@ final class SubscriptionTableProvider extends
AbstractSubscriptionProvider {
endPoint,
username,
password,
+ encryptedPassword,
consumerId,
consumerGroupId,
thriftMaxFrameSize,
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
index 83dd39aebbf..8f712782fb5 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
@@ -42,6 +42,7 @@ public class SubscriptionTablePullConsumer extends
AbstractSubscriptionPullConsu
final TEndPoint endPoint,
final String username,
final String password,
+ final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize,
@@ -51,6 +52,7 @@ public class SubscriptionTablePullConsumer extends
AbstractSubscriptionPullConsu
endPoint,
username,
password,
+ encryptedPassword,
consumerId,
consumerGroupId,
thriftMaxFrameSize,
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
index 6d8437ac95f..939228a7f49 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
@@ -56,6 +56,12 @@ public class SubscriptionTablePullConsumerBuilder extends
AbstractSubscriptionPu
return this;
}
+  /**
+   * Forwards to the parent builder's {@code encryptedPassword} and returns this builder, so a
+   * fluent chain keeps the {@code SubscriptionTablePullConsumerBuilder} type.
+   */
+  @Override
+  public SubscriptionTablePullConsumerBuilder encryptedPassword(
+      final String encryptedPassword) {
+    super.encryptedPassword(encryptedPassword);
+    return this;
+  }
+
@Override
public SubscriptionTablePullConsumerBuilder consumerId(final String
consumerId) {
super.consumerId(consumerId);
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
index ac44e421dac..e90afc1d8d1 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
@@ -38,6 +38,7 @@ public class SubscriptionTablePushConsumer extends
AbstractSubscriptionPushConsu
final TEndPoint endPoint,
final String username,
final String password,
+ final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize,
@@ -47,6 +48,7 @@ public class SubscriptionTablePushConsumer extends
AbstractSubscriptionPushConsu
endPoint,
username,
password,
+ encryptedPassword,
consumerId,
consumerGroupId,
thriftMaxFrameSize,
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
index c372c586db3..27bf328fea9 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
@@ -58,6 +58,12 @@ public class SubscriptionTablePushConsumerBuilder extends
AbstractSubscriptionPu
return this;
}
+  /**
+   * Forwards to the parent builder's {@code encryptedPassword} and returns this builder, so a
+   * fluent chain keeps the {@code SubscriptionTablePushConsumerBuilder} type.
+   */
+  @Override
+  public SubscriptionTablePushConsumerBuilder encryptedPassword(
+      final String encryptedPassword) {
+    super.encryptedPassword(encryptedPassword);
+    return this;
+  }
+
@Override
public SubscriptionTablePushConsumerBuilder consumerId(final String
consumerId) {
super.consumerId(consumerId);
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
index c79b64e8c84..3589fbbcf74 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
@@ -30,6 +30,7 @@ final class SubscriptionTreeProvider extends
AbstractSubscriptionProvider {
final TEndPoint endPoint,
final String username,
final String password,
+ final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize,
@@ -39,6 +40,7 @@ final class SubscriptionTreeProvider extends
AbstractSubscriptionProvider {
endPoint,
username,
password,
+ encryptedPassword,
consumerId,
consumerGroupId,
thriftMaxFrameSize,
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
index 23050893f66..7225036aaa4 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
@@ -49,6 +49,7 @@ public class SubscriptionTreePullConsumer extends
AbstractSubscriptionPullConsum
final TEndPoint endPoint,
final String username,
final String password,
+ final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize,
@@ -58,6 +59,7 @@ public class SubscriptionTreePullConsumer extends
AbstractSubscriptionPullConsum
endPoint,
username,
password,
+ encryptedPassword,
consumerId,
consumerGroupId,
thriftMaxFrameSize,
@@ -80,6 +82,7 @@ public class SubscriptionTreePullConsumer extends
AbstractSubscriptionPullConsum
.nodeUrls(builder.nodeUrls)
.username(builder.username)
.password(builder.password)
+ .encryptedPassword(builder.encryptedPassword)
.consumerId(builder.consumerId)
.consumerGroupId(builder.consumerGroupId)
.heartbeatIntervalMs(builder.heartbeatIntervalMs)
@@ -231,6 +234,7 @@ public class SubscriptionTreePullConsumer extends
AbstractSubscriptionPullConsum
private String username = SessionConfig.DEFAULT_USER;
private String password = SessionConfig.DEFAULT_PASSWORD;
+ private String encryptedPassword;
private String consumerId;
private String consumerGroupId;
@@ -274,6 +278,11 @@ public class SubscriptionTreePullConsumer extends
AbstractSubscriptionPullConsum
return this;
}
+    /**
+     * Records the encrypted password on this builder. The value is stored as given, without any
+     * check performed here.
+     *
+     * @param encryptedPassword the encrypted password to store
+     * @return this builder
+     */
+    public Builder encryptedPassword(
+        final String encryptedPassword) {
+      this.encryptedPassword = encryptedPassword;
+      return this;
+    }
+
public Builder consumerId(@Nullable final String consumerId) {
if (Objects.isNull(consumerId)) {
return this;
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
index 8623a492087..cbceb95d77f 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
@@ -56,6 +56,12 @@ public class SubscriptionTreePullConsumerBuilder extends
AbstractSubscriptionPul
return this;
}
+  /**
+   * Forwards to the parent builder's {@code encryptedPassword} and returns this builder, so a
+   * fluent chain keeps the {@code SubscriptionTreePullConsumerBuilder} type.
+   */
+  @Override
+  public SubscriptionTreePullConsumerBuilder encryptedPassword(
+      final String encryptedPassword) {
+    super.encryptedPassword(encryptedPassword);
+    return this;
+  }
+
@Override
public SubscriptionTreePullConsumerBuilder consumerId(final String
consumerId) {
super.consumerId(consumerId);
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
index d56e89d47c8..4d8a5ef3e16 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
@@ -48,6 +48,7 @@ public class SubscriptionTreePushConsumer extends
AbstractSubscriptionPushConsum
final TEndPoint endPoint,
final String username,
final String password,
+ final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize,
@@ -57,6 +58,7 @@ public class SubscriptionTreePushConsumer extends
AbstractSubscriptionPushConsum
endPoint,
username,
password,
+ encryptedPassword,
consumerId,
consumerGroupId,
thriftMaxFrameSize,
@@ -79,6 +81,7 @@ public class SubscriptionTreePushConsumer extends
AbstractSubscriptionPushConsum
.nodeUrls(builder.nodeUrls)
.username(builder.username)
.password(builder.password)
+ .encryptedPassword(builder.encryptedPassword)
.consumerId(builder.consumerId)
.consumerGroupId(builder.consumerGroupId)
.heartbeatIntervalMs(builder.heartbeatIntervalMs)
@@ -185,6 +188,7 @@ public class SubscriptionTreePushConsumer extends
AbstractSubscriptionPushConsum
private String username = SessionConfig.DEFAULT_USER;
private String password = SessionConfig.DEFAULT_PASSWORD;
+ private String encryptedPassword;
private String consumerId;
private String consumerGroupId;
@@ -231,6 +235,11 @@ public class SubscriptionTreePushConsumer extends
AbstractSubscriptionPushConsum
return this;
}
+    /**
+     * Records the encrypted password on this builder. The value is stored as given, without any
+     * check performed here.
+     *
+     * @param encryptedPassword the encrypted password to store
+     * @return this builder
+     */
+    public Builder encryptedPassword(
+        final String encryptedPassword) {
+      this.encryptedPassword = encryptedPassword;
+      return this;
+    }
+
public Builder consumerId(@Nullable final String consumerId) {
if (Objects.isNull(consumerId)) {
return this;
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
index dd0cb017637..86594433e77 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
@@ -58,6 +58,12 @@ public class SubscriptionTreePushConsumerBuilder extends
AbstractSubscriptionPus
return this;
}
+  /**
+   * Forwards to the parent builder's {@code encryptedPassword} and returns this builder, so a
+   * fluent chain keeps the {@code SubscriptionTreePushConsumerBuilder} type.
+   */
+  @Override
+  public SubscriptionTreePushConsumerBuilder encryptedPassword(
+      final String encryptedPassword) {
+    super.encryptedPassword(encryptedPassword);
+    return this;
+  }
+
@Override
public SubscriptionTreePushConsumerBuilder consumerId(final String
consumerId) {
super.consumerId(consumerId);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 867d20e078a..98f6756db2d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -46,6 +46,7 @@ import
org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
import
org.apache.iotdb.confignode.procedure.impl.pipe.util.PipeExternalSourceLoadBalancer;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -178,18 +179,30 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
||
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)
||
sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY)
||
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)) {
- final String hashedPassword =
- env.getConfigManager()
- .getPermissionManager()
- .login4Pipe(
- sourceParameters.getStringByKeys(
- PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
- PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
- PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
- PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY),
- sourceParameters.getStringByKeys(
- PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
- PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+ final String username =
+ sourceParameters.getStringByKeys(
+ PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
+ PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY);
+ final String password =
+ sourceParameters.getStringByKeys(
+ PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
+ String hashedPassword = null;
+ if (Objects.nonNull(password)) {
+ final TPermissionInfoResp loginResp =
+ env.getConfigManager().getPermissionManager().login(username,
password, true);
+ if (Objects.nonNull(loginResp)
+ && Objects.nonNull(loginResp.getStatus())
+ && loginResp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ hashedPassword = password;
+ }
+ }
+ if (Objects.isNull(hashedPassword)) {
+ hashedPassword =
+ env.getConfigManager().getPermissionManager().login4Pipe(username,
password);
+ }
if (Objects.isNull(hashedPassword)) {
throw new PipeException("Authentication failed.");
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index cb5edd8cd91..867f04d02ea 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -117,7 +117,8 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
new TCreatePipeReq()
.setPipeName(pipeName)
.setExtractorAttributes(
-
topicMeta.generateExtractorAttributes(consumerMeta.getUsername()))
+ topicMeta.generateExtractorAttributes(
+ consumerMeta.getUsername(),
consumerMeta.getSubscriptionAuthPassword()))
.setProcessorAttributes(topicMeta.generateProcessorAttributes())
.setConnectorAttributes(topicMeta.generateConnectorAttributes(consumerGroupId)),
pipeTaskInfo));
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
index 6c095de9852..ac6ca8c740c 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
@@ -19,11 +19,19 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.PermissionManager;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.PublicBAOS;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
@@ -68,4 +76,57 @@ public class CreatePipeProcedureV2Test {
fail();
}
}
+
+ @Test
+ public void testCheckAndEnrichSourceAuthenticationWithEncryptedPassword() { // case: login(user, password, true) succeeds, so the supplied password is expected to be kept
+ final ConfigNodeProcedureEnv env =
Mockito.mock(ConfigNodeProcedureEnv.class);
+ final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ final PermissionManager permissionManager =
Mockito.mock(PermissionManager.class);
+ Mockito.when(env.getConfigManager()).thenReturn(configManager);
+
Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager); // env -> configManager -> permissionManager are all mocks
+
+ final TPermissionInfoResp loginResp = new TPermissionInfoResp();
+ loginResp.setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ Mockito.when(permissionManager.login("user", "encrypted-password",
true)).thenReturn(loginResp); // stubbed login(...) answers with SUCCESS_STATUS
+
+ final Map<String, String> sourceAttributes = new HashMap<>();
+ sourceAttributes.put("extractor", "iotdb-source");
+ sourceAttributes.put("username", "user");
+ sourceAttributes.put("password", "encrypted-password"); // input uses the bare "password" key; the assertion below reads SOURCE_IOTDB_PASSWORD_KEY
+
+ CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env,
sourceAttributes); // method under test; the assertions below inspect the same map afterwards
+
+ assertEquals(
+ "encrypted-password",
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); // stored value must equal the password that was passed in
+ Mockito.verify(permissionManager).login("user", "encrypted-password",
true);
+ Mockito.verify(permissionManager, Mockito.never())
+ .login4Pipe(Mockito.anyString(), Mockito.any()); // login4Pipe must not be called when login(...) succeeded
+ }
+
+ @Test
+ public void testCheckAndEnrichSourceAuthenticationFallsBackToRawPassword() { // case: login(user, password, true) is rejected, so login4Pipe(...) is expected to supply the stored value
+ final ConfigNodeProcedureEnv env =
Mockito.mock(ConfigNodeProcedureEnv.class);
+ final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ final PermissionManager permissionManager =
Mockito.mock(PermissionManager.class);
+ Mockito.when(env.getConfigManager()).thenReturn(configManager);
+
Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager); // env -> configManager -> permissionManager are all mocks
+
+ final TPermissionInfoResp loginResp = new TPermissionInfoResp();
+ loginResp.setStatus(new
TSStatus(TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode()));
+ Mockito.when(permissionManager.login("user", "raw-password",
true)).thenReturn(loginResp); // stubbed login(...) answers with WRONG_LOGIN_PASSWORD
+ Mockito.when(permissionManager.login4Pipe("user", "raw-password"))
+ .thenReturn("hashed-password"); // stubbed login4Pipe(...) yields the value the assertion expects
+
+ final Map<String, String> sourceAttributes = new HashMap<>();
+ sourceAttributes.put(PipeSourceConstant.SOURCE_KEY, "iotdb-source");
+ sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "user");
+ sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY,
"raw-password");
+
+ CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env,
sourceAttributes); // method under test; the assertions below inspect the same map afterwards
+
+ assertEquals(
+ "hashed-password",
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); // stored value must be what login4Pipe(...) returned, not the input
+ Mockito.verify(permissionManager).login("user", "raw-password", true); // login(...) is still attempted
+ Mockito.verify(permissionManager).login4Pipe("user", "raw-password"); // and login4Pipe(...) is called with the same credentials
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
index 152f0b111df..75cca9ccbcd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
@@ -66,6 +66,14 @@ public class ConsumerMeta {
return config.getPassword();
}
+ public String getEncryptedPassword() { // accessor for the encrypted password held by this consumer's config
+ return config.getEncryptedPassword(); // pure delegation; may be null (getSubscriptionAuthPassword() checks for that)
+ }
+
+ public String getSubscriptionAuthPassword() { // selects which password value this consumer provides: encrypted one first, plain one otherwise
+ return Objects.nonNull(getEncryptedPassword()) ? getEncryptedPassword() :
getPassword(); // falls back to getPassword() only when getEncryptedPassword() is null; the result may itself be null
+ }
+
public ByteBuffer serialize() throws IOException {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index d3836743578..badb77d6f48 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -183,6 +183,11 @@ public class TopicMeta {
/////////////////////////////// utilities ///////////////////////////////
public Map<String, String> generateExtractorAttributes(final String
username) { // single-argument overload kept for existing callers
+ return generateExtractorAttributes(username, null); // null password: the two-argument overload only puts "password" when it is non-null
+ }
+
+ public Map<String, String> generateExtractorAttributes(
+ final String username, final String password) {
final Map<String, String> extractorAttributes = new HashMap<>();
// disable meta sync
extractorAttributes.put("source", "iotdb-source");
@@ -190,6 +195,9 @@ public class TopicMeta {
extractorAttributes.put("inclusion.exclusion", "data.delete");
// user
extractorAttributes.put("username", username);
+ if (Objects.nonNull(password)) {
+ extractorAttributes.put("password", password);
+ }
// TODO: currently set skipif to no-privileges
extractorAttributes.put("skipif", "no-privileges");
// sql dialect
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
index 4973f06d27c..d9c280e1493 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
@@ -54,4 +54,15 @@ public class TopicDeSerTest {
Assert.assertEquals(
topicMeta.getSubscribedConsumerGroupIds(),
topicMeta2.getSubscribedConsumerGroupIds());
}
+
+ @Test
+ public void testGenerateExtractorAttributesWithEncryptedPassword() { // checks that the two-argument overload copies username and password into the generated map
+ final TopicMeta topicMeta = new TopicMeta("test_topic", 1, new
HashMap<>()); // topic built with an empty HashMap as its third constructor argument
+
+ final Map<String, String> extractorAttributes =
+ topicMeta.generateExtractorAttributes("test_user",
"encrypted-password");
+
+ Assert.assertEquals("test_user", extractorAttributes.get("username"));
+ Assert.assertEquals("encrypted-password",
extractorAttributes.get("password")); // non-null password must appear under the "password" key
+ }
}