This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 2bfc35a43e8 Pipe: add information about sender's IP and port in the
pipe receiver logs (#14343) (#14351)
2bfc35a43e8 is described below
commit 2bfc35a43e8d6983cc37c00393aed18668a880f0
Author: nanxiang xia <[email protected]>
AuthorDate: Mon Dec 9 10:40:15 2024 +0800
Pipe: add information about sender's IP and port in the pipe receiver logs
(#14343) (#14351)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/receiver/protocol/IoTDBConfigNodeReceiver.java | 16 ++++++++++++++++
.../receiver/protocol/thrift/IoTDBDataNodeReceiver.java | 12 ++++++++++++
.../apache/iotdb/db/protocol/session/IClientSession.java | 2 +-
.../iotdb/db/protocol/session/RestClientSession.java | 2 +-
.../iotdb/commons/pipe/receiver/IoTDBFileReceiver.java | 8 +++++++-
5 files changed, 37 insertions(+), 3 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 8466f5fb16a..fe8b383b56f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -68,6 +68,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -91,6 +93,8 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConfigNodeReceiver.class);
+ private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+
private static final AtomicInteger QUERY_ID_GENERATOR = new AtomicInteger(0);
private static final PipeConfigPhysicalPlanTSStatusVisitor STATUS_VISITOR =
@@ -329,6 +333,18 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver {
return
ConfigNodeDescriptor.getInstance().getConf().getPipeReceiverFileDir();
}
+ @Override
+ protected String getSenderHost() {
+ final IClientSession session = SESSION_MANAGER.getCurrSession();
+ return session != null ? session.getClientAddress() : "unknown";
+ }
+
+ @Override
+ protected String getSenderPort() {
+ final IClientSession session = SESSION_MANAGER.getCurrSession();
+ return session != null ? String.valueOf(session.getClientPort()) :
"unknown";
+ }
+
@Override
protected TSStatus loadFileV1(
final PipeTransferFileSealReqV1 req, final String fileAbsolutePath) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 5a510da0475..b6fea173df2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -410,6 +410,18 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
return Objects.isNull(folderManager) ? null :
folderManager.getNextFolder();
}
+ @Override
+ protected String getSenderHost() {
+ final IClientSession session = SESSION_MANAGER.getCurrSession();
+ return session != null ? session.getClientAddress() : "unknown";
+ }
+
+ @Override
+ protected String getSenderPort() {
+ final IClientSession session = SESSION_MANAGER.getCurrSession();
+ return session != null ? String.valueOf(session.getClientPort()) :
"unknown";
+ }
+
@Override
protected TSStatus loadFileV1(final PipeTransferFileSealReqV1 req, final
String fileAbsolutePath)
throws IOException {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
index c79d1573a0a..c8e0b96a7c6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
@@ -45,7 +45,7 @@ public abstract class IClientSession {
public abstract String getClientAddress();
- abstract int getClientPort();
+ public abstract int getClientPort();
abstract TSConnectionType getConnectionType();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java
index 30ca7509d60..fa830ace3fb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java
@@ -38,7 +38,7 @@ public class RestClientSession extends IClientSession {
}
@Override
- int getClientPort() {
+ public int getClientPort() {
return 0;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 14bd34ebea8..be0f8f0fdd3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -174,14 +174,20 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
receiverFileDirWithIdSuffix.set(newReceiverDir);
LOGGER.info(
- "Receiver id = {}: Handshake successfully, receiver file dir = {}.",
+ "Receiver id = {}: Handshake successfully! Sender's host = {}, port = {}. Receiver's file dir = {}.",
receiverId.get(),
+ getSenderHost(),
+ getSenderPort(),
newReceiverDir.getPath());
return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
}
protected abstract String getReceiverFileBaseDir() throws Exception;
+ protected abstract String getSenderHost();
+
+ protected abstract String getSenderPort();
+
protected TPipeTransferResp handleTransferHandshakeV2(final
PipeTransferHandshakeV2Req req)
throws IOException {
// Reject to handshake if the receiver can not take clusterId from config node.