This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch feat/show-receivers
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit dd112cf35bd759f301d801fa6c09a993cddb654b
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 8 17:53:51 2026 +0800

    Add show receivers support
---
 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |   3 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |   6 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   6 +-
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |  41 ++-
 .../sink/protocol/IoTDBConfigRegionAirGapSink.java |   1 +
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 222 ++++++++++++---
 .../client/IoTDBDataNodeAsyncClientManager.java    |   1 +
 .../protocol/airgap/IoTDBDataNodeAirGapSink.java   |   1 +
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |   2 +
 .../common/header/DatasetHeaderFactory.java        |   4 +
 .../operator/source/ShowReceiversOperator.java     | 161 +++++++++++
 .../InformationSchemaContentSupplierFactory.java   |  49 ++++
 .../db/queryengine/plan/analyze/Analysis.java      |   2 +
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  29 ++
 .../db/queryengine/plan/parser/ASTVisitor.java     |   6 +
 .../plan/planner/LogicalPlanBuilder.java           |  39 +++
 .../plan/planner/LogicalPlanVisitor.java           |   7 +
 .../plan/planner/OperatorTreeGenerator.java        |  15 +
 .../SimpleFragmentParallelPlanner.java             |   2 +
 .../plan/node/DataNodePlanNodeDeserializer.java    |   3 +
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../plan/node/source/ShowReceiversNode.java        | 115 ++++++++
 .../DataNodeLocationSupplierFactory.java           |   1 +
 .../security/TreeAccessCheckVisitor.java           |   7 +
 .../plan/relational/sql/parser/AstBuilder.java     |  11 +
 .../queryengine/plan/statement/StatementType.java  |   1 +
 .../plan/statement/StatementVisitor.java           |   5 +
 .../plan/statement/sys/ShowReceiversStatement.java |  60 ++++
 .../plan/parser/StatementGeneratorTest.java        |  11 +
 .../planner/node/source/SourceNodeSerdeTest.java   |  11 +
 .../informationschema/ShowReceiversTest.java       |  53 ++++
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 100 ++++++-
 .../runtime/PipeReceiverRuntimeRegistry.java       | 315 +++++++++++++++++++++
 .../runtime/PipeReceiverRuntimeSnapshot.java       | 118 ++++++++
 .../pipe/sink/client/IoTDBClientManager.java       |  20 ++
 .../pipe/sink/client/IoTDBSyncClientManager.java   |   1 +
 .../common/PipeTransferHandshakeConstant.java      |   2 +
 .../commons/pipe/sink/protocol/IoTDBSink.java      |  15 +
 .../pipe/sink/protocol/IoTDBSslSyncSink.java       |   1 +
 .../plan/planner/plan/node/PlanNodeType.java       |   1 +
 .../schema/column/ColumnHeaderConstant.java        |  44 +++
 .../commons/schema/table/InformationSchema.java    |  36 +++
 .../runtime/PipeReceiverRuntimeRegistryTest.java   | 127 +++++++++
 .../db/relational/grammar/sql/RelationalSql.g4     |   8 +-
 44 files changed, 1612 insertions(+), 56 deletions(-)

diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 54d53bab674..f16bf85bc76 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -196,6 +196,7 @@ keyWords
     | QUERY
     | QUERYID
     | QUOTA
+    | RECEIVERS
     | RANGE
     | READONLY
     | READ
@@ -298,4 +299,4 @@ keyWords
     | OPTION
     | INF
     | CURRENT_TIMESTAMP
-    ;
\ No newline at end of file
+    ;
diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 3e94667a2fa..dcad209354c 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -55,7 +55,7 @@ ddlStatement
     // ExternalService
     | createService | startService | stopService | dropService | showService
     // Pipe Task
-    | createPipe | alterPipe | dropPipe | startPipe | stopPipe | showPipes
+    | createPipe | alterPipe | dropPipe | startPipe | stopPipe | showPipes | 
showReceivers
     // Pipe Plugin
     | createPipePlugin | dropPipePlugin | showPipePlugins
     // Subscription
@@ -701,6 +701,10 @@ showPipes
     : SHOW ((PIPE pipeName=identifier) | PIPES (WHERE (CONNECTOR | SINK) USED 
BY pipeName=identifier)?)
     ;
 
+showReceivers
+    : SHOW RECEIVERS
+    ;
+
 // Pipe Plugin 
=========================================================================================
 createPipePlugin
     : CREATE PIPEPLUGIN (IF NOT EXISTS)? pluginName=identifier AS 
className=STRING_LITERAL uriClause
diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 59bac72fc78..4f96e1f69a3 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -723,6 +723,10 @@ QUOTA
     : Q U O T A
     ;
 
+RECEIVERS
+    : R E C E I V E R S
+    ;
+
 RANGE
     : R A N G E
     ;
@@ -1411,4 +1415,4 @@ fragment V: [vV];
 fragment W: [wW];
 fragment X: [xX];
 fragment Y: [yY];
-fragment Z: [zZ];
\ No newline at end of file
+fragment Z: [zZ];
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 7de8e8bf78d..9d84e448c9f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
 import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
+import 
org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeRegistry;
 import 
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq;
@@ -194,20 +195,22 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     
PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req));
             PipeConfigNodeReceiverMetrics.getInstance()
                 .recordHandshakeConfigNodeV1Timer(System.nanoTime() - 
startTime);
-            return resp;
+            return recordConfigNodeHandshakeIfSuccess(resp, req);
           case HANDSHAKE_CONFIGNODE_V2:
             resp =
                 handleTransferHandshakeV2(
                     
PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req));
-            userEntity.setAuditLogOperation(AuditLogOperation.DDL);
+            if (Objects.nonNull(userEntity)) {
+              userEntity.setAuditLogOperation(AuditLogOperation.DDL);
+            }
             PipeConfigNodeReceiverMetrics.getInstance()
                 .recordHandshakeConfigNodeV2Timer(System.nanoTime() - 
startTime);
-            return resp;
+            return recordConfigNodeHandshakeIfSuccess(resp, req);
           case TRANSFER_CONFIG_PLAN:
             resp = 
handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req));
             PipeConfigNodeReceiverMetrics.getInstance()
                 .recordTransferConfigPlanTimer(System.nanoTime() - startTime);
-            return resp;
+            return recordConfigNodeTransferIfSuccess(resp);
           case TRANSFER_CONFIG_SNAPSHOT_PIECE:
             resp =
                 handleTransferFilePiece(
@@ -216,14 +219,14 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     false);
             PipeConfigNodeReceiverMetrics.getInstance()
                 .recordTransferConfigSnapshotPieceTimer(System.nanoTime() - 
startTime);
-            return resp;
+            return recordConfigNodeTransferIfSuccess(resp);
           case TRANSFER_CONFIG_SNAPSHOT_SEAL:
             resp =
                 handleTransferFileSealV2(
                     
PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req));
             PipeConfigNodeReceiverMetrics.getInstance()
                 .recordTransferConfigSnapshotSealTimer(System.nanoTime() - 
startTime);
-            return resp;
+            return recordConfigNodeTransferIfSuccess(resp);
           case TRANSFER_COMPRESSED:
             return 
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
           default:
@@ -262,6 +265,32 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
         && type != PipeRequestType.HANDSHAKE_CONFIGNODE_V2;
   }
 
+  private TPipeTransferResp recordConfigNodeHandshakeIfSuccess(
+      final TPipeTransferResp resp, final TPipeTransferReq req) {
+    if (isSuccess(resp)) {
+      recordPipeReceiverHandshake(
+          PipeReceiverRuntimeRegistry.NODE_TYPE_CONFIG_NODE,
+          ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+          getProtocol(req));
+    }
+    return resp;
+  }
+
+  private TPipeTransferResp recordConfigNodeTransferIfSuccess(final 
TPipeTransferResp resp) {
+    if (isSuccess(resp)) {
+      recordPipeReceiverTransfer();
+    } else {
+      recordPipeReceiverRequest();
+    }
+    return resp;
+  }
+
+  private static String getProtocol(final TPipeTransferReq req) {
+    return req instanceof AirGapPseudoTPipeTransferRequest
+        ? PipeReceiverRuntimeRegistry.PROTOCOL_AIR_GAP
+        : PipeReceiverRuntimeRegistry.PROTOCOL_THRIFT;
+  }
+
   private TPipeTransferResp handleTransferConfigPlan(final 
PipeTransferConfigPlanReq req)
       throws IOException {
     return new TPipeTransferResp(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
index c8c2890f5be..f8e91993eeb 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
@@ -89,6 +89,7 @@ public class IoTDBConfigRegionAirGapSink extends 
IoTDBAirGapSink {
         Boolean.toString(shouldMarkAsPipeRequest));
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF, 
Boolean.toString(skipIfNoPrivileges));
+    appendPipeInfoToHandshakeParams(params);
 
     return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 731c2eb50f0..e237e7129bf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -35,8 +35,10 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
 import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
+import 
org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeRegistry;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq;
@@ -115,16 +117,19 @@ import 
org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Paths;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -174,6 +179,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   // datanode (cluster B).
   private static final AtomicLong CONFIG_RECEIVER_ID_GENERATOR = new 
AtomicLong(0);
   protected final AtomicReference<String> configReceiverId = new 
AtomicReference<>();
+  private final AtomicReference<String> configPipeReceiverRuntimeSessionKey =
+      new AtomicReference<>();
 
   private final PipeTransferSliceReqHandler sliceReqHandler = new 
PipeTransferSliceReqHandler();
 
@@ -217,8 +224,10 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
                           TSStatusCode.PIPE_HANDSHAKE_ERROR.getStatusCode(),
                           "The receiver memory is not enough to handle the 
handshake request from datanode."));
                 }
-                return handleTransferHandshakeV1(
-                    
PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req));
+                return recordDataNodeHandshakeIfSuccess(
+                    handleTransferHandshakeV1(
+                        
PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)),
+                    req);
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordHandshakeDatanodeV1Timer(System.nanoTime() - 
startTime);
@@ -235,8 +244,10 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
                           TSStatusCode.PIPE_HANDSHAKE_ERROR.getStatusCode(),
                           "The receiver memory is not enough to handle the 
handshake request from datanode."));
                 }
-                return handleTransferHandshakeV2(
-                    
PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req));
+                return recordDataNodeHandshakeIfSuccess(
+                    handleTransferHandshakeV2(
+                        
PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req)),
+                    req);
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordHandshakeDatanodeV2Timer(System.nanoTime() - 
startTime);
@@ -245,8 +256,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TABLET_INSERT_NODE:
             {
               try {
-                return handleTransferTabletInsertNode(
-                    PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    handleTransferTabletInsertNode(
+                        
PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)));
 
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
@@ -256,8 +268,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TABLET_INSERT_NODE_V2:
             {
               try {
-                return handleTransferTabletInsertNode(
-                    
PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    handleTransferTabletInsertNode(
+                        
PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req)));
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferTabletInsertNodeV2Timer(System.nanoTime() - 
startTime);
@@ -266,7 +279,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TABLET_RAW:
             {
               try {
-                return 
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)));
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferTabletRawTimer(System.nanoTime() - 
startTime);
@@ -275,8 +289,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TABLET_RAW_V2:
             {
               try {
-                return handleTransferTabletRaw(
-                    PipeTransferTabletRawReqV2.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    
handleTransferTabletRaw(PipeTransferTabletRawReqV2.fromTPipeTransferReq(req)));
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferTabletRawV2Timer(System.nanoTime() - 
startTime);
@@ -285,8 +299,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TABLET_BINARY:
             {
               try {
-                return handleTransferTabletBinary(
-                    PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    handleTransferTabletBinary(
+                        
PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)));
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferTabletBinaryTimer(System.nanoTime() - 
startTime);
@@ -295,8 +310,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TABLET_BINARY_V2:
             {
               try {
-                return handleTransferTabletBinary(
-                    PipeTransferTabletBinaryReqV2.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    handleTransferTabletBinary(
+                        
PipeTransferTabletBinaryReqV2.fromTPipeTransferReq(req)));
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferTabletBinaryV2Timer(System.nanoTime() - 
startTime);
@@ -305,8 +321,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TABLET_BATCH:
             {
               try {
-                return handleTransferTabletBatch(
-                    PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    handleTransferTabletBatch(
+                        PipeTransferTabletBatchReq.fromTPipeTransferReq(req)));
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferTabletBatchTimer(System.nanoTime() - 
startTime);
@@ -315,8 +332,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TABLET_BATCH_V2:
             {
               try {
-                return handleTransferTabletBatchV2(
-                    PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    handleTransferTabletBatchV2(
+                        
PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req)));
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferTabletBatchV2Timer(System.nanoTime() - 
startTime);
@@ -325,10 +343,11 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TS_FILE_PIECE:
             {
               try {
-                return handleTransferFilePiece(
-                    PipeTransferTsFilePieceReq.fromTPipeTransferReq(req),
-                    req instanceof AirGapPseudoTPipeTransferRequest,
-                    true);
+                return recordDataNodeTransferIfSuccess(
+                    handleTransferFilePiece(
+                        PipeTransferTsFilePieceReq.fromTPipeTransferReq(req),
+                        req instanceof AirGapPseudoTPipeTransferRequest,
+                        true));
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferTsFilePieceTimer(System.nanoTime() - 
startTime);
@@ -337,8 +356,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TS_FILE_SEAL:
             {
               try {
-                return handleTransferFileSealV1(
-                    PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    
handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req)));
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferTsFileSealTimer(System.nanoTime() - 
startTime);
@@ -347,10 +366,11 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TS_FILE_PIECE_WITH_MOD:
             {
               try {
-                return handleTransferFilePiece(
-                    
PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req),
-                    req instanceof AirGapPseudoTPipeTransferRequest,
-                    false);
+                return recordDataNodeTransferIfSuccess(
+                    handleTransferFilePiece(
+                        
PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req),
+                        req instanceof AirGapPseudoTPipeTransferRequest,
+                        false));
 
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
@@ -360,8 +380,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_TS_FILE_SEAL_WITH_MOD:
             {
               try {
-                return handleTransferFileSealV2(
-                    
PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    handleTransferFileSealV2(
+                        
PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req)));
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferTsFileSealWithModTimer(System.nanoTime() - 
startTime);
@@ -370,7 +391,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_PLAN_NODE:
             {
               try {
-                return 
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)));
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferSchemaPlanTimer(System.nanoTime() - 
startTime);
@@ -379,10 +401,11 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_SCHEMA_SNAPSHOT_PIECE:
             {
               try {
-                return handleTransferFilePiece(
-                    
PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req),
-                    req instanceof AirGapPseudoTPipeTransferRequest,
-                    false);
+                return recordDataNodeTransferIfSuccess(
+                    handleTransferFilePiece(
+                        
PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req),
+                        req instanceof AirGapPseudoTPipeTransferRequest,
+                        false));
 
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
@@ -392,8 +415,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           case TRANSFER_SCHEMA_SNAPSHOT_SEAL:
             {
               try {
-                return handleTransferFileSealV2(
-                    
PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req));
+                return recordDataNodeTransferIfSuccess(
+                    handleTransferFileSealV2(
+                        
PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req)));
 
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
@@ -409,7 +433,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
               try {
                 // Config requests will first be received by the DataNode 
receiver,
                 // then transferred to ConfigNode receiver to execute.
-                return handleTransferConfigPlan(req);
+                return 
recordConfigNodeReceiverRuntimeIfSuccess(handleTransferConfigPlan(req), req);
               } finally {
                 PipeDataNodeReceiverMetrics.getInstance()
                     .recordTransferConfigPlanTimer(System.nanoTime() - 
startTime);
@@ -760,6 +784,123 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return receive(req.get());
   }
 
+  private TPipeTransferResp recordDataNodeHandshakeIfSuccess(
+      final TPipeTransferResp resp, final TPipeTransferReq req) {
+    if (isSuccess(resp)) {
+      recordPipeReceiverHandshake(
+          PipeReceiverRuntimeRegistry.NODE_TYPE_DATA_NODE,
+          IOTDB_CONFIG.getDataNodeId(),
+          getProtocol(req));
+    }
+    return resp;
+  }
+
+  private TPipeTransferResp recordDataNodeTransferIfSuccess(final 
TPipeTransferResp resp) {
+    if (isSuccess(resp)) {
+      recordPipeReceiverTransfer();
+    } else {
+      recordPipeReceiverRequest();
+    }
+    return resp;
+  }
+
+  private TPipeTransferResp recordConfigNodeReceiverRuntimeIfSuccess(
+      final TPipeTransferResp resp, final TPipeTransferReq req) {
+    if (!PipeRequestType.isValidatedRequestType(req.getType())) {
+      return resp;
+    }
+
+    final PipeRequestType requestType = PipeRequestType.valueOf(req.getType());
+    if (requestType == PipeRequestType.HANDSHAKE_CONFIGNODE_V1
+        || requestType == PipeRequestType.HANDSHAKE_CONFIGNODE_V2) {
+      if (isSuccess(resp)) {
+        recordConfigNodeHandshake(req, requestType);
+      }
+    } else {
+      if (isSuccess(resp)) {
+        PipeReceiverRuntimeRegistry.getInstance()
+            .markTransfer(configPipeReceiverRuntimeSessionKey.get(), 
System.currentTimeMillis());
+      } else {
+        PipeReceiverRuntimeRegistry.getInstance()
+            .markRequest(configPipeReceiverRuntimeSessionKey.get());
+      }
+    }
+    return resp;
+  }
+
+  private void recordConfigNodeHandshake(
+      final TPipeTransferReq req, final PipeRequestType requestType) {
+    final String protocol = getProtocol(req);
+    final String sessionKey =
+        String.format(
+            "%s-%s-%s-%s",
+            PipeReceiverRuntimeRegistry.NODE_TYPE_CONFIG_NODE, -1, protocol, 
getConfigReceiverId());
+    final String oldSessionKey = 
configPipeReceiverRuntimeSessionKey.getAndSet(sessionKey);
+    if (!Objects.equals(oldSessionKey, sessionKey)) {
+      PipeReceiverRuntimeRegistry.getInstance().deregister(oldSessionKey);
+    }
+
+    final Map<String, String> params =
+        requestType == PipeRequestType.HANDSHAKE_CONFIGNODE_V2
+            ? parseHandshakeV2Params(req)
+            : Collections.emptyMap();
+    PipeReceiverRuntimeRegistry.getInstance()
+        .registerOrUpdateSession(
+            sessionKey,
+            PipeReceiverRuntimeRegistry.NODE_TYPE_CONFIG_NODE,
+            -1,
+            protocol,
+            getSenderHost(),
+            parseSenderPort(getSenderPort()),
+            
params.getOrDefault(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, 
username),
+            params.getOrDefault(
+                PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
+                PipeReceiverRuntimeRegistry.UNKNOWN),
+            params.get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_NAME),
+            parsePipeCreationTime(
+                
params.get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_CREATION_TIME)),
+            System.currentTimeMillis());
+  }
+
+  private static Map<String, String> parseHandshakeV2Params(final 
TPipeTransferReq req) {
+    final Map<String, String> params = new HashMap<>();
+    if (req.getBody() == null) {
+      return params;
+    }
+    final ByteBuffer body = req.getBody().duplicate();
+    body.rewind();
+    final int size = ReadWriteIOUtils.readInt(body);
+    for (int i = 0; i < size; ++i) {
+      params.put(ReadWriteIOUtils.readString(body), 
ReadWriteIOUtils.readString(body));
+    }
+    return params;
+  }
+
+  private static String getProtocol(final TPipeTransferReq req) {
+    return req instanceof AirGapPseudoTPipeTransferRequest
+        ? PipeReceiverRuntimeRegistry.PROTOCOL_AIR_GAP
+        : PipeReceiverRuntimeRegistry.PROTOCOL_THRIFT;
+  }
+
+  private static int parseSenderPort(final String senderPort) {
+    try {
+      return Integer.parseInt(senderPort);
+    } catch (final Exception e) {
+      return -1;
+    }
+  }
+
+  private static long parsePipeCreationTime(final String pipeCreationTime) {
+    if (pipeCreationTime == null) {
+      return Long.MIN_VALUE;
+    }
+    try {
+      return Long.parseLong(pipeCreationTime);
+    } catch (final NumberFormatException e) {
+      return Long.MIN_VALUE;
+    }
+  }
+
   /**
    * For {@link InsertRowsStatement} and {@link InsertMultiTabletsStatement}, 
the returned {@link
    * TSStatus} will use sub-status to record the endpoint for redirection. 
Each sub-status records
@@ -1157,6 +1298,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
   @Override
   public synchronized void handleExit() {
+    PipeReceiverRuntimeRegistry.getInstance()
+        .deregister(configPipeReceiverRuntimeSessionKey.getAndSet(null));
+
     if (Objects.nonNull(configReceiverId.get())) {
       try {
         
ClusterConfigTaskExecutor.getInstance().handlePipeConfigClientExit(configReceiverId.get());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 5c38c3a8540..69efcf39fb2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -319,6 +319,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF,
           Boolean.toString(skipIfNoPrivileges));
+      appendPipeInfoToHandshakeParams(params);
 
       
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
       
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
 callback);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataNodeAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataNodeAirGapSink.java
index b593df56612..9e05c240a15 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataNodeAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataNodeAirGapSink.java
@@ -72,6 +72,7 @@ public abstract class IoTDBDataNodeAirGapSink extends 
IoTDBAirGapSink {
         Boolean.toString(shouldMarkAsPipeRequest));
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF, 
Boolean.toString(skipIfNoPrivileges));
+    appendPipeInfoToHandshakeParams(params);
 
     return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index b8b169b1f6a..a96b3d58c36 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -172,6 +172,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
             shouldMarkAsPipeRequest,
             false,
             skipIfNoPrivileges);
+    clientManager.setPipeInfo(pipeName, creationTime);
 
     transferTsFileClientManager =
         new IoTDBDataNodeAsyncClientManager(
@@ -188,6 +189,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
             shouldMarkAsPipeRequest,
             isSplitTSFileBatchModeEnabled,
             skipIfNoPrivileges);
+    transferTsFileClientManager.setPipeInfo(pipeName, creationTime);
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
index 18f15eea8f3..45c3670cd16 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
@@ -207,6 +207,10 @@ public class DatasetHeaderFactory {
     return new DatasetHeader(ColumnHeaderConstant.showQueriesColumnHeaders, 
false);
   }
 
+  /**
+   * Creates the response header for the show-receivers statement from {@link
+   * ColumnHeaderConstant#showReceiversColumnHeaders}.
+   */
+  public static DatasetHeader getShowReceiversHeader() {
+    final DatasetHeader showReceiversHeader =
+        new DatasetHeader(ColumnHeaderConstant.showReceiversColumnHeaders, false);
+    return showReceiversHeader;
+  }
+
   public static DatasetHeader getShowDiskUsageHeader() {
     return new DatasetHeader(ColumnHeaderConstant.showDiskUsageColumnHeaders, 
true);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowReceiversOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowReceiversOperator.java
new file mode 100644
index 00000000000..c9cf62317f5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowReceiversOperator.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source;
+
+import 
org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeRegistry;
+import 
org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeSnapshot;
+import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils;
+import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Source operator that emits a single {@link TsBlock} containing one row per {@link
+ * PipeReceiverRuntimeSnapshot} obtained from {@link PipeReceiverRuntimeRegistry#snapshot()}.
+ */
+public class ShowReceiversOperator implements SourceOperator {
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(ShowReceiversOperator.class);
+
+  private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
+      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+
+  private final OperatorContext operatorContext;
+  private final PlanNodeId sourceId;
+
+  // Built lazily by hasNext() and handed out (then cleared) by next().
+  private TsBlock tsBlock;
+  // Set by next(); once true the operator reports itself as finished.
+  private boolean hasConsumed;
+
+  public ShowReceiversOperator(OperatorContext operatorContext, PlanNodeId sourceId) {
+    this.operatorContext = operatorContext;
+    this.sourceId = sourceId;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+
+  /** Builds the block on first use and reports whether it is still waiting to be consumed. */
+  @Override
+  public boolean hasNext() {
+    if (!hasConsumed && tsBlock == null) {
+      tsBlock = buildTsBlock();
+    }
+    return !hasConsumed;
+  }
+
+  /** Returns the pending block (possibly {@code null}) and marks the operator as consumed. */
+  @Override
+  public TsBlock next() {
+    final TsBlock result = tsBlock;
+    tsBlock = null;
+    hasConsumed = true;
+    return result;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return hasConsumed;
+  }
+
+  @Override
+  public void close() {
+    // do nothing
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    final long contextSize =
+        MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext);
+    final long sourceIdSize = MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId);
+    return INSTANCE_SIZE + contextSize + sourceIdSize;
+  }
+
+  /** Materializes every registry snapshot into one row, all stamped with the same time value. */
+  private TsBlock buildTsBlock() {
+    final List<TSDataType> columnTypes =
+        DatasetHeaderFactory.getShowReceiversHeader().getRespDataTypes();
+    final TsBlockBuilder blockBuilder = new TsBlockBuilder(columnTypes);
+    final TimeColumnBuilder timeColumn = blockBuilder.getTimeColumnBuilder();
+    final ColumnBuilder[] valueColumns = blockBuilder.getValueColumnBuilders();
+    final long now =
+        TimestampPrecisionUtils.convertToCurrPrecision(
+            System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+
+    for (final PipeReceiverRuntimeSnapshot receiver :
+        PipeReceiverRuntimeRegistry.getInstance().snapshot()) {
+      timeColumn.writeLong(now);
+      appendRow(valueColumns, receiver);
+      blockBuilder.declarePosition();
+    }
+    return blockBuilder.build();
+  }
+
+  /** Writes the thirteen value columns of one receiver row, in header order. */
+  private static void appendRow(
+      final ColumnBuilder[] columns, final PipeReceiverRuntimeSnapshot receiver) {
+    columns[0].writeBinary(BytesUtils.valueOf(receiver.getReceiverNodeType()));
+    columns[1].writeInt(receiver.getReceiverNodeId());
+    columns[2].writeBinary(BytesUtils.valueOf(receiver.getProtocol()));
+    columns[3].writeBinary(BytesUtils.valueOf(receiver.getSenderAddress()));
+    columns[4].writeBinary(BytesUtils.valueOf(receiver.getSenderPorts()));
+    columns[5].writeInt(receiver.getConnectionCount());
+    columns[6].writeInt(receiver.getPipeCount());
+    columns[7].writeBinary(BytesUtils.valueOf(receiver.getPipeIds()));
+    columns[8].writeBinary(BytesUtils.valueOf(receiver.getUserName()));
+    columns[9].writeBinary(BytesUtils.valueOf(receiver.getSenderClusterId()));
+    columns[10].writeBinary(BytesUtils.valueOf(formatTime(receiver.getLastHandshakeTime())));
+    columns[11].writeBinary(BytesUtils.valueOf(formatTime(receiver.getLastTransferTime())));
+    columns[12].writeLong(receiver.getRequestNum());
+  }
+
+  /** Renders a millisecond timestamp as a date string; non-positive values map to UNKNOWN. */
+  private static String formatTime(long timestampInMillis) {
+    if (timestampInMillis <= 0) {
+      return PipeReceiverRuntimeRegistry.UNKNOWN;
+    }
+    return DateTimeUtils.convertLongToDate(timestampInMillis, "ms");
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index ca9c09ecb95..e84348851ce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -28,12 +28,15 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
 import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
 import org.apache.iotdb.commons.audit.UserEntity;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
+import 
org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeRegistry;
+import 
org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeSnapshot;
 import org.apache.iotdb.commons.queryengine.common.ConnectionInfo;
 import org.apache.iotdb.commons.queryengine.common.SqlDialect;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.function.TableBuiltinTableFunction;
@@ -184,6 +187,8 @@ public class InformationSchemaContentSupplierFactory {
           return new RegionSupplier(dataTypes, userEntity);
         case InformationSchema.PIPES:
           return new PipeSupplier(dataTypes, userEntity.getUsername());
+        case InformationSchema.RECEIVERS:
+          return new ReceiversSupplier(dataTypes, userEntity);
         case InformationSchema.PIPE_PLUGINS:
           return new PipePluginSupplier(dataTypes, userEntity);
         case InformationSchema.TOPICS:
@@ -710,6 +715,50 @@ public class InformationSchemaContentSupplierFactory {
     }
   }
 
+  /**
+   * Supplies one row per {@link PipeReceiverRuntimeSnapshot} taken from {@link
+   * PipeReceiverRuntimeRegistry#snapshot()}. The constructor runs a {@link
+   * PrivilegeType#USE_PIPE} privilege check before the snapshot is taken.
+   */
+  private static class ReceiversSupplier extends TsBlockSupplier {
+    private final Iterator<PipeReceiverRuntimeSnapshot> snapshotIterator;
+
+    private ReceiversSupplier(final List<TSDataType> dataTypes, final UserEntity userEntity) {
+      super(dataTypes);
+      final String username = userEntity.getUsername();
+      accessControl.checkMissingPrivileges(
+          username, Collections.singletonList(PrivilegeType.USE_PIPE), userEntity);
+      snapshotIterator = PipeReceiverRuntimeRegistry.getInstance().snapshot().iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return snapshotIterator.hasNext();
+    }
+
+    @Override
+    protected void constructLine() {
+      final PipeReceiverRuntimeSnapshot receiver = snapshotIterator.next();
+      columnBuilders[0].writeBinary(BytesUtils.valueOf(receiver.getReceiverNodeType()));
+      columnBuilders[1].writeInt(receiver.getReceiverNodeId());
+      columnBuilders[2].writeBinary(BytesUtils.valueOf(receiver.getProtocol()));
+      columnBuilders[3].writeBinary(BytesUtils.valueOf(receiver.getSenderAddress()));
+      columnBuilders[4].writeBinary(BytesUtils.valueOf(receiver.getSenderPorts()));
+      columnBuilders[5].writeInt(receiver.getConnectionCount());
+      columnBuilders[6].writeInt(receiver.getPipeCount());
+      columnBuilders[7].writeBinary(BytesUtils.valueOf(receiver.getPipeIds()));
+      columnBuilders[8].writeBinary(BytesUtils.valueOf(receiver.getUserName()));
+      columnBuilders[9].writeBinary(BytesUtils.valueOf(receiver.getSenderClusterId()));
+      writeTimestamp(columnBuilders[10], receiver.getLastHandshakeTime());
+      writeTimestamp(columnBuilders[11], receiver.getLastTransferTime());
+      columnBuilders[12].writeLong(receiver.getRequestNum());
+      resultBuilder.declarePosition();
+    }
+
+    /** Writes a millisecond timestamp in the current precision; non-positive values become null. */
+    private static void writeTimestamp(ColumnBuilder columnBuilder, long timestampInMillis) {
+      if (timestampInMillis > 0) {
+        columnBuilder.writeLong(
+            TimestampPrecisionUtils.convertToCurrPrecision(
+                timestampInMillis, TimeUnit.MILLISECONDS));
+      } else {
+        columnBuilder.appendNull();
+      }
+    }
+  }
+
   private static class PipePluginSupplier extends TsBlockSupplier {
     private final Iterator<PipePluginMeta> iterator;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index be20d218e6a..d5539bfe93d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -60,6 +60,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
@@ -488,6 +489,7 @@ public class Analysis implements IAnalysis {
     return (dataPartition != null && !dataPartition.isEmpty())
         || (schemaPartition != null && !schemaPartition.isEmpty())
         || statement instanceof ShowQueriesStatement
+        || statement instanceof ShowReceiversStatement
         || statement instanceof ShowDiskUsageStatement
         || (statement instanceof QueryStatement
             && ((QueryStatement) statement).isAggregationQuery());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 22d5b0518ac..27a9778802a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -145,6 +145,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatemen
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.TransformToExpressionVisitor;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -3822,6 +3823,34 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     return analysis;
   }
 
+  /**
+   * Analyzes a show-receivers statement: installs the show-receivers dataset header, records the
+   * readable DataNode locations (failing when there are none), registers one source expression
+   * per header column and sets the merge order from the statement's sort items.
+   */
+  @Override
+  public Analysis visitShowReceivers(
+      ShowReceiversStatement showReceiversStatement, MPPQueryContext context) {
+    final Analysis result = new Analysis();
+    result.setRealStatement(showReceiversStatement);
+    result.setRespDatasetHeader(DatasetHeaderFactory.getShowReceiversHeader());
+    result.setVirtualSource(true);
+
+    final List<TDataNodeLocation> readableLocations = getReadableDataNodeLocations();
+    if (readableLocations.isEmpty()) {
+      throw new StatementAnalyzeException(DataNodeQueryMessages.NO_RUNNING_DATANODES);
+    }
+    result.setReadableDataNodeLocations(readableLocations);
+
+    final Set<Expression> columnExpressions = new HashSet<>();
+    result
+        .getRespDatasetHeader()
+        .getColumnHeaders()
+        .forEach(
+            header ->
+                columnExpressions.add(
+                    TimeSeriesOperand.constructColumnHeaderExpression(
+                        header.getColumnName(), header.getColumnType())));
+    result.setSourceExpressions(columnExpressions);
+    for (final Expression columnExpression : columnExpressions) {
+      analyzeExpressionType(result, columnExpression);
+    }
+
+    result.setMergeOrderParameter(new OrderByParameter(showReceiversStatement.getSortItemList()));
+    return result;
+  }
+
   @Override
   public Analysis visitShowDiskUsage(
       ShowDiskUsageStatement showDiskUsageStatement, MPPQueryContext context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 49f79a6a923..feca397e749 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -250,6 +250,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentSqlDialectS
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement;
@@ -4412,6 +4413,11 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
     return showPipesStatement;
   }
 
+  /** Maps the parsed show-receivers rule to a fresh {@link ShowReceiversStatement}. */
+  @Override
+  public Statement visitShowReceivers(IoTDBSqlParser.ShowReceiversContext ctx) {
+    final ShowReceiversStatement showReceiversStatement = new ShowReceiversStatement();
+    return showReceiversStatement;
+  }
+
   @Override
   public Statement visitCreateTopic(IoTDBSqlParser.CreateTopicContext ctx) {
     final CreateTopicStatement createTopicStatement = new 
CreateTopicStatement();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 26d361c1f21..36e522e5d13 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -90,6 +90,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanN
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowReceiversNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
@@ -1328,6 +1329,44 @@ public class LogicalPlanBuilder {
     return this;
   }
 
+  /**
+   * Plans the show-receivers statement: one sorted {@link ShowReceiversNode} per readable
+   * DataNode, combined under a {@link MergeSortNode} unless there is exactly one location, then
+   * registers the column types of the show-receivers header with the type provider.
+   */
+  public LogicalPlanBuilder planShowReceivers(Analysis analysis) {
+    final List<TDataNodeLocation> locations = analysis.getReadableDataNodeLocations();
+    if (locations.size() != 1) {
+      final List<String> mergedOutputColumns = new ArrayList<>();
+      // The merge node is created before its children, as the children are attached to it.
+      final MergeSortNode mergeSort =
+          new MergeSortNode(
+              context.getQueryId().genPlanNodeId(),
+              analysis.getMergeOrderParameter(),
+              mergedOutputColumns);
+
+      for (final TDataNodeLocation location : locations) {
+        mergeSort.addChild(
+            this.planSingleShowReceivers(location)
+                .planSort(analysis.getMergeOrderParameter())
+                .getRoot());
+      }
+      // Filled in after the fact: the list instance is already held by the merge node.
+      mergedOutputColumns.addAll(mergeSort.getChildren().get(0).getOutputColumnNames());
+      this.root = mergeSort;
+    } else {
+      this.root =
+          planSingleShowReceivers(locations.get(0))
+              .planSort(analysis.getMergeOrderParameter())
+              .getRoot();
+    }
+
+    ColumnHeaderConstant.showReceiversColumnHeaders.forEach(
+        header ->
+            context
+                .getTypeProvider()
+                .setTreeModelType(header.getColumnName(), header.getColumnType()));
+    return this;
+  }
+
+  /** Sets the root to a new {@link ShowReceiversNode} bound to the given DataNode location. */
+  private LogicalPlanBuilder planSingleShowReceivers(TDataNodeLocation dataNodeLocation) {
+    final ShowReceiversNode showReceiversNode =
+        new ShowReceiversNode(context.getQueryId().genPlanNodeId(), dataNodeLocation);
+    this.root = showReceiversNode;
+    return this;
+  }
+
   public LogicalPlanBuilder planShowDiskUsage(Analysis analysis, PartialPath 
pathPattern) {
     List<TDataNodeLocation> dataNodeLocations = 
analysis.getReadableDataNodeLocations();
     if (dataNodeLocations.size() == 1) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 2c9304ce0b9..f04dc1c9263 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -92,6 +92,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement;
 import org.apache.iotdb.db.schemaengine.SchemaEngineMode;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -1017,6 +1018,12 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
     return planBuilder.getRoot();
   }
 
+  /** Builds the logical plan of a show-receivers statement via {@link LogicalPlanBuilder}. */
+  @Override
+  public PlanNode visitShowReceivers(
+      ShowReceiversStatement showReceiversStatement, MPPQueryContext context) {
+    final LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+    planBuilder.planShowReceivers(analysis);
+    return planBuilder.getRoot();
+  }
+
   @Override
   public PlanNode visitShowDiskUsage(
       ShowDiskUsageStatement showDiskUsageStatement, MPPQueryContext context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 327dbecdda2..6e32f55b732 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -140,6 +140,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesAggregati
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.ShowDiskUsageOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.ShowQueriesOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.ShowReceiversOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.window.ConditionWindowParameter;
 import 
org.apache.iotdb.db.queryengine.execution.operator.window.CountWindowParameter;
 import 
org.apache.iotdb.db.queryengine.execution.operator.window.SessionWindowParameter;
@@ -224,6 +225,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggre
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowReceiversNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
@@ -2478,6 +2480,19 @@ public class OperatorTreeGenerator implements 
PlanVisitor<Operator, LocalExecuti
         node.getAllowedUsername());
   }
 
+  /** Creates a {@link ShowReceiversOperator} with a newly registered operator context. */
+  @Override
+  public Operator visitShowReceivers(ShowReceiversNode node, LocalExecutionPlanContext context) {
+    final String operatorType = ShowReceiversOperator.class.getSimpleName();
+    final OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), operatorType);
+    return new ShowReceiversOperator(operatorContext, node.getPlanNodeId());
+  }
+
   @Override
   public Operator visitShowDiskUsage(ShowDiskUsageNode node, 
LocalExecutionPlanContext context) {
     OperatorContext operatorContext =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 4e34d361471..cd9a9fe44c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesSta
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement;
 
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.utils.Pair;
@@ -159,6 +160,7 @@ public class SimpleFragmentParallelPlanner extends 
AbstractFragmentParallelPlann
     if (analysis.getTreeStatement() instanceof QueryStatement
         || analysis.getTreeStatement() instanceof ExplainAnalyzeStatement
         || analysis.getTreeStatement() instanceof ShowQueriesStatement
+        || analysis.getTreeStatement() instanceof ShowReceiversStatement
         || analysis.getTreeStatement() instanceof ShowDiskUsageStatement
         || (analysis.getTreeStatement() instanceof ShowTimeSeriesStatement
             && (((ShowTimeSeriesStatement) 
analysis.getTreeStatement()).isOrderByHeat()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java
index 2a9b9b6905c..59de11d98d5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java
@@ -109,6 +109,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggre
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowReceiversNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
@@ -415,6 +416,8 @@ public class DataNodePlanNodeDeserializer extends 
CommonPlanNodeDeserializer {
         return ShowDiskUsageNode.deserialize(buffer);
       case 108:
         return CollectNode.deserialize(buffer);
+      case 110:
+        return ShowReceiversNode.deserialize(buffer);
       case 902:
         return CreateOrUpdateTableDeviceNode.deserialize(buffer);
       case 903:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 54ee9d17a94..418f46ec865 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -108,6 +108,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanN
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanSourceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowReceiversNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
@@ -348,6 +349,10 @@ public interface PlanVisitor<R, C> extends 
ICoreQueryPlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  /** Visits a {@link ShowReceiversNode}; by default delegates to visitPlan. */
+  default R visitShowReceivers(ShowReceiversNode node, C context) {
+    return visitPlan(node, context);
+  }
+
   default R visitShowDiskUsage(ShowDiskUsageNode node, C context) {
     return visitPlan(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowReceiversNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowReceiversNode.java
new file mode 100644
index 00000000000..1cf76707c56
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowReceiversNode.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.IPlanVisitor;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.commons.schema.column.ColumnHeader;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+
+import com.google.common.collect.ImmutableList;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.showReceiversColumnHeaders;
+
+/**
+ * Source plan node for show receivers. It is a leaf: it reports no children,
+ * rejects {@link #addChild(PlanNode)}, and exposes the column names of
+ * {@code showReceiversColumnHeaders} as its output columns.
+ */
+public class ShowReceiversNode extends VirtualSourceNode {
+
+  // Column names extracted from showReceiversColumnHeaders.
+  public static final List<String> SHOW_RECEIVERS_HEADER_COLUMNS =
+      showReceiversColumnHeaders.stream()
+          .map(ColumnHeader::getColumnName)
+          .collect(ImmutableList.toImmutableList());
+
+  /** Creates the node; both arguments are forwarded to the superclass. */
+  public ShowReceiversNode(PlanNodeId id, TDataNodeLocation dataNodeLocation) {
+    super(id, dataNodeLocation);
+  }
+
+  /** Always returns an empty list. */
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of();
+  }
+
+  /** Always throws {@link UnsupportedOperationException}. */
+  @Override
+  public void addChild(PlanNode child) {
+    throw new UnsupportedOperationException("no child is allowed for 
ShowReceiversNode");
+  }
+
+  @Override
+  public PlanNodeType getType() {
+    return PlanNodeType.SHOW_RECEIVERS;
+  }
+
+  /** Returns a new node built from this node's id and data node location. */
+  @Override
+  public PlanNode clone() {
+    return new ShowReceiversNode(getPlanNodeId(), getDataNodeLocation());
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return SHOW_RECEIVERS_HEADER_COLUMNS;
+  }
+
+  @Override
+  public <R, C> R accept(IPlanVisitor<R, C> visitor, C context) {
+    // Unchecked downcast: a visitor that is not a PlanVisitor fails here
+    // with a ClassCastException.
+    return ((PlanVisitor<R, C>) visitor).visitShowReceivers(this, context);
+  }
+
+  /** Writes only the SHOW_RECEIVERS node type to the buffer. */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.SHOW_RECEIVERS.serialize(byteBuffer);
+  }
+
+  /** Writes only the SHOW_RECEIVERS node type to the stream. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.SHOW_RECEIVERS.serialize(stream);
+  }
+
+  /**
+   * Rebuilds a node from the buffer. Only the plan node id is read here; the
+   * returned node is constructed with a null data node location.
+   */
+  public static ShowReceiversNode deserialize(ByteBuffer byteBuffer) {
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new ShowReceiversNode(planNodeId, null);
+  }
+
+  // Equality is entirely the superclass's; this class adds no state.
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  // Derived solely from the superclass hash code.
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode());
+  }
+
+  @Override
+  public String toString() {
+    return "ShowReceiversNode-" + this.getPlanNodeId();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
index d0b3cb330e1..2d3f6cea910 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
@@ -123,6 +123,7 @@ public class DataNodeLocationSupplierFactory {
         case InformationSchema.CONNECTIONS:
         case InformationSchema.CURRENT_QUERIES:
         case InformationSchema.QUERIES_COSTS_HISTOGRAM:
+        case InformationSchema.RECEIVERS:
           return getReadableDataNodeLocations();
         case InformationSchema.DATABASES:
         case InformationSchema.TABLES:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java
index fd9757c5f89..7f0a0e93dfa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java
@@ -159,6 +159,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentSqlDialectS
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement;
@@ -848,6 +849,12 @@ public class TreeAccessCheckVisitor extends 
StatementVisitor<TSStatus, TreeAcces
     return StatusUtils.OK;
   }
 
+  /**
+   * Access check for {@link ShowReceiversStatement}: sets the audit log
+   * operation to QUERY on the context and delegates to checkPipeManagement
+   * with a supplier that yields an empty string.
+   */
+  @Override
+  public TSStatus visitShowReceivers(
+      ShowReceiversStatement statement, TreeAccessCheckContext context) {
+    return 
checkPipeManagement(context.setAuditLogOperation(AuditLogOperation.QUERY), () 
-> "");
+  }
+
   @Override
   public TSStatus visitDropPipe(DropPipeStatement statement, 
TreeAccessCheckContext context) {
     return checkPipeManagement(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 25d68eba6be..22a9caf548e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -1722,6 +1722,17 @@ public class AstBuilder extends 
RelationalSqlBaseVisitor<Node> {
         limit);
   }
 
+  /**
+   * Builds a {@link ShowStatement} over {@code InformationSchema.RECEIVERS}
+   * for the parsed statement, leaving all four optional arguments empty.
+   */
+  @Override
+  public Node 
visitShowReceiversStatement(RelationalSqlParser.ShowReceiversStatementContext 
ctx) {
+    return new ShowStatement(
+        getLocation(ctx),
+        InformationSchema.RECEIVERS,
+        Optional.empty(),
+        Optional.empty(),
+        Optional.empty(),
+        Optional.empty());
+  }
+
   @Override
   public Node 
visitKillQueryStatement(RelationalSqlParser.KillQueryStatementContext ctx) {
     if (ctx.queryId == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index b40c6444816..32f4c301601 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -201,4 +201,5 @@ public enum StatementType {
   SHOW_EXTERNAL_SERVICE,
 
   SHOW_DISK_USAGE,
+  SHOW_RECEIVERS,
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index 847e850c521..e9a64c5981c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -147,6 +147,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentSqlDialectS
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement;
@@ -543,6 +544,10 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(showQueriesStatement, context);
   }
 
+  /** Visits a {@link ShowReceiversStatement}; delegates to visitStatement. */
+  public R visitShowReceivers(ShowReceiversStatement showReceiversStatement, C 
context) {
+    return visitStatement(showReceiversStatement, context);
+  }
+
   public R visitShowDiskUsage(ShowDiskUsageStatement showDiskUsageStatement, C 
context) {
     return visitStatement(showDiskUsageStatement, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowReceiversStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowReceiversStatement.java
new file mode 100644
index 00000000000..1e8a87fd92a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowReceiversStatement.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.sys;
+
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
+import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowStatement;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * Statement object for show receivers. It is reported as a query and always
+ * carries the same fixed list of sort items.
+ */
+public class ShowReceiversStatement extends ShowStatement {
+
+  // Fixed ascending sort keys, in priority order: node type, node id,
+  // protocol, sender address, user name.
+  private static final List<SortItem> DEFAULT_SORT_ITEMS =
+      ImmutableList.of(
+          new SortItem(ColumnHeaderConstant.RECEIVER_NODE_TYPE, Ordering.ASC),
+          new SortItem(ColumnHeaderConstant.RECEIVER_NODE_ID, Ordering.ASC),
+          new SortItem(ColumnHeaderConstant.PROTOCOL, Ordering.ASC),
+          new SortItem(ColumnHeaderConstant.SENDER_ADDRESS, Ordering.ASC),
+          new SortItem(ColumnHeaderConstant.RECEIVER_USER_NAME, Ordering.ASC));
+
+  /** Creates the statement and tags it as StatementType.SHOW_RECEIVERS. */
+  public ShowReceiversStatement() {
+    this.statementType = StatementType.SHOW_RECEIVERS;
+  }
+
+  @Override
+  public boolean isQuery() {
+    return true;
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitShowReceivers(this, context);
+  }
+
+  /** Returns the shared immutable default sort items. */
+  public List<SortItem> getSortItemList() {
+    return DEFAULT_SORT_ITEMS;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
index b98f34a2484..06851d8c996 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
@@ -65,6 +65,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogica
 import org.apache.iotdb.db.queryengine.plan.statement.sys.AuthorStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement;
 import org.apache.iotdb.isession.template.TemplateNode;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
@@ -181,6 +182,16 @@ public class StatementGeneratorTest {
                 "show queries order by a", ZonedDateTime.now().getOffset()));
   }
 
+  // Parsing "show receivers" must yield a ShowReceiversStatement whose first
+  // sort item is ReceiverNodeType ascending.
+  @Test
+  public void testShowReceivers() {
+    final Statement showReceivers =
+        StatementGenerator.createStatement("show receivers", 
ZonedDateTime.now().getOffset());
+    Assert.assertTrue(showReceivers instanceof ShowReceiversStatement);
+    Assert.assertEquals(
+        new SortItem("ReceiverNodeType", Ordering.ASC),
+        ((ShowReceiversStatement) showReceivers).getSortItemList().get(0));
+  }
+
   @Test
   public void testRawDataQuery() throws IllegalPathException {
     TSRawDataQueryReq req =
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java
index 5597a6ca8e6..97fbb5cc502 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.node.PlanNodeDeserializeHelp
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowReceiversNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableDiskUsageInformationSchemaTableScanNode;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -117,6 +118,16 @@ public class SourceNodeSerdeTest {
     assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node);
   }
 
+  // Round trip: a serialized ShowReceiversNode must deserialize to an equal
+  // node. The node is built with a null data node location.
+  @Test
+  public void testShowReceiversNode() {
+    ShowReceiversNode node = new ShowReceiversNode(new PlanNodeId("test"), 
null);
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
+    node.serialize(byteBuffer);
+    // Switch the buffer from writing to reading before deserializing.
+    byteBuffer.flip();
+    assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node);
+  }
+
   @Test
   public void testTableDiskUsageInformationTableScanNode() throws 
IllegalPathException {
     List<Symbol> symbols = Arrays.asList(new Symbol("database"), new 
Symbol("size_in_bytes"));
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowReceiversTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowReceiversTest.java
new file mode 100644
index 00000000000..318a84bd483
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowReceiversTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.plan.relational.planner.informationschema;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester;
+
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.infoSchemaTableScan;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
+
+/**
+ * Planner tests asserting that "show receivers" and a direct select on
+ * information_schema.receivers both plan to an output node over an
+ * information-schema table scan of that table.
+ */
+public class ShowReceiversTest {
+
+  private final PlanTester planTester = new PlanTester();
+
+  // The SHOW form must plan to a scan of information_schema.receivers.
+  @Test
+  public void testShowReceiversRewrite() {
+    final LogicalQueryPlan logicalQueryPlan = planTester.createPlan("show 
receivers");
+    assertPlan(
+        logicalQueryPlan,
+        output(infoSchemaTableScan("information_schema.receivers", 
Optional.empty())));
+  }
+
+  // The explicit SELECT form is matched against the same plan pattern.
+  @Test
+  public void testSelectReceivers() {
+    final LogicalQueryPlan logicalQueryPlan =
+        planTester.createPlan("select * from information_schema.receivers");
+    assertPlan(
+        logicalQueryPlan,
+        output(infoSchemaTableScan("information_schema.receivers", 
Optional.empty())));
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 8304161c9d0..fca1a857b0e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.i18n.PipeMessages;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
+import 
org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeRegistry;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
@@ -100,12 +101,21 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   protected final AtomicBoolean shouldMarkAsPipeRequest = new 
AtomicBoolean(true);
   protected final AtomicBoolean skipIfNoPrivileges = new AtomicBoolean(false);
 
+  protected String senderClusterId = PipeReceiverRuntimeRegistry.UNKNOWN;
+  protected String receiverPipeName;
+  protected long receiverPipeCreationTime = Long.MIN_VALUE;
+  private final AtomicReference<String> pipeReceiverRuntimeSessionKey = new 
AtomicReference<>();
+
   @Override
   public IoTDBSinkRequestVersion getVersion() {
     return IoTDBSinkRequestVersion.VERSION_1;
   }
 
   protected TPipeTransferResp handleTransferHandshakeV1(final 
PipeTransferHandshakeV1Req req) {
+    senderClusterId = PipeReceiverRuntimeRegistry.UNKNOWN;
+    receiverPipeName = null;
+    receiverPipeCreationTime = Long.MIN_VALUE;
+
     if (!CommonDescriptor.getInstance()
         .getConfig()
         .getTimestampPrecision()
@@ -340,16 +350,28 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
             req.getParams()
                 
.getOrDefault(PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF, "false")));
 
+    final String pipeNameString =
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_NAME);
+    final String pipeCreationTimeString =
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_CREATION_TIME);
+
     // Handle the handshake request as a v1 request.
     // Here we construct a fake "dataNode" request to valid from v1 validation 
logic, though
     // it may not require the actual type of the v1 request.
-    return handleTransferHandshakeV1(
-        new PipeTransferHandshakeV1Req() {
-          @Override
-          protected PipeRequestType getPlanType() {
-            return PipeRequestType.HANDSHAKE_DATANODE_V1;
-          }
-        }.convertToTPipeTransferReq(timestampPrecision));
+    final TPipeTransferResp resp =
+        handleTransferHandshakeV1(
+            new PipeTransferHandshakeV1Req() {
+              @Override
+              protected PipeRequestType getPlanType() {
+                return PipeRequestType.HANDSHAKE_DATANODE_V1;
+              }
+            }.convertToTPipeTransferReq(timestampPrecision));
+    if (isSuccess(resp)) {
+      senderClusterId = clusterIdFromHandshakeRequest;
+      receiverPipeName = pipeNameString;
+      receiverPipeCreationTime = parsePipeCreationTime(pipeCreationTimeString);
+    }
+    return resp;
   }
 
   protected abstract String getClusterId();
@@ -873,8 +895,72 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       final PipeTransferFileSealReqV2 req, final List<String> 
fileAbsolutePaths)
       throws IOException, IllegalPathException;
 
+  /**
+   * Registers or updates this receiver's session in the
+   * PipeReceiverRuntimeRegistry. The session key is built from the node type,
+   * node id, protocol and the current receiverId; if it differs from the key
+   * stored previously, the previous key is deregistered first.
+   *
+   * @param receiverNodeType node type label recorded for the session
+   * @param receiverNodeId node id recorded for the session
+   * @param protocol protocol label recorded for the session
+   */
+  protected void recordPipeReceiverHandshake(
+      final String receiverNodeType, final int receiverNodeId, final String 
protocol) {
+    final String sessionKey =
+        String.format("%s-%s-%s-%s", receiverNodeType, receiverNodeId, 
protocol, receiverId.get());
+    final String oldSessionKey = 
pipeReceiverRuntimeSessionKey.getAndSet(sessionKey);
+    if (!Objects.equals(oldSessionKey, sessionKey)) {
+      // NOTE(review): oldSessionKey is null on the first handshake, so
+      // deregister is called with null here -- confirm it tolerates null.
+      PipeReceiverRuntimeRegistry.getInstance().deregister(oldSessionKey);
+    }
+    PipeReceiverRuntimeRegistry.getInstance()
+        .registerOrUpdateSession(
+            sessionKey,
+            receiverNodeType,
+            receiverNodeId,
+            protocol,
+            getSenderHost(),
+            parseSenderPort(getSenderPort()),
+            username,
+            senderClusterId,
+            receiverPipeName,
+            receiverPipeCreationTime,
+            System.currentTimeMillis());
+  }
+
+  /**
+   * Marks a transfer on the registry for the current session key, stamped
+   * with the current wall-clock time. The key may still be null if no
+   * handshake has been recorded.
+   */
+  protected void recordPipeReceiverTransfer() {
+    PipeReceiverRuntimeRegistry.getInstance()
+        .markTransfer(pipeReceiverRuntimeSessionKey.get(), 
System.currentTimeMillis());
+  }
+
+  /**
+   * Marks a request on the registry for the current session key. The key may
+   * still be null if no handshake has been recorded.
+   */
+  protected void recordPipeReceiverRequest() {
+    
PipeReceiverRuntimeRegistry.getInstance().markRequest(pipeReceiverRuntimeSessionKey.get());
+  }
+
+  /**
+   * Atomically clears the stored session key and deregisters the key that
+   * was stored (null when nothing had been registered).
+   */
+  protected void clearPipeReceiverRuntime() {
+    PipeReceiverRuntimeRegistry.getInstance()
+        .deregister(pipeReceiverRuntimeSessionKey.getAndSet(null));
+  }
+
+  /**
+   * Tells whether a transfer response reports success.
+   *
+   * @param resp response to inspect; may be null
+   * @return true only when the response and its status are non-null and the
+   *     status code equals TSStatusCode.SUCCESS_STATUS
+   */
+  protected static boolean isSuccess(final TPipeTransferResp resp) {
+    return resp != null
+        && resp.getStatus() != null
+        && resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode();
+  }
+
+  /**
+   * Parses the sender port text into an int.
+   *
+   * @param senderPort port text; may be null or non-numeric
+   * @return the parsed port, or -1 when the text is not a valid int
+   */
+  private static int parseSenderPort(final String senderPort) {
+    try {
+      return Integer.parseInt(senderPort);
+    } catch (final NumberFormatException e) {
+      // Integer.parseInt signals both null and malformed text this way, so
+      // nothing broader needs catching (same as parsePipeCreationTime).
+      return -1;
+    }
+  }
+
+  /**
+   * Parses the pipe creation time text into a long.
+   *
+   * @param pipeCreationTime creation time text; may be null
+   * @return the parsed value, or Long.MIN_VALUE when the text is null or is
+   *     not a valid long
+   */
+  private static long parsePipeCreationTime(final String pipeCreationTime) {
+    if (pipeCreationTime == null) {
+      return Long.MIN_VALUE;
+    }
+    try {
+      return Long.parseLong(pipeCreationTime);
+    } catch (final NumberFormatException e) {
+      return Long.MIN_VALUE;
+    }
+  }
+
   @Override
   public synchronized void handleExit() {
+    clearPipeReceiverRuntime();
+
     if (writingFileWriter != null) {
       try {
         writingFileWriter.close();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistry.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistry.java
new file mode 100644
index 00000000000..4aaa24c8349
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistry.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.receiver.runtime;
+
+import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.StringJoiner;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * In-memory registry of pipe receiver sessions, shared through a single instance.
+ *
+ * <p>Each session is stored under its connection key and records the receiver node, protocol,
+ * sender, user, the pipe ids passed at registration, the last handshake / transfer times and a
+ * request counter. {@link #snapshot()} aggregates the sessions per (receiver node type, receiver
+ * node id, protocol, sender address, user name).
+ *
+ * <p>Per-session fields are read and written while holding the session's monitor; the request
+ * counter is an {@link AtomicLong} and is also incremented without it.
+ */
+public class PipeReceiverRuntimeRegistry {
+
+  public static final String UNKNOWN = "Unknown";
+  public static final String NODE_TYPE_DATA_NODE = "DataNode";
+  public static final String NODE_TYPE_CONFIG_NODE = "ConfigNode";
+  public static final String PROTOCOL_THRIFT = "thrift";
+  public static final String PROTOCOL_AIR_GAP = "air_gap";
+
+  private static final PipeReceiverRuntimeRegistry INSTANCE = new PipeReceiverRuntimeRegistry();
+
+  private final ConcurrentMap<String, SessionRuntimeInfo> sessionInfoMap =
+      new ConcurrentHashMap<>();
+
+  private PipeReceiverRuntimeRegistry() {}
+
+  /** Returns the shared registry instance. */
+  public static PipeReceiverRuntimeRegistry getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Creates the session stored under {@code connectionKey}, or refreshes it if it already exists.
+   *
+   * <p>Blank string attributes are stored as {@link #UNKNOWN}. Every call counts as one request,
+   * sets the last handshake time and raises the last transfer time to {@code handshakeTime} if it
+   * was smaller. A non-blank {@code pipeName} adds a pipe id to the session.
+   *
+   * @param connectionKey key identifying the session; the call is ignored when it is blank
+   * @param receiverNodeType type of the receiving node
+   * @param receiverNodeId id of the receiving node
+   * @param protocol transfer protocol
+   * @param senderAddress address of the sender
+   * @param senderPort port of the sender
+   * @param userName user name used by the sender
+   * @param senderClusterId cluster id of the sender
+   * @param pipeName name of the pipe, may be blank
+   * @param pipeCreationTime creation time of the pipe; negative values are shown as unknown
+   * @param handshakeTime time of this handshake
+   */
+  public void registerOrUpdateSession(
+      String connectionKey,
+      String receiverNodeType,
+      int receiverNodeId,
+      String protocol,
+      String senderAddress,
+      int senderPort,
+      String userName,
+      String senderClusterId,
+      String pipeName,
+      long pipeCreationTime,
+      long handshakeTime) {
+    if (isBlank(connectionKey)) {
+      return;
+    }
+
+    sessionInfoMap.compute(
+        connectionKey,
+        (key, oldSession) -> {
+          final SessionRuntimeInfo session =
+              oldSession == null ? new SessionRuntimeInfo(connectionKey) : oldSession;
+          synchronized (session) {
+            session.receiverNodeType = normalize(receiverNodeType);
+            session.receiverNodeId = receiverNodeId;
+            session.protocol = normalize(protocol);
+            session.senderAddress = normalize(senderAddress);
+            session.senderPort = senderPort;
+            session.userName = normalize(userName);
+            session.senderClusterId = normalize(senderClusterId);
+            session.lastHandshakeTime = handshakeTime;
+            session.lastTransferTime = Math.max(session.lastTransferTime, handshakeTime);
+            session.requestNum.incrementAndGet();
+            if (!isBlank(pipeName)) {
+              session.pipeIds.add(formatPipeId(pipeName, pipeCreationTime));
+            }
+          }
+          return session;
+        });
+  }
+
+  /**
+   * Records a transfer on an existing session: raises its last transfer time to {@code
+   * transferTime} if that is later and counts one request.
+   *
+   * @param connectionKey key of the session; blank or unregistered keys are ignored
+   * @param transferTime time of the transfer
+   */
+  public void markTransfer(String connectionKey, long transferTime) {
+    final SessionRuntimeInfo session = findSession(connectionKey);
+    if (session == null) {
+      return;
+    }
+    synchronized (session) {
+      session.lastTransferTime = Math.max(session.lastTransferTime, transferTime);
+      session.requestNum.incrementAndGet();
+    }
+  }
+
+  /**
+   * Counts one request on an existing session.
+   *
+   * @param connectionKey key of the session; blank or unregistered keys are ignored
+   */
+  public void markRequest(String connectionKey) {
+    final SessionRuntimeInfo session = findSession(connectionKey);
+    if (session != null) {
+      session.requestNum.incrementAndGet();
+    }
+  }
+
+  /**
+   * Removes the session stored under {@code connectionKey}, if any.
+   *
+   * @param connectionKey key of the session; blank keys are ignored
+   */
+  public void deregister(String connectionKey) {
+    if (!isBlank(connectionKey)) {
+      sessionInfoMap.remove(connectionKey);
+    }
+  }
+
+  /**
+   * Aggregates all registered sessions into one snapshot per group and returns them ordered by
+   * group key (receiver node type, receiver node id, protocol, sender address, user name).
+   *
+   * @return a new list of snapshots; empty when no session is registered
+   */
+  public List<PipeReceiverRuntimeSnapshot> snapshot() {
+    final Map<GroupKey, AggregatedRuntimeInfo> aggregatedInfoMap = new HashMap<>();
+    for (SessionRuntimeInfo session : sessionInfoMap.values()) {
+      synchronized (session) {
+        final GroupKey groupKey =
+            new GroupKey(
+                session.receiverNodeType,
+                session.receiverNodeId,
+                session.protocol,
+                session.senderAddress,
+                session.userName);
+        final AggregatedRuntimeInfo aggregatedInfo =
+            aggregatedInfoMap.computeIfAbsent(groupKey, AggregatedRuntimeInfo::new);
+        aggregatedInfo.senderPorts.add(session.senderPort);
+        aggregatedInfo.connectionCount++;
+        aggregatedInfo.pipeIds.addAll(session.pipeIds);
+        aggregatedInfo.senderClusterIds.add(session.senderClusterId);
+        aggregatedInfo.lastHandshakeTime =
+            Math.max(aggregatedInfo.lastHandshakeTime, session.lastHandshakeTime);
+        aggregatedInfo.lastTransferTime =
+            Math.max(aggregatedInfo.lastTransferTime, session.lastTransferTime);
+        aggregatedInfo.requestNum += session.requestNum.get();
+      }
+    }
+
+    final List<AggregatedRuntimeInfo> aggregatedInfos = new ArrayList<>(aggregatedInfoMap.values());
+    aggregatedInfos.sort(Comparator.comparing(AggregatedRuntimeInfo::getGroupKey));
+
+    final List<PipeReceiverRuntimeSnapshot> snapshots = new ArrayList<>(aggregatedInfos.size());
+    for (AggregatedRuntimeInfo aggregatedInfo : aggregatedInfos) {
+      snapshots.add(aggregatedInfo.toSnapshot());
+    }
+    return snapshots;
+  }
+
+  /** Removes every registered session. */
+  public void clear() {
+    sessionInfoMap.clear();
+  }
+
+  /**
+   * Looks up a session without ever handing a null key to the concurrent map, which would throw
+   * {@link NullPointerException}. Blank keys are never registered, so they resolve to no session.
+   */
+  private SessionRuntimeInfo findSession(String connectionKey) {
+    return isBlank(connectionKey) ? null : sessionInfoMap.get(connectionKey);
+  }
+
+  /** Replaces a blank value with {@link #UNKNOWN}. */
+  private static String normalize(String value) {
+    return isBlank(value) ? UNKNOWN : value;
+  }
+
+  /** Returns whether the value is {@code null}, empty, or whitespace only. */
+  private static boolean isBlank(String value) {
+    return value == null || value.trim().isEmpty();
+  }
+
+  /**
+   * Builds the pipe id {@code name@creationTime}; a negative creation time is rendered as {@link
+   * #UNKNOWN}.
+   */
+  private static String formatPipeId(String pipeName, long pipeCreationTime) {
+    if (pipeCreationTime < 0) {
+      return pipeName + "@" + UNKNOWN;
+    }
+    return pipeName + "@" + DateTimeUtils.convertLongToDate(pipeCreationTime, "ms");
+  }
+
+  /** Mutable state of one session; identity is the connection key. */
+  private static class SessionRuntimeInfo {
+    private final String connectionKey;
+    private String receiverNodeType = UNKNOWN;
+    private int receiverNodeId = -1;
+    private String protocol = UNKNOWN;
+    private String senderAddress = UNKNOWN;
+    private int senderPort = -1;
+    private String userName = UNKNOWN;
+    private String senderClusterId = UNKNOWN;
+    private long lastHandshakeTime;
+    private long lastTransferTime;
+    private final AtomicLong requestNum = new AtomicLong();
+    private final TreeSet<String> pipeIds = new TreeSet<>();
+
+    private SessionRuntimeInfo(String connectionKey) {
+      this.connectionKey = connectionKey;
+    }
+
+    // equals is defined together with hashCode so that both agree on the connection key.
+    @Override
+    public boolean equals(Object object) {
+      if (this == object) {
+        return true;
+      }
+      if (!(object instanceof SessionRuntimeInfo)) {
+        return false;
+      }
+      return Objects.equals(connectionKey, ((SessionRuntimeInfo) object).connectionKey);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(connectionKey);
+    }
+  }
+
+  /** Grouping key of a snapshot row; ordering compares the fields in declaration order. */
+  private static class GroupKey implements Comparable<GroupKey> {
+    private final String receiverNodeType;
+    private final int receiverNodeId;
+    private final String protocol;
+    private final String senderAddress;
+    private final String userName;
+
+    private GroupKey(
+        String receiverNodeType,
+        int receiverNodeId,
+        String protocol,
+        String senderAddress,
+        String userName) {
+      this.receiverNodeType = receiverNodeType;
+      this.receiverNodeId = receiverNodeId;
+      this.protocol = protocol;
+      this.senderAddress = senderAddress;
+      this.userName = userName;
+    }
+
+    @Override
+    public int compareTo(GroupKey other) {
+      int result = receiverNodeType.compareTo(other.receiverNodeType);
+      if (result != 0) {
+        return result;
+      }
+      result = Integer.compare(receiverNodeId, other.receiverNodeId);
+      if (result != 0) {
+        return result;
+      }
+      result = protocol.compareTo(other.protocol);
+      if (result != 0) {
+        return result;
+      }
+      result = senderAddress.compareTo(other.senderAddress);
+      if (result != 0) {
+        return result;
+      }
+      return userName.compareTo(other.userName);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+      if (this == object) {
+        return true;
+      }
+      if (!(object instanceof GroupKey)) {
+        return false;
+      }
+      final GroupKey groupKey = (GroupKey) object;
+      return receiverNodeId == groupKey.receiverNodeId
+          && Objects.equals(receiverNodeType, groupKey.receiverNodeType)
+          && Objects.equals(protocol, groupKey.protocol)
+          && Objects.equals(senderAddress, groupKey.senderAddress)
+          && Objects.equals(userName, groupKey.userName);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(receiverNodeType, receiverNodeId, protocol, senderAddress, userName);
+    }
+  }
+
+  /** Accumulator for all sessions that share one {@link GroupKey}. */
+  private static class AggregatedRuntimeInfo {
+    private final GroupKey groupKey;
+    private final TreeSet<Integer> senderPorts = new TreeSet<>();
+    private final TreeSet<String> pipeIds = new TreeSet<>();
+    private final TreeSet<String> senderClusterIds = new TreeSet<>();
+    private int connectionCount;
+    private long lastHandshakeTime;
+    private long lastTransferTime;
+    private long requestNum;
+
+    private AggregatedRuntimeInfo(GroupKey groupKey) {
+      this.groupKey = groupKey;
+    }
+
+    private GroupKey getGroupKey() {
+      return groupKey;
+    }
+
+    /** Converts the accumulated values into an immutable snapshot row. */
+    private PipeReceiverRuntimeSnapshot toSnapshot() {
+      return new PipeReceiverRuntimeSnapshot(
+          groupKey.receiverNodeType,
+          groupKey.receiverNodeId,
+          groupKey.protocol,
+          groupKey.senderAddress,
+          joinIntegerSet(senderPorts),
+          connectionCount,
+          pipeIds.size(),
+          pipeIds.isEmpty() ? UNKNOWN : joinStringSet(pipeIds),
+          groupKey.userName,
+          senderClusterIds.isEmpty() ? UNKNOWN : joinStringSet(senderClusterIds),
+          lastHandshakeTime,
+          lastTransferTime,
+          requestNum);
+    }
+  }
+
+  /** Joins the values in ascending order, separated by {@code ","}. */
+  private static String joinIntegerSet(TreeSet<Integer> values) {
+    final StringJoiner joiner = new StringJoiner(",");
+    for (Integer value : values) {
+      joiner.add(String.valueOf(value));
+    }
+    return joiner.toString();
+  }
+
+  /** Joins the values in ascending order, separated by {@code ";"}. */
+  private static String joinStringSet(TreeSet<String> values) {
+    return String.join(";", values);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeSnapshot.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeSnapshot.java
new file mode 100644
index 00000000000..58a56333286
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeSnapshot.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.receiver.runtime;
+
+/**
+ * Immutable value holder for one row of pipe receiver runtime information.
+ *
+ * <p>All fields are assigned once in the constructor and exposed through getters only.
+ */
+public class PipeReceiverRuntimeSnapshot {
+
+  private final String receiverNodeType;
+  private final int receiverNodeId;
+  private final String protocol;
+  private final String senderAddress;
+  // Sender ports already rendered as a single string by the caller.
+  private final String senderPorts;
+  private final int connectionCount;
+  private final int pipeCount;
+  // Pipe ids already rendered as a single string by the caller.
+  private final String pipeIds;
+  private final String userName;
+  private final String senderClusterId;
+  private final long lastHandshakeTime;
+  private final long lastTransferTime;
+  private final long requestNum;
+
+  /** Creates a snapshot; every argument is stored as given, without validation or copying. */
+  public PipeReceiverRuntimeSnapshot(
+      String receiverNodeType,
+      int receiverNodeId,
+      String protocol,
+      String senderAddress,
+      String senderPorts,
+      int connectionCount,
+      int pipeCount,
+      String pipeIds,
+      String userName,
+      String senderClusterId,
+      long lastHandshakeTime,
+      long lastTransferTime,
+      long requestNum) {
+    this.receiverNodeType = receiverNodeType;
+    this.receiverNodeId = receiverNodeId;
+    this.protocol = protocol;
+    this.senderAddress = senderAddress;
+    this.senderPorts = senderPorts;
+    this.connectionCount = connectionCount;
+    this.pipeCount = pipeCount;
+    this.pipeIds = pipeIds;
+    this.userName = userName;
+    this.senderClusterId = senderClusterId;
+    this.lastHandshakeTime = lastHandshakeTime;
+    this.lastTransferTime = lastTransferTime;
+    this.requestNum = requestNum;
+  }
+
+  public String getReceiverNodeType() {
+    return receiverNodeType;
+  }
+
+  public int getReceiverNodeId() {
+    return receiverNodeId;
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public String getSenderAddress() {
+    return senderAddress;
+  }
+
+  public String getSenderPorts() {
+    return senderPorts;
+  }
+
+  public int getConnectionCount() {
+    return connectionCount;
+  }
+
+  public int getPipeCount() {
+    return pipeCount;
+  }
+
+  public String getPipeIds() {
+    return pipeIds;
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public String getSenderClusterId() {
+    return senderClusterId;
+  }
+
+  public long getLastHandshakeTime() {
+    return lastHandshakeTime;
+  }
+
+  public long getLastTransferTime() {
+    return lastTransferTime;
+  }
+
+  public long getRequestNum() {
+    return requestNum;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
index 1f76f5d2453..9d7c85d975a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
@@ -22,12 +22,14 @@ package org.apache.iotdb.commons.pipe.sink.client;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.audit.UserEntity;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.SocketTimeoutException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -51,6 +53,9 @@ public abstract class IoTDBClientManager {
   protected final boolean shouldMarkAsPipeRequest;
   protected final boolean skipIfNoPrivileges;
 
+  protected volatile String pipeName;
+  protected volatile long pipeCreationTime = Long.MIN_VALUE;
+
   // This flag indicates whether the receiver supports mods transferring if
   // it is a DataNode receiver. The flag is useless for configNode receiver.
   protected boolean supportModsIfIsDataNodeReceiver = true;
@@ -89,6 +94,21 @@ public abstract class IoTDBClientManager {
     return supportModsIfIsDataNodeReceiver;
   }
 
+  /**
+   * Stores the pipe name and pipe creation time that are later appended to the handshake
+   * parameters.
+   *
+   * <p>The two fields are written one after the other, not as one atomic update.
+   *
+   * @param pipeName name of the pipe; {@code null} disables appending pipe info to the handshake
+   * @param pipeCreationTime creation time of the pipe
+   */
+  public void setPipeInfo(final String pipeName, final long pipeCreationTime) {
+    this.pipeName = pipeName;
+    this.pipeCreationTime = pipeCreationTime;
+  }
+
+  /**
+   * Adds the pipe name and pipe creation time to the handshake parameters. Nothing is added when
+   * no pipe name has been set.
+   *
+   * @param params mutable handshake parameter map to append to
+   */
+  protected void appendPipeInfoToHandshakeParams(final Map<String, String> params) {
+    // Read the volatile field once so the null check and the stored value cannot disagree.
+    final String currentPipeName = pipeName;
+    if (currentPipeName == null) {
+      return;
+    }
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_NAME, currentPipeName);
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_CREATION_TIME,
+        String.valueOf(pipeCreationTime));
+  }
+
   public void adjustTimeoutIfNecessary(Throwable e) {
     do {
       if (e instanceof SocketTimeoutException || e instanceof 
TimeoutException) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index ff62dbf477b..e92d93ca64b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -249,6 +249,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       params.put(
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF,
           Boolean.toString(skipIfNoPrivileges));
+      appendPipeInfoToHandshakeParams(params);
 
       // Try to handshake by PipeTransferHandshakeV2Req.
       TPipeTransferResp resp = 
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferHandshakeConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferHandshakeConstant.java
index 710fca828e9..3f084a73dc8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferHandshakeConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -29,6 +29,8 @@ public class PipeTransferHandshakeConstant {
   public static final String HANDSHAKE_KEY_USERNAME = "username";
   public static final String HANDSHAKE_KEY_CLI_HOSTNAME = "cliHostname";
   public static final String HANDSHAKE_KEY_PASSWORD = "password";
+  public static final String HANDSHAKE_KEY_PIPE_NAME = "pipeName";
+  public static final String HANDSHAKE_KEY_PIPE_CREATION_TIME = 
"pipeCreationTime";
   public static final String HANDSHAKE_KEY_VALIDATE_TSFILE = "validateTsFile";
   public static final String HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST = 
"markAsPipeRequest";
   public static final String HANDSHAKE_KEY_SKIP_IF = "skipIf";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index b5662aeec2c..9a1fef61014 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressorConfig;
 import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressorFactory;
 import org.apache.iotdb.commons.pipe.sink.limiter.GlobalRPCRateLimiter;
 import org.apache.iotdb.commons.pipe.sink.limiter.PipeEndPointRateLimiter;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.metrics.type.Histogram;
@@ -190,6 +191,8 @@ public abstract class IoTDBSink implements PipeConnector, 
PipeConnectorWithEvent
   private final AtomicLong totalCompressedSize = new AtomicLong(0);
   protected String attributeSortedString;
   protected String sinkTaskId;
+  protected String pipeName;
+  protected long creationTime = Long.MIN_VALUE;
   protected Timer compressionTimer;
   protected boolean isRealtimeFirst;
 
@@ -396,6 +399,8 @@ public abstract class IoTDBSink implements PipeConnector, 
PipeConnectorWithEvent
           (PipeTaskSinkRuntimeEnvironment) environment;
       attributeSortedString = sinkEnvironment.getAttributeSortedString();
       sinkTaskId = sinkEnvironment.getSinkTaskId();
+      pipeName = sinkEnvironment.getPipeName();
+      creationTime = sinkEnvironment.getCreationTime();
     }
 
     nodeUrls.clear();
@@ -621,6 +626,16 @@ public abstract class IoTDBSink implements PipeConnector, 
PipeConnectorWithEvent
     return totalUncompressedSize.get();
   }
 
+  /**
+   * Adds this sink's pipe name and creation time to the handshake parameters. Nothing is added
+   * when the pipe name is {@code null}.
+   *
+   * @param params mutable handshake parameter map to append to
+   */
+  protected void appendPipeInfoToHandshakeParams(final Map<String, String> params) {
+    if (pipeName != null) {
+      params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_NAME, pipeName);
+      final String creationTimeText = String.valueOf(creationTime);
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_CREATION_TIME, creationTimeText);
+    }
+  }
+
   public void rateLimitIfNeeded(
       final String pipeName,
       final long creationTime,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
index 6d3c05d3b4f..064b5960245 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
@@ -143,6 +143,7 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink {
             loadTsFileValidation,
             shouldMarkAsPipeRequest,
             skipIfNoPrivileges);
+    clientManager.setPipeInfo(pipeName, creationTime);
   }
 
   protected abstract IoTDBSyncClientManager constructClient(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java
index 71eb2238f15..790b1f9c192 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -142,6 +142,7 @@ public enum PlanNodeType {
   SHOW_DISK_USAGE((short) 107),
   TREE_COLLECT((short) 108),
   LOAD_TSFILE_OBJECT_PIECE((short) 109),
+  SHOW_RECEIVERS((short) 110),
 
   CREATE_OR_UPDATE_TABLE_DEVICE((short) 902),
   TABLE_DEVICE_QUERY_SCAN((short) 903),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
index 186f7daa684..9109e417163 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
@@ -197,6 +197,21 @@ public class ColumnHeaderConstant {
   public static final String REMAINING_EVENT_COUNT = "RemainingEventCount";
   public static final String ESTIMATED_REMAINING_SECONDS = 
"EstimatedRemainingSeconds";
 
+  // column names for show receivers
+  public static final String RECEIVER_NODE_TYPE = "ReceiverNodeType";
+  public static final String RECEIVER_NODE_ID = "ReceiverNodeId";
+  public static final String PROTOCOL = "Protocol";
+  public static final String SENDER_ADDRESS = "SenderAddress";
+  public static final String SENDER_PORTS = "SenderPorts";
+  public static final String CONNECTION_COUNT = "ConnectionCount";
+  public static final String PIPE_COUNT = "PipeCount";
+  public static final String PIPE_IDS = "PipeIDs";
+  public static final String RECEIVER_USER_NAME = "UserName";
+  public static final String SENDER_CLUSTER_ID = "SenderClusterId";
+  public static final String LAST_HANDSHAKE_TIME = "LastHandshakeTime";
+  public static final String LAST_TRANSFER_TIME = "LastTransferTime";
+  public static final String REQUEST_NUM = "RequestNum";
+
   // column names for select into
   public static final String SOURCE_DEVICE = "SourceDevice";
   public static final String SOURCE_COLUMN = "SourceColumn";
@@ -270,6 +285,19 @@ public class ColumnHeaderConstant {
   public static final String ESTIMATED_REMAINING_SECONDS_TABLE_MODEL =
       "estimated_remaining_seconds";
 
+  public static final String RECEIVER_NODE_TYPE_TABLE_MODEL = 
"receiver_node_type";
+  public static final String RECEIVER_NODE_ID_TABLE_MODEL = "receiver_node_id";
+  public static final String PROTOCOL_TABLE_MODEL = "protocol";
+  public static final String SENDER_ADDRESS_TABLE_MODEL = "sender_address";
+  public static final String SENDER_PORTS_TABLE_MODEL = "sender_ports";
+  public static final String CONNECTION_COUNT_TABLE_MODEL = "connection_count";
+  public static final String PIPE_COUNT_TABLE_MODEL = "pipe_count";
+  public static final String PIPE_IDS_TABLE_MODEL = "pipe_ids";
+  public static final String SENDER_CLUSTER_ID_TABLE_MODEL = 
"sender_cluster_id";
+  public static final String LAST_HANDSHAKE_TIME_TABLE_MODEL = 
"last_handshake_time";
+  public static final String LAST_TRANSFER_TIME_TABLE_MODEL = 
"last_transfer_time";
+  public static final String REQUEST_NUM_TABLE_MODEL = "request_num";
+
   public static final String PLUGIN_NAME_TABLE_MODEL = "plugin_name";
   public static final String PLUGIN_TYPE_TABLE_MODEL = "plugin_type";
   public static final String CLASS_NAME_TABLE_MODEL = "class_name";
@@ -664,6 +692,22 @@ public class ColumnHeaderConstant {
           new ColumnHeader(CLIENT_IP_TREE_MODEL, TSDataType.STRING),
           new ColumnHeader(TIMEOUT, TSDataType.INT64));
 
+  // Result columns of show receivers, in output order. The two time columns are declared as TEXT
+  // here, i.e. they are expected to be filled with already formatted strings.
+  public static final List<ColumnHeader> showReceiversColumnHeaders =
+      ImmutableList.of(
+          new ColumnHeader(RECEIVER_NODE_TYPE, TSDataType.TEXT),
+          new ColumnHeader(RECEIVER_NODE_ID, TSDataType.INT32),
+          new ColumnHeader(PROTOCOL, TSDataType.TEXT),
+          new ColumnHeader(SENDER_ADDRESS, TSDataType.TEXT),
+          new ColumnHeader(SENDER_PORTS, TSDataType.TEXT),
+          new ColumnHeader(CONNECTION_COUNT, TSDataType.INT32),
+          new ColumnHeader(PIPE_COUNT, TSDataType.INT32),
+          new ColumnHeader(PIPE_IDS, TSDataType.TEXT),
+          new ColumnHeader(RECEIVER_USER_NAME, TSDataType.TEXT),
+          new ColumnHeader(SENDER_CLUSTER_ID, TSDataType.TEXT),
+          new ColumnHeader(LAST_HANDSHAKE_TIME, TSDataType.TEXT),
+          new ColumnHeader(LAST_TRANSFER_TIME, TSDataType.TEXT),
+          new ColumnHeader(REQUEST_NUM, TSDataType.INT64));
+
   public static final List<ColumnHeader> showDiskUsageColumnHeaders =
       ImmutableList.of(
           new ColumnHeader(DATABASE, TSDataType.TEXT),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
index c4001ecbb1e..fd91c0f8921 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
@@ -47,6 +47,7 @@ public class InformationSchema {
   public static final String COLUMNS = "columns";
   public static final String REGIONS = "regions";
   public static final String PIPES = "pipes";
+  public static final String RECEIVERS = "receivers";
   public static final String PIPE_PLUGINS = "pipe_plugins";
   public static final String TOPICS = "topics";
   public static final String SUBSCRIPTIONS = "subscriptions";
@@ -221,6 +222,41 @@ public class InformationSchema {
             ColumnHeaderConstant.ESTIMATED_REMAINING_SECONDS_TABLE_MODEL, 
TSDataType.DOUBLE));
     schemaTables.put(PIPES, pipeTable);
 
+    // Schema of the "receivers" table: receiver node type/id, protocol, sender address and user
+    // name are tag columns; all remaining columns are attributes.
+    final TsTable receiversTable = new TsTable(RECEIVERS);
+    receiversTable.addColumnSchema(
+        new TagColumnSchema(
+            ColumnHeaderConstant.RECEIVER_NODE_TYPE_TABLE_MODEL, 
TSDataType.STRING));
+    receiversTable.addColumnSchema(
+        new TagColumnSchema(ColumnHeaderConstant.RECEIVER_NODE_ID_TABLE_MODEL, 
TSDataType.INT32));
+    receiversTable.addColumnSchema(
+        new TagColumnSchema(ColumnHeaderConstant.PROTOCOL_TABLE_MODEL, 
TSDataType.STRING));
+    receiversTable.addColumnSchema(
+        new TagColumnSchema(ColumnHeaderConstant.SENDER_ADDRESS_TABLE_MODEL, 
TSDataType.STRING));
+    receiversTable.addColumnSchema(
+        new AttributeColumnSchema(
+            ColumnHeaderConstant.SENDER_PORTS_TABLE_MODEL, TSDataType.STRING));
+    receiversTable.addColumnSchema(
+        new AttributeColumnSchema(
+            ColumnHeaderConstant.CONNECTION_COUNT_TABLE_MODEL, 
TSDataType.INT32));
+    receiversTable.addColumnSchema(
+        new AttributeColumnSchema(ColumnHeaderConstant.PIPE_COUNT_TABLE_MODEL, 
TSDataType.INT32));
+    receiversTable.addColumnSchema(
+        new AttributeColumnSchema(ColumnHeaderConstant.PIPE_IDS_TABLE_MODEL, 
TSDataType.STRING));
+    // NOTE(review): this is the only column here that is not named through a *_TABLE_MODEL
+    // constant - confirm that ColumnHeaderConstant.USER_NAME holds the intended table-model name.
+    receiversTable.addColumnSchema(
+        new TagColumnSchema(ColumnHeaderConstant.USER_NAME, 
TSDataType.STRING));
+    receiversTable.addColumnSchema(
+        new AttributeColumnSchema(
+            ColumnHeaderConstant.SENDER_CLUSTER_ID_TABLE_MODEL, 
TSDataType.STRING));
+    receiversTable.addColumnSchema(
+        new AttributeColumnSchema(
+            ColumnHeaderConstant.LAST_HANDSHAKE_TIME_TABLE_MODEL, 
TSDataType.TIMESTAMP));
+    receiversTable.addColumnSchema(
+        new AttributeColumnSchema(
+            ColumnHeaderConstant.LAST_TRANSFER_TIME_TABLE_MODEL, 
TSDataType.TIMESTAMP));
+    receiversTable.addColumnSchema(
+        new 
AttributeColumnSchema(ColumnHeaderConstant.REQUEST_NUM_TABLE_MODEL, 
TSDataType.INT64));
+    schemaTables.put(RECEIVERS, receiversTable);
+
     final TsTable pipePluginTable = new TsTable(PIPE_PLUGINS);
     pipePluginTable.addColumnSchema(
         new TagColumnSchema(ColumnHeaderConstant.PLUGIN_NAME_TABLE_MODEL, 
TSDataType.STRING));
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistryTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistryTest.java
new file mode 100644
index 00000000000..4445c8e9f58
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistryTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.receiver.runtime;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PipeReceiverRuntimeRegistryTest {
+  // Tests PipeReceiverRuntimeRegistry: snapshot aggregation/ordering and session removal.
+  private final PipeReceiverRuntimeRegistry registry = 
PipeReceiverRuntimeRegistry.getInstance();
+  // The registry comes from getInstance(), so it is cleared before and after every test.
+  @Before
+  public void setUp() {
+    registry.clear();
+  }
+
+  @After
+  public void tearDown() {
+    registry.clear();
+  }
+  // Registers three sessions and checks how snapshot() groups and orders them.
+  @Test
+  public void testAggregateAndSortSnapshots() {
+    registry.registerOrUpdateSession(
+        "data-1",
+        PipeReceiverRuntimeRegistry.NODE_TYPE_DATA_NODE,
+        2,
+        PipeReceiverRuntimeRegistry.PROTOCOL_THRIFT,
+        "127.0.0.1",
+        9001,
+        "root",
+        "cluster-a",
+        "pipe-a",
+        1,
+        100);
+    registry.markTransfer("data-1", 200);
+    registry.markRequest("data-1");
+    registry.registerOrUpdateSession(
+        "data-2",
+        PipeReceiverRuntimeRegistry.NODE_TYPE_DATA_NODE,
+        2,
+        PipeReceiverRuntimeRegistry.PROTOCOL_THRIFT,
+        "127.0.0.1",
+        9002,
+        "root",
+        "cluster-b",
+        "pipe-b",
+        2,
+        150);
+    registry.registerOrUpdateSession(
+        "config-1",
+        PipeReceiverRuntimeRegistry.NODE_TYPE_CONFIG_NODE,
+        1,
+        PipeReceiverRuntimeRegistry.PROTOCOL_AIR_GAP,
+        "127.0.0.2",
+        9003,
+        "root",
+        PipeReceiverRuntimeRegistry.UNKNOWN,
+        null,
+        Long.MIN_VALUE,
+        300);
+    // Three sessions were registered; the assertions below expect only two snapshots.
+    final List<PipeReceiverRuntimeSnapshot> snapshots = registry.snapshot();
+    // "data-1" and "data-2" are expected to share one snapshot, listed after the config one.
+    assertEquals(2, snapshots.size());
+    assertEquals(
+        PipeReceiverRuntimeRegistry.NODE_TYPE_CONFIG_NODE, 
snapshots.get(0).getReceiverNodeType());
+    assertEquals(
+        PipeReceiverRuntimeRegistry.NODE_TYPE_DATA_NODE, 
snapshots.get(1).getReceiverNodeType());
+    // The merged data-node snapshot should combine ports, pipes, cluster ids and timestamps.
+    final PipeReceiverRuntimeSnapshot dataSnapshot = snapshots.get(1);
+    assertEquals(2, dataSnapshot.getConnectionCount());
+    assertEquals("9001,9002", dataSnapshot.getSenderPorts());
+    assertEquals(2, dataSnapshot.getPipeCount());
+    assertTrue(dataSnapshot.getPipeIds().contains("pipe-a@")); // text after '@' is not pinned
+    assertTrue(dataSnapshot.getPipeIds().contains("pipe-b@"));
+    assertEquals("cluster-a;cluster-b", dataSnapshot.getSenderClusterId());
+    assertEquals(150, dataSnapshot.getLastHandshakeTime()); // final argument given for "data-2"
+    assertEquals(200, dataSnapshot.getLastTransferTime()); // value passed to markTransfer above
+    // NOTE(review): 4 with a single markRequest call; presumably other calls count too - confirm.
+    assertEquals(4, dataSnapshot.getRequestNum());
+  }
+  // Checks that a registered session no longer appears in snapshot() after deregister().
+  @Test
+  public void testDeregisterSession() {
+    registry.registerOrUpdateSession(
+        "data-1",
+        PipeReceiverRuntimeRegistry.NODE_TYPE_DATA_NODE,
+        1,
+        PipeReceiverRuntimeRegistry.PROTOCOL_THRIFT,
+        "127.0.0.1",
+        9001,
+        "root",
+        "cluster-a",
+        "pipe-a",
+        1,
+        100);
+
+    assertEquals(1, registry.snapshot().size());
+
+    registry.deregister("data-1");
+
+    assertTrue(registry.snapshot().isEmpty());
+  }
+}
diff --git 
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
 
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
index f984e825fc7..e7a02f269ab 100644
--- 
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
+++ 
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
@@ -102,6 +102,7 @@ statement
     | startPipeStatement
     | stopPipeStatement
     | showPipesStatement
+    | showReceiversStatement
     | createPipePluginStatement
     | dropPipePluginStatement
     | showPipePluginsStatement
@@ -519,6 +520,10 @@ showPipesStatement
     : SHOW ((PIPE pipeName=identifier) | PIPES (WHERE (CONNECTOR | SINK) USED 
BY pipeName=identifier)?)
     ;
 
+showReceiversStatement
+    : SHOW RECEIVERS // matches exactly these two keywords; no name or WHERE clause is accepted
+    ;
+
 createPipePluginStatement
     : CREATE PIPEPLUGIN (IF NOT EXISTS)? pluginName=identifier AS 
className=string uriClause
     ;
@@ -1504,7 +1509,7 @@ nonReserved
     | OBJECT | OF | OFFSET | OMIT | ONE | ONLY | OPTION | ORDINALITY | OUTPUT 
| OVER | OVERFLOW
     | PARTITION | PARTITIONS | PASSING | PAST | PATH | PATTERN | PER | PERIOD 
| PERMUTE | PIPE | PIPEPLUGIN | PIPEPLUGINS | PIPES | PLAN | POSITION | 
PRECEDING | PRECISION | PRIVILEGES | PREVIOUS | PROCESSLIST | PROCESSOR | 
PROPERTIES | PRUNE
     | QUERIES | QUERY | QUOTES
-    | RANGE | READ | READONLY | RECONSTRUCT | REFRESH | REGION | REGIONID | 
REGIONS | REMOVE | RENAME | REPAIR | REPEAT | REPEATABLE | REPLACE | RESET | 
RESPECT | RESTRICT | RETURN | RETURNING | RETURNS | REVOKE | ROLE | ROLES | 
ROLLBACK | ROOT | ROW | ROWS | RPR_FIRST | RPR_LAST | RUNNING
+    | RANGE | READ | READONLY | RECEIVERS | RECONSTRUCT | REFRESH | REGION | 
REGIONID | REGIONS | REMOVE | RENAME | REPAIR | REPEAT | REPEATABLE | REPLACE | 
RESET | RESPECT | RESTRICT | RETURN | RETURNING | RETURNS | REVOKE | ROLE | 
ROLES | ROLLBACK | ROOT | ROW | ROWS | RPR_FIRST | RPR_LAST | RUNNING
     | SERIESSLOTID | SERVICE | SERVICES | SCALAR | SCHEMA | SCHEMAS | SECOND | 
SECURITY | SEEK | SERIALIZABLE | SESSION | SET | SETS
     | SECURITY | SHOW | SINK | SOME | SOURCE | START | STATS | STOP | 
SUBSCRIPTION | SUBSCRIPTIONS | SUBSET | SUBSTRING | SYSTEM
     | TABLES | TABLESAMPLE | TAG | TAGS | TEXT | TEXT_STRING | TIES | TIME | 
TIMEPARTITION | TIMER | TIMER_XL | TIMESERIES | TIMESLOTID | TIMESTAMP | TO | 
TOPIC | TOPICS | TRAILING | TRANSACTION | TRUNCATE | TRY_CAST | TYPE
@@ -1794,6 +1799,7 @@ QUOTES: 'QUOTES';
 RANGE: 'RANGE';
 READ: 'READ';
 READONLY: 'READONLY';
+RECEIVERS: 'RECEIVERS';
 RECONSTRUCT: 'RECONSTRUCT';
 RECURSIVE: 'RECURSIVE';
 REFRESH: 'REFRESH';

Reply via email to