This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 971605ddbcd [To dev/1.3] Pipe: Applied the login function to config
receiver (#15258) & Pipe: Fixed the bug that config receiver cannot detect
runtime password change (#15621) (#15647)
971605ddbcd is described below
commit 971605ddbcda74127f2460cfde0a4ad220c29f64
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 5 17:21:08 2025 +0800
[To dev/1.3] Pipe: Applied the login function to config receiver (#15258) &
Pipe: Fixed the bug that config receiver cannot detect runtime password change
(#15621) (#15647)
* Pipe: Applied the login function to config receiver (#15258)
* Adjustment
---
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 12 ++--
.../protocol/thrift/IoTDBDataNodeReceiver.java | 78 ++++++++--------------
.../commons/pipe/receiver/IoTDBFileReceiver.java | 31 ++++++++-
3 files changed, 63 insertions(+), 58 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 0986e50b31a..0fd091b146e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -461,13 +461,17 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
@Override
protected String getClusterId() {
- return
ConfigNode.getInstance().getConfigManager().getClusterManager().getClusterId();
+ return configManager.getClusterManager().getClusterId();
}
@Override
- protected TSStatus tryLogin() {
- // Do nothing. Login check will be done in the data node receiver.
- return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ protected boolean shouldLogin() {
+ return lastSuccessfulLoginTime == Long.MIN_VALUE || super.shouldLogin();
+ }
+
+ @Override
+ protected TSStatus login() {
+ return configManager.login(username, password).getStatus();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 975e8beb77d..7719a338fb6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -37,7 +37,6 @@ import
org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.RetryUtils;
-import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -147,8 +146,6 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
- private long lastSuccessfulLoginTime = Long.MIN_VALUE;
-
private PipeMemoryBlock allocatedMemoryBlock;
static {
@@ -415,29 +412,10 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
@Override
- protected TSStatus tryLogin() {
- final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
- final long loginPeriodicVerificationIntervalMs =
-
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
- if (clientSession == null
- || !clientSession.isLogin()
- || (loginPeriodicVerificationIntervalMs >= 0
- && lastSuccessfulLoginTime
- < System.currentTimeMillis() -
loginPeriodicVerificationIntervalMs)) {
- final TSStatus status =
- SESSION_MANAGER.login(
- SESSION_MANAGER.getCurrSession(),
- username,
- password,
- ZoneId.systemDefault().toString(),
- SessionManager.CURRENT_RPC_VERSION,
- IoTDBConstant.ClientVersion.V_1_0);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- lastSuccessfulLoginTime = System.currentTimeMillis();
- }
- return status;
- }
- return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ protected boolean shouldLogin() {
+ // The idle time is updated per request
+ final IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ return clientSession == null || !clientSession.isLogin() ||
super.shouldLogin();
}
@Override
@@ -715,6 +693,9 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
final TSStatus status = executeStatement(statement);
+
+ // Try to convert data type if the statement is a tree model statement
+ // and the status code is not success
return shouldConvertDataTypeOnTypeMismatch
&& ((statement instanceof InsertBaseStatement
&& ((InsertBaseStatement)
statement).hasFailedMeasurements())
@@ -725,34 +706,14 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
private TSStatus executeStatement(final Statement statement) {
- IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
- final long loginPeriodicVerificationIntervalMs =
-
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
- if (clientSession == null
- || !clientSession.isLogin()
- || (loginPeriodicVerificationIntervalMs >= 0
- && lastSuccessfulLoginTime
- < System.currentTimeMillis() -
loginPeriodicVerificationIntervalMs)) {
- final BasicOpenSessionResp openSessionResp =
- SESSION_MANAGER.login(
- SESSION_MANAGER.getCurrSession(),
- username,
- password,
- ZoneId.systemDefault().toString(),
- SessionManager.CURRENT_RPC_VERSION,
- IoTDBConstant.ClientVersion.V_1_0);
- if (openSessionResp.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
- "Receiver id = {}: Failed to open session, username = {}, response
= {}.",
- receiverId.get(),
- username,
- openSessionResp);
- return RpcUtils.getStatus(openSessionResp.getCode(),
openSessionResp.getMessage());
- }
- lastSuccessfulLoginTime = System.currentTimeMillis();
- clientSession = SESSION_MANAGER.getCurrSession();
+ // Permission check
+ final TSStatus loginStatus = loginIfNecessary();
+ if (loginStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return loginStatus;
}
+ final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
+
final TSStatus status = AuthorityChecker.checkAuthority(statement,
clientSession);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
@@ -777,6 +738,19 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
.status;
}
+ @Override
+ protected TSStatus login() {
+ final BasicOpenSessionResp openSessionResp =
+ SESSION_MANAGER.login(
+ SESSION_MANAGER.getCurrSession(),
+ username,
+ password,
+ ZoneId.systemDefault().toString(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0);
+ return RpcUtils.getStatus(openSessionResp.getCode(),
openSessionResp.getMessage());
+ }
+
@Override
public synchronized void handleExit() {
if (Objects.nonNull(configReceiverId.get())) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index c1f728ccfa6..1014b52d169 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -74,6 +74,10 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
+ private static final long LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS =
+
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
+ protected long lastSuccessfulLoginTime = Long.MIN_VALUE;
+
private File writingFile;
private RandomAccessFile writingFileWriter;
@@ -260,7 +264,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
if (passwordString != null) {
password = passwordString;
}
- final TSStatus status = tryLogin();
+ final TSStatus status = loginIfNecessary();
if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
"Receiver id = {}: Handshake failed because login failed, response
status = {}.",
@@ -313,7 +317,30 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
protected abstract String getClusterId();
- protected abstract TSStatus tryLogin();
+ protected boolean shouldLogin() {
+ return LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+ && lastSuccessfulLoginTime
+ < System.currentTimeMillis() -
LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS;
+ }
+
+ protected TSStatus loginIfNecessary() {
+ if (shouldLogin()) {
+ final TSStatus permissionCheckStatus = login();
+ if (permissionCheckStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Receiver id = {}: Failed to login, username = {}, response = {}.",
+ receiverId.get(),
+ username,
+ permissionCheckStatus);
+ return permissionCheckStatus;
+ } else {
+ lastSuccessfulLoginTime = System.currentTimeMillis();
+ }
+ }
+ return StatusUtils.OK;
+ }
+
+ protected abstract TSStatus login();
protected final TPipeTransferResp handleTransferFilePiece(
final PipeTransferFilePieceReq req,