This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 754a1502bac [To dev/1.3] Subscription: support encrypted password auth
in consumer builder (#17552)
754a1502bac is described below
commit 754a1502bacd2178088c9fca2fc191215cf953ba
Author: VGalaxies <[email protected]>
AuthorDate: Wed Apr 29 10:49:40 2026 +0800
[To dev/1.3] Subscription: support encrypted password auth in consumer
builder (#17552)
---
.../IoTDBEncryptedPasswordPullConsumerIT.java | 168 +++++++++++++++++++++
.../rpc/subscription/config/ConsumerConfig.java | 12 ++
.../rpc/subscription/config/ConsumerConstant.java | 1 +
.../consumer/SubscriptionConsumer.java | 22 +++
.../consumer/SubscriptionProvider.java | 12 ++
.../consumer/SubscriptionPullConsumer.java | 6 +
.../consumer/SubscriptionPushConsumer.java | 6 +
.../confignode/manager/PermissionManager.java | 9 ++
.../iotdb/confignode/persistence/AuthorInfo.java | 52 +++++++
.../impl/pipe/task/CreatePipeProcedureV2.java | 67 ++++++++
.../subscription/CreateSubscriptionProcedure.java | 9 +-
.../impl/pipe/task/CreatePipeProcedureV2Test.java | 61 ++++++++
.../pipe/config/constant/PipeSourceConstant.java | 7 +
.../meta/consumer/ConsumerGroupMeta.java | 4 +
.../subscription/meta/consumer/ConsumerMeta.java | 20 +++
.../commons/subscription/meta/topic/TopicMeta.java | 11 ++
.../commons/subscription/topic/TopicDeSerTest.java | 11 ++
17 files changed, 477 insertions(+), 1 deletion(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java
new file mode 100644
index 00000000000..9987ebe9732
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.triple.regression.param;
+
+import org.apache.iotdb.commons.utils.AuthUtils;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionMisc;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2SubscriptionRegressionMisc.class})
+public class IoTDBEncryptedPasswordPullConsumerIT extends AbstractSubscriptionRegressionIT {
+
+ private static final String DATABASE = "root.TestEncryptedPasswordPullConsumer";
+ private static final String DEVICE = DATABASE + ".d_0";
+ private static final String TOPIC_NAME = "TestEncryptedPasswordPullConsumerTopic";
+ private static final String USERNAME = "encrypted_user";
+ private static final String PASSWORD = "EncryptedUser@123";
+ private static final String ENCRYPTED_PASSWORD = AuthUtils.encryptPassword(PASSWORD);
+ private static final String WRONG_ENCRYPTED_PASSWORD =
+ AuthUtils.encryptPassword("WrongEncryptedUser@123");
+
+ private static final List<MeasurementSchema> SCHEMA_LIST = new ArrayList<>();
+
+ static {
+ SCHEMA_LIST.add(new MeasurementSchema("s_0", TSDataType.INT64));
+ SCHEMA_LIST.add(new MeasurementSchema("s_1", TSDataType.DOUBLE));
+ }
+
+ private SubscriptionPullConsumer consumer;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ createDB(DATABASE);
+ createTopic_s(TOPIC_NAME, "root.**", null, null, false);
+ session_src.createTimeseries(
+ DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4);
+ session_src.createTimeseries(
+ DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4);
+ session_dest.createTimeseries(
+ DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4);
+ session_dest.createTimeseries(
+ DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4);
+ session_src.executeNonQueryStatement("create user " + USERNAME + " '" + PASSWORD + "'");
+ session_src.executeNonQueryStatement("grant read,write on root.** to user " + USERNAME);
+ Assert.assertTrue(subs.getTopic(TOPIC_NAME).isPresent());
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ try {
+ if (consumer != null) {
+ consumer.close();
+ }
+ } catch (final Exception ignored) {
+ }
+ try {
+ subs.dropTopic(TOPIC_NAME);
+ } catch (final Exception ignored) {
+ }
+ try {
+ session_src.executeNonQueryStatement("drop user " + USERNAME);
+ } catch (final Exception ignored) {
+ }
+ dropDB(DATABASE);
+ super.tearDown();
+ }
+
+ @Test
+ public void testSubscribeWithEncryptedPassword()
+ throws TException,
+ IoTDBConnectionException,
+ IOException,
+ StatementExecutionException,
+ InterruptedException {
+ consumer = createConsumer("encrypted-password-group", ENCRYPTED_PASSWORD);
+
+ consumer.open();
+ consumer.subscribe(TOPIC_NAME);
+ Assert.assertEquals(1, subs.getSubscriptions().size());
+
+ insertData(1706659200000L);
+ consume_data(consumer, session_dest);
+ check_count(
+ 4,
+ "select count(s_0) from " + DEVICE + " where time >= 1706659200000",
+ "encrypted password consumption");
+ }
+
+ @Test
+ public void testSubscribeFailsWithWrongEncryptedPassword()
+ throws IoTDBConnectionException, StatementExecutionException {
+ consumer = createConsumer("wrong-encrypted-password-group", WRONG_ENCRYPTED_PASSWORD);
+
+ try {
+ consumer.open();
+ consumer.subscribe(TOPIC_NAME);
+ Assert.fail("subscribe should fail when encrypted password mismatches");
+ } catch (final Exception ignored) {
+ Assert.assertTrue(subs.getSubscriptions().isEmpty());
+ }
+ }
+
+ private SubscriptionPullConsumer createConsumer(
+ final String consumerGroupId, final String encryptedPassword) {
+ return new SubscriptionPullConsumer.Builder()
+ .host(SRC_HOST)
+ .port(SRC_PORT)
+ .username(USERNAME)
+ .encryptedPassword(encryptedPassword)
+ .consumerId("consumer_" + consumerGroupId)
+ .consumerGroupId(consumerGroupId)
+ .buildPullConsumer();
+ }
+
+ private void insertData(long timestamp)
+ throws IoTDBConnectionException, StatementExecutionException {
+ final Tablet tablet = new Tablet(DEVICE, SCHEMA_LIST, 10);
+ for (int row = 0; row < 5; row++) {
+ final int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ tablet.addValue("s_0", rowIndex, row * 20L + row);
+ tablet.addValue("s_1", rowIndex, row + 2.45);
+ timestamp += row * 2000;
+ }
+ session_src.insertTablet(tablet);
+ }
+}
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 5094ae7eea4..97c35baf8f4 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -68,6 +68,18 @@ public class ConsumerConfig extends PipeParameters {
return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY);
}
+ public String getUsername() {
+ return getString(ConsumerConstant.USERNAME_KEY);
+ }
+
+ public String getPassword() {
+ return getString(ConsumerConstant.PASSWORD_KEY);
+ }
+
+ public String getEncryptedPassword() {
+ return getString(ConsumerConstant.ENCRYPTED_PASSWORD_KEY);
+ }
+
public long getHeartbeatIntervalMs() {
return getLongOrDefault(
ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY,
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index 504893b80ed..21b13e5d3f1 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -31,6 +31,7 @@ public class ConsumerConstant {
public static final String USERNAME_KEY = "username";
public static final String PASSWORD_KEY = "password";
+ public static final String ENCRYPTED_PASSWORD_KEY = "encrypted-password";
public static final String CONSUMER_ID_KEY = "consumer-id";
public static final String CONSUMER_GROUP_ID_KEY = "group-id";
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 92c6b2c9a3e..56f1c2e9b9b 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -100,6 +100,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
private final String username;
private final String password;
+ private final String encryptedPassword;
protected String consumerId;
protected String consumerGroupId;
@@ -177,6 +178,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
this.username = builder.username;
this.password = builder.password;
+ this.encryptedPassword = builder.encryptedPassword;
this.consumerId = builder.consumerId;
this.consumerGroupId = builder.consumerGroupId;
@@ -206,6 +208,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
(String)
properties.getOrDefault(
ConsumerConstant.PASSWORD_KEY,
SessionConfig.DEFAULT_PASSWORD))
+ .encryptedPassword((String)
properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY))
.consumerId((String)
properties.get(ConsumerConstant.CONSUMER_ID_KEY))
.consumerGroupId((String)
properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY))
.heartbeatIntervalMs(
@@ -386,6 +389,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
endPoint,
this.username,
this.password,
+ this.encryptedPassword,
this.consumerId,
this.consumerGroupId,
this.thriftMaxFrameSize,
@@ -1401,6 +1405,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
protected String username = SessionConfig.DEFAULT_USER;
protected String password = SessionConfig.DEFAULT_PASSWORD;
+ protected String encryptedPassword;
protected String consumerId;
protected String consumerGroupId;
@@ -1437,10 +1442,27 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
}
public Builder password(final String password) {
+ if (!Objects.equals(password, SessionConfig.DEFAULT_PASSWORD)
+ && Objects.nonNull(this.encryptedPassword)) {
+ throw new IllegalStateException(
+ "password and encryptedPassword are mutually exclusive; encryptedPassword is already set");
+ }
this.password = password;
return this;
}
+ public Builder encryptedPassword(final String encryptedPassword) {
+ if (Objects.isNull(encryptedPassword)) {
+ return this;
+ }
+ if (!Objects.equals(this.password, SessionConfig.DEFAULT_PASSWORD)) {
+ throw new IllegalStateException(
+ "password and encryptedPassword are mutually exclusive; password is already set");
+ }
+ this.encryptedPassword = encryptedPassword;
+ return this;
+ }
+
public Builder consumerId(@Nullable final String consumerId) {
if (Objects.isNull(consumerId)) {
return this;
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index 9f153efeb10..25d9c5da09e 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -86,11 +86,15 @@ final class SubscriptionProvider extends
SubscriptionSession {
private final long heartbeatIntervalMs;
private final int connectionTimeoutInMs;
private int dataNodeId;
+ private final String username;
+ private final String password;
+ private final String encryptedPassword;
SubscriptionProvider(
final TEndPoint endPoint,
final String username,
final String password,
+ final String encryptedPassword,
final String consumerId,
final String consumerGroupId,
final int thriftMaxFrameSize,
@@ -101,6 +105,9 @@ final class SubscriptionProvider extends
SubscriptionSession {
this.endPoint = endPoint;
this.consumerId = consumerId;
this.consumerGroupId = consumerGroupId;
+ this.username = username;
+ this.password = password;
+ this.encryptedPassword = encryptedPassword;
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.connectionTimeoutInMs = connectionTimeoutInMs;
}
@@ -149,6 +156,11 @@ final class SubscriptionProvider extends
SubscriptionSession {
final Map<String, String> consumerAttributes = new HashMap<>();
consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY,
consumerGroupId);
consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
+ consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
+ consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
+ if (encryptedPassword != null) {
+ consumerAttributes.put(ConsumerConstant.ENCRYPTED_PASSWORD_KEY,
encryptedPassword);
+ }
consumerAttributes.put(
ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY,
String.valueOf(heartbeatIntervalMs));
consumerAttributes.put(
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index a77716fe02c..227843811b1 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -329,6 +329,12 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
return this;
}
+ @Override
+ public Builder encryptedPassword(final String encryptedPassword) {
+ super.encryptedPassword(encryptedPassword);
+ return this;
+ }
+
@Override
public Builder consumerId(final String consumerId) {
super.consumerId(consumerId);
diff --git
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 2a413278090..b70633bef35 100644
---
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -257,6 +257,12 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
return this;
}
+ @Override
+ public Builder encryptedPassword(final String encryptedPassword) {
+ super.encryptedPassword(encryptedPassword);
+ return this;
+ }
+
@Override
public Builder consumerId(final String consumerId) {
super.consumerId(consumerId);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 9592692d1bc..b4f31d84023 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -110,6 +110,15 @@ public class PermissionManager {
return authorInfo.login(username, password);
}
+ public TPermissionInfoResp login(
+ final String username, final String password, final boolean
useEncryptedPassword) {
+ return authorInfo.login(username, password, useEncryptedPassword);
+ }
+
+ public String login4Pipe(final String userName, final String password) {
+ return authorInfo.login4Pipe(userName, password);
+ }
+
public TPermissionInfoResp checkUserPrivileges(
String username, List<PartialPath> paths, int permission) {
return authorInfo.checkUserPrivileges(username, paths, permission);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
index e9e1058d79b..3bd17b5b5b7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java
@@ -113,6 +113,58 @@ public class AuthorInfo implements SnapshotProcessor {
return result;
}
+ public TPermissionInfoResp login(
+ final String username, final String password, final boolean
useEncryptedPassword) {
+ if (!useEncryptedPassword) {
+ return login(username, password);
+ }
+
+ boolean status = false;
+ String loginMessage = null;
+ TSStatus tsStatus = new TSStatus();
+ TPermissionInfoResp result = new TPermissionInfoResp();
+ try {
+ final User user = authorizer.getUser(username);
+ status = user != null && password != null &&
password.equals(user.getPassword());
+ if (status) {
+ result = getUserPermissionInfo(username);
+ result.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS,
"Login successfully"));
+ } else {
+ result = AuthUtils.generateEmptyPermissionInfoResp();
+ }
+ } catch (AuthException e) {
+ LOGGER.error("meet error while logging in.", e);
+ loginMessage = e.getMessage();
+ }
+ if (!status) {
+ tsStatus.setMessage(loginMessage != null ? loginMessage :
"Authentication failed.");
+ tsStatus.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode());
+ result.setStatus(tsStatus);
+ }
+ return result;
+ }
+
+ public String login4Pipe(final String username, final String password) {
+ try {
+ final User user = authorizer.getUser(username);
+ if (user == null) {
+ return null;
+ }
+ if (password == null) {
+ return user.getPassword();
+ }
+ final TPermissionInfoResp loginResp = login(username, password);
+ if (loginResp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && loginResp.isSetUserInfo()) {
+ return loginResp.getUserInfo().getPassword();
+ }
+ return null;
+ } catch (final AuthException e) {
+ LOGGER.error("meet error while logging in for pipe.", e);
+ return null;
+ }
+ }
+
// if All paths fail, return No permission
// if some paths fail, return SUCCESS and failed index list
// if all path success, return success and empty index list
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 26e9eb9bba1..ad18cc95bb0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -25,11 +25,13 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
@@ -41,8 +43,10 @@ import
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProced
import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.pipe.api.PipePlugin;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -53,6 +57,8 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -136,9 +142,70 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
createPipeRequest.getProcessorAttributes(),
createPipeRequest.getConnectorAttributes());
+ checkAndEnrichSourceAuthentication(env,
createPipeRequest.getExtractorAttributes());
+
return pipeTaskInfo.get().checkBeforeCreatePipe(createPipeRequest);
}
+ public static void checkAndEnrichSourceAuthentication(
+ final ConfigNodeProcedureEnv env, final Map<String, String>
sourceAttributes) {
+ if (Objects.isNull(sourceAttributes)) {
+ return;
+ }
+ final PipeParameters sourceParameters = new
PipeParameters(sourceAttributes);
+
+ final String pluginName =
+ sourceParameters
+ .getStringOrDefault(
+ Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY,
PipeSourceConstant.SOURCE_KEY),
+ BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ .toLowerCase();
+
+ if
(!pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ &&
!pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) {
+ return;
+ }
+
+ if
(sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY)
+ ||
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USER_KEY)
+ ||
sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY)
+ ||
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY)
+ ||
sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY)
+ ||
sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)) {
+ final String username =
+ sourceParameters.getStringByKeys(
+ PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
+ PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY);
+ final String password =
+ sourceParameters.getStringByKeys(
+ PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
+ PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
+ String hashedPassword = null;
+ if (Objects.nonNull(password)) {
+ final TPermissionInfoResp loginResp =
+ env.getConfigManager().getPermissionManager().login(username,
password, true);
+ if (Objects.nonNull(loginResp)
+ && Objects.nonNull(loginResp.getStatus())
+ && loginResp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ hashedPassword = password;
+ }
+ }
+ if (Objects.isNull(hashedPassword)) {
+ hashedPassword =
+ env.getConfigManager().getPermissionManager().login4Pipe(username,
password);
+ }
+ if (Objects.isNull(hashedPassword)) {
+ throw new PipeException("Authentication failed.");
+ }
+ sourceParameters.addOrReplaceEquivalentAttributes(
+ new PipeParameters(
+ Collections.singletonMap(
+ PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY,
hashedPassword)));
+ }
+ }
+
@Override
public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
LOGGER.info(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 4a48ebdd35d..80e4e511554 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.confignode.procedure.impl.subscription.subscription;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
+import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
@@ -89,7 +90,11 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
subscriptionInfo.get().validateBeforeSubscribe(subscribeReq);
// Construct AlterConsumerGroupProcedure
+ final String consumerId = subscribeReq.getConsumerId();
final String consumerGroupId = subscribeReq.getConsumerGroupId();
+ final ConsumerGroupMeta consumerGroupMeta =
+ subscriptionInfo.get().getConsumerGroupMeta(consumerGroupId);
+ final ConsumerMeta consumerMeta =
consumerGroupMeta.getConsumerMeta(consumerId);
final ConsumerGroupMeta updatedConsumerGroupMeta =
subscriptionInfo.get().deepCopyConsumerGroupMeta(consumerGroupId);
updatedConsumerGroupMeta.addSubscription(
@@ -110,7 +115,9 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
new CreatePipeProcedureV2(
new TCreatePipeReq()
.setPipeName(pipeName)
-
.setExtractorAttributes(topicMeta.generateExtractorAttributes())
+ .setExtractorAttributes(
+ topicMeta.generateExtractorAttributes(
+ consumerMeta.getUsername(),
consumerMeta.getSubscriptionAuthPassword()))
.setProcessorAttributes(topicMeta.generateProcessorAttributes())
.setConnectorAttributes(topicMeta.generateConnectorAttributes(consumerGroupId)),
pipeTaskInfo));
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
index 6c095de9852..ac6ca8c740c 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
@@ -19,11 +19,19 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.PermissionManager;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.PublicBAOS;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
@@ -68,4 +76,57 @@ public class CreatePipeProcedureV2Test {
fail();
}
}
+
+ @Test
+ public void testCheckAndEnrichSourceAuthenticationWithEncryptedPassword() {
+ final ConfigNodeProcedureEnv env =
Mockito.mock(ConfigNodeProcedureEnv.class);
+ final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ final PermissionManager permissionManager =
Mockito.mock(PermissionManager.class);
+ Mockito.when(env.getConfigManager()).thenReturn(configManager);
+
Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager);
+
+ final TPermissionInfoResp loginResp = new TPermissionInfoResp();
+ loginResp.setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ Mockito.when(permissionManager.login("user", "encrypted-password",
true)).thenReturn(loginResp);
+
+ final Map<String, String> sourceAttributes = new HashMap<>();
+ sourceAttributes.put("extractor", "iotdb-source");
+ sourceAttributes.put("username", "user");
+ sourceAttributes.put("password", "encrypted-password");
+
+ CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env,
sourceAttributes);
+
+ assertEquals(
+ "encrypted-password",
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+ Mockito.verify(permissionManager).login("user", "encrypted-password",
true);
+ Mockito.verify(permissionManager, Mockito.never())
+ .login4Pipe(Mockito.anyString(), Mockito.any());
+ }
+
+ @Test
+ public void testCheckAndEnrichSourceAuthenticationFallsBackToRawPassword() {
+ final ConfigNodeProcedureEnv env =
Mockito.mock(ConfigNodeProcedureEnv.class);
+ final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ final PermissionManager permissionManager =
Mockito.mock(PermissionManager.class);
+ Mockito.when(env.getConfigManager()).thenReturn(configManager);
+
Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager);
+
+ final TPermissionInfoResp loginResp = new TPermissionInfoResp();
+ loginResp.setStatus(new
TSStatus(TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode()));
+ Mockito.when(permissionManager.login("user", "raw-password",
true)).thenReturn(loginResp);
+ Mockito.when(permissionManager.login4Pipe("user", "raw-password"))
+ .thenReturn("hashed-password");
+
+ final Map<String, String> sourceAttributes = new HashMap<>();
+ sourceAttributes.put(PipeSourceConstant.SOURCE_KEY, "iotdb-source");
+ sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "user");
+ sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY,
"raw-password");
+
+ CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env,
sourceAttributes);
+
+ assertEquals(
+ "hashed-password",
sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY));
+ Mockito.verify(permissionManager).login("user", "raw-password", true);
+ Mockito.verify(permissionManager).login4Pipe("user", "raw-password");
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
index e9adf64bc03..aba780fb0d1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
@@ -24,6 +24,13 @@ public class PipeSourceConstant {
public static final String EXTRACTOR_KEY = "extractor";
public static final String SOURCE_KEY = "source";
+ public static final String EXTRACTOR_IOTDB_USER_KEY = "extractor.user";
+ public static final String SOURCE_IOTDB_USER_KEY = "source.user";
+ public static final String EXTRACTOR_IOTDB_USERNAME_KEY =
"extractor.username";
+ public static final String SOURCE_IOTDB_USERNAME_KEY = "source.username";
+ public static final String EXTRACTOR_IOTDB_PASSWORD_KEY =
"extractor.password";
+ public static final String SOURCE_IOTDB_PASSWORD_KEY = "source.password";
+
public static final String EXTRACTOR_INCLUSION_KEY = "extractor.inclusion";
public static final String SOURCE_INCLUSION_KEY = "source.inclusion";
public static final String EXTRACTOR_INCLUSION_DEFAULT_VALUE = "data.insert";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index b316c5f155d..498f3427690 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -136,6 +136,10 @@ public class ConsumerGroupMeta {
return consumerIdToConsumerMeta.containsKey(consumerId);
}
+ public ConsumerMeta getConsumerMeta(final String consumerId) {
+ return consumerIdToConsumerMeta.get(consumerId);
+ }
+
public boolean isEmpty() {
// When there are no consumers in a consumer group, it means that the
ConsumerGroupMeta is
// empty, and at this time, the topicNameToSubscribedConsumerIdSet is also
empty.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
index f1bb9b46085..75cca9ccbcd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
@@ -54,6 +54,26 @@ public class ConsumerMeta {
return consumerId;
}
+ public String getConsumerGroupId() {
+ return config.getConsumerGroupId();
+ }
+
+ public String getUsername() {
+ return config.getUsername();
+ }
+
+ public String getPassword() {
+ return config.getPassword();
+ }
+
+ public String getEncryptedPassword() {
+ return config.getEncryptedPassword();
+ }
+
+ public String getSubscriptionAuthPassword() {
+ return Objects.nonNull(getEncryptedPassword()) ? getEncryptedPassword() :
getPassword();
+ }
+
public ByteBuffer serialize() throws IOException {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 93cd87e4ae3..ee9177112e7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -181,11 +181,22 @@ public class TopicMeta {
/////////////////////////////// utilities ///////////////////////////////
public Map<String, String> generateExtractorAttributes() {
+ return generateExtractorAttributes(null, null);
+ }
+
+ public Map<String, String> generateExtractorAttributes(
+ final String username, final String password) {
final Map<String, String> extractorAttributes = new HashMap<>();
// disable meta sync
extractorAttributes.put("source", "iotdb-source");
extractorAttributes.put("inclusion", "data.insert");
extractorAttributes.put("inclusion.exclusion", "data.delete");
+ if (Objects.nonNull(username)) {
+ extractorAttributes.put("username", username);
+ }
+ if (Objects.nonNull(password)) {
+ extractorAttributes.put("password", password);
+ }
// path
extractorAttributes.putAll(config.getAttributesWithPathOrPattern());
// time
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
index 4973f06d27c..d9c280e1493 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
@@ -54,4 +54,15 @@ public class TopicDeSerTest {
Assert.assertEquals(
topicMeta.getSubscribedConsumerGroupIds(),
topicMeta2.getSubscribedConsumerGroupIds());
}
+
+ @Test
+ public void testGenerateExtractorAttributesWithEncryptedPassword() {
+ final TopicMeta topicMeta = new TopicMeta("test_topic", 1, new
HashMap<>());
+
+ final Map<String, String> extractorAttributes =
+ topicMeta.generateExtractorAttributes("test_user",
"encrypted-password");
+
+ Assert.assertEquals("test_user", extractorAttributes.get("username"));
+ Assert.assertEquals("encrypted-password",
extractorAttributes.get("password"));
+ }
}