This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 0657aff5a93 [To dev/1.3] Pipe: Introduce periodic verification for
username and password on receiver side (#14764) (#14790)
0657aff5a93 is described below
commit 0657aff5a93a20828a242a6c3228ce1c62323ac6
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Feb 5 17:17:08 2025 +0800
[To dev/1.3] Pipe: Introduce periodic verification for username and
password on receiver side (#14764) (#14790)
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 36 ++++++++++++++++------
.../apache/iotdb/commons/conf/CommonConfig.java | 12 ++++++++
.../iotdb/commons/conf/CommonDescriptor.java | 6 ++++
.../iotdb/commons/pipe/config/PipeConfig.java | 10 ++++++
4 files changed, 55 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 6b6ea69de34..2817e8644ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferSliceReqHandler;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
@@ -142,6 +143,9 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private final PipeTransferSliceReqHandler sliceReqHandler = new
PipeTransferSliceReqHandler();
private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+ private static final long LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS =
+
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
+ private long lastSuccessfulLoginTime = Long.MIN_VALUE;
static {
try {
@@ -409,14 +413,23 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
@Override
protected TSStatus tryLogin() {
final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
- if (clientSession == null || !clientSession.isLogin()) {
- return SESSION_MANAGER.login(
- SESSION_MANAGER.getCurrSession(),
- username,
- password,
- ZoneId.systemDefault().toString(),
- SessionManager.CURRENT_RPC_VERSION,
- IoTDBConstant.ClientVersion.V_1_0);
+ if (clientSession == null
+ || !clientSession.isLogin()
+ || (LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+ && lastSuccessfulLoginTime
+ < System.currentTimeMillis() -
LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS)) {
+ final TSStatus status =
+ SESSION_MANAGER.login(
+ SESSION_MANAGER.getCurrSession(),
+ username,
+ password,
+ ZoneId.systemDefault().toString(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ lastSuccessfulLoginTime = System.currentTimeMillis();
+ }
+ return status;
}
return StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@@ -675,7 +688,11 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private TSStatus executeStatement(final Statement statement) {
IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
- if (clientSession == null || !clientSession.isLogin()) {
+ if (clientSession == null
+ || !clientSession.isLogin()
+ || (LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS >= 0
+ && lastSuccessfulLoginTime
+ < System.currentTimeMillis() -
LOGIN_PERIODIC_VERIFICATION_INTERVAL_MS)) {
final BasicOpenSessionResp openSessionResp =
SESSION_MANAGER.login(
SESSION_MANAGER.getCurrSession(),
@@ -692,6 +709,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
openSessionResp);
return RpcUtils.getStatus(openSessionResp.getCode(),
openSessionResp.getMessage());
}
+ lastSuccessfulLoginTime = System.currentTimeMillis();
clientSession = SESSION_MANAGER.getCurrSession();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index bb0fb08c1b8..50db0de3021 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -251,6 +251,8 @@ public class CommonConfig {
private boolean pipeAirGapReceiverEnabled = false;
private int pipeAirGapReceiverPort = 9780;
+ private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
+
private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
private int pipeMaxAllowedPinnedMemTableCount = 50;
@@ -992,6 +994,16 @@ public class CommonConfig {
return pipeAirGapReceiverPort;
}
+ public void setPipeReceiverLoginPeriodicVerificationIntervalMs(
+ long pipeReceiverLoginPeriodicVerificationIntervalMs) {
+ this.pipeReceiverLoginPeriodicVerificationIntervalMs =
+ pipeReceiverLoginPeriodicVerificationIntervalMs;
+ }
+
+ public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {
+ return pipeReceiverLoginPeriodicVerificationIntervalMs;
+ }
+
public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
return pipeMaxAllowedHistoricalTsFilePerDataRegion;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ee9516ee7c8..c40429567c2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -479,6 +479,12 @@ public class CommonDescriptor {
"pipe_air_gap_receiver_port",
Integer.toString(config.getPipeAirGapReceiverPort()))));
+ config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_receiver_login_periodic_verification_interval_ms",
+
Long.toString(config.getPipeReceiverLoginPeriodicVerificationIntervalMs()))));
+
config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4032bcc0af2..e652bff98f9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -224,6 +224,12 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAirGapReceiverPort();
}
+ /////////////////////////////// Receiver ///////////////////////////////
+
+ public long getPipeReceiverLoginPeriodicVerificationIntervalMs() {
+ return COMMON_CONFIG.getPipeReceiverLoginPeriodicVerificationIntervalMs();
+ }
+
/////////////////////////////// Hybrid Mode ///////////////////////////////
public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
@@ -434,6 +440,10 @@ public class PipeConfig {
LOGGER.info("PipeAirGapReceiverEnabled: {}",
getPipeAirGapReceiverEnabled());
LOGGER.info("PipeAirGapReceiverPort: {}", getPipeAirGapReceiverPort());
+ LOGGER.info(
+ "PipeReceiverLoginPeriodicVerificationIntervalMs: {}",
+ getPipeReceiverLoginPeriodicVerificationIntervalMs());
+
LOGGER.info(
"PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
getPipeMaxAllowedHistoricalTsFilePerDataRegion());