This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch feat/show-receivers in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit dd112cf35bd759f301d801fa6c09a993cddb654b Author: Caideyipi <[email protected]> AuthorDate: Mon Jun 8 17:53:51 2026 +0800 Add show receivers support --- .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 3 +- .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 6 +- .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 6 +- .../receiver/protocol/IoTDBConfigNodeReceiver.java | 41 ++- .../sink/protocol/IoTDBConfigRegionAirGapSink.java | 1 + .../protocol/thrift/IoTDBDataNodeReceiver.java | 222 ++++++++++++--- .../client/IoTDBDataNodeAsyncClientManager.java | 1 + .../protocol/airgap/IoTDBDataNodeAirGapSink.java | 1 + .../thrift/async/IoTDBDataRegionAsyncSink.java | 2 + .../common/header/DatasetHeaderFactory.java | 4 + .../operator/source/ShowReceiversOperator.java | 161 +++++++++++ .../InformationSchemaContentSupplierFactory.java | 49 ++++ .../db/queryengine/plan/analyze/Analysis.java | 2 + .../queryengine/plan/analyze/AnalyzeVisitor.java | 29 ++ .../db/queryengine/plan/parser/ASTVisitor.java | 6 + .../plan/planner/LogicalPlanBuilder.java | 39 +++ .../plan/planner/LogicalPlanVisitor.java | 7 + .../plan/planner/OperatorTreeGenerator.java | 15 + .../SimpleFragmentParallelPlanner.java | 2 + .../plan/node/DataNodePlanNodeDeserializer.java | 3 + .../plan/planner/plan/node/PlanVisitor.java | 5 + .../plan/node/source/ShowReceiversNode.java | 115 ++++++++ .../DataNodeLocationSupplierFactory.java | 1 + .../security/TreeAccessCheckVisitor.java | 7 + .../plan/relational/sql/parser/AstBuilder.java | 11 + .../queryengine/plan/statement/StatementType.java | 1 + .../plan/statement/StatementVisitor.java | 5 + .../plan/statement/sys/ShowReceiversStatement.java | 60 ++++ .../plan/parser/StatementGeneratorTest.java | 11 + .../planner/node/source/SourceNodeSerdeTest.java | 11 + .../informationschema/ShowReceiversTest.java | 53 ++++ .../commons/pipe/receiver/IoTDBFileReceiver.java | 100 ++++++- .../runtime/PipeReceiverRuntimeRegistry.java | 315 +++++++++++++++++++++ .../runtime/PipeReceiverRuntimeSnapshot.java | 118 ++++++++ .../pipe/sink/client/IoTDBClientManager.java | 20 ++ .../pipe/sink/client/IoTDBSyncClientManager.java | 1 + .../common/PipeTransferHandshakeConstant.java | 2 + .../commons/pipe/sink/protocol/IoTDBSink.java | 15 + .../pipe/sink/protocol/IoTDBSslSyncSink.java | 1 + .../plan/planner/plan/node/PlanNodeType.java | 1 + .../schema/column/ColumnHeaderConstant.java | 44 +++ .../commons/schema/table/InformationSchema.java | 36 +++ .../runtime/PipeReceiverRuntimeRegistryTest.java | 127 +++++++++ .../db/relational/grammar/sql/RelationalSql.g4 | 8 +- 44 files changed, 1612 insertions(+), 56 deletions(-) diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 index 54d53bab674..f16bf85bc76 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 @@ -196,6 +196,7 @@ keyWords | QUERY | QUERYID | QUOTA + | RECEIVERS | RANGE | READONLY | READ @@ -298,4 +299,4 @@ keyWords | OPTION | INF | CURRENT_TIMESTAMP - ; \ No newline at end of file + ; diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index 3e94667a2fa..dcad209354c 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -55,7 +55,7 @@ ddlStatement // ExternalService | createService | startService | stopService | dropService | showService // Pipe Task - | createPipe | alterPipe | dropPipe | startPipe | stopPipe | showPipes + | createPipe | alterPipe | dropPipe | startPipe | stopPipe | showPipes | showReceivers // Pipe Plugin | createPipePlugin | dropPipePlugin | showPipePlugins // Subscription @@ -701,6 +701,10 @@ showPipes : SHOW ((PIPE pipeName=identifier) | PIPES (WHERE (CONNECTOR | SINK) USED BY pipeName=identifier)?) ; +showReceivers + : SHOW RECEIVERS + ; + // Pipe Plugin ========================================================================================= createPipePlugin : CREATE PIPEPLUGIN (IF NOT EXISTS)? pluginName=identifier AS className=STRING_LITERAL uriClause diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index 59bac72fc78..4f96e1f69a3 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -723,6 +723,10 @@ QUOTA : Q U O T A ; +RECEIVERS + : R E C E I V E R S + ; + RANGE : R A N G E ; @@ -1411,4 +1415,4 @@ fragment V: [vV]; fragment W: [wW]; fragment X: [xX]; fragment Y: [yY]; -fragment Z: [zZ]; \ No newline at end of file +fragment Z: [zZ]; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index 7de8e8bf78d..9d84e448c9f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -35,6 +35,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver; import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; +import org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeRegistry; import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq; @@ -194,20 +195,22 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver { PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req)); PipeConfigNodeReceiverMetrics.getInstance() .recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime); - return resp; + return recordConfigNodeHandshakeIfSuccess(resp, req); case HANDSHAKE_CONFIGNODE_V2: resp = handleTransferHandshakeV2( PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req)); - userEntity.setAuditLogOperation(AuditLogOperation.DDL); + if (Objects.nonNull(userEntity)) { + userEntity.setAuditLogOperation(AuditLogOperation.DDL); + } PipeConfigNodeReceiverMetrics.getInstance() .recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime); - return resp; + return recordConfigNodeHandshakeIfSuccess(resp, req); case TRANSFER_CONFIG_PLAN: resp = handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req)); PipeConfigNodeReceiverMetrics.getInstance() .recordTransferConfigPlanTimer(System.nanoTime() - startTime); - return resp; + return recordConfigNodeTransferIfSuccess(resp); case TRANSFER_CONFIG_SNAPSHOT_PIECE: resp = handleTransferFilePiece( @@ -216,14 +219,14 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver { false); PipeConfigNodeReceiverMetrics.getInstance() .recordTransferConfigSnapshotPieceTimer(System.nanoTime() - startTime); - return resp; + return recordConfigNodeTransferIfSuccess(resp); case TRANSFER_CONFIG_SNAPSHOT_SEAL: resp = handleTransferFileSealV2( PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req)); PipeConfigNodeReceiverMetrics.getInstance() .recordTransferConfigSnapshotSealTimer(System.nanoTime() - startTime); - return resp; + return recordConfigNodeTransferIfSuccess(resp); case TRANSFER_COMPRESSED: return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); default: @@ -262,6 +265,32 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver { && type != PipeRequestType.HANDSHAKE_CONFIGNODE_V2; } + private TPipeTransferResp recordConfigNodeHandshakeIfSuccess( + final TPipeTransferResp resp, final TPipeTransferReq req) { + if (isSuccess(resp)) { + recordPipeReceiverHandshake( + PipeReceiverRuntimeRegistry.NODE_TYPE_CONFIG_NODE, + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + getProtocol(req)); + } + return resp; + } + + private TPipeTransferResp recordConfigNodeTransferIfSuccess(final TPipeTransferResp resp) { + if (isSuccess(resp)) { + recordPipeReceiverTransfer(); + } else { + recordPipeReceiverRequest(); + } + return resp; + } + + private static String getProtocol(final TPipeTransferReq req) { + return req instanceof AirGapPseudoTPipeTransferRequest + ? PipeReceiverRuntimeRegistry.PROTOCOL_AIR_GAP + : PipeReceiverRuntimeRegistry.PROTOCOL_THRIFT; + } + private TPipeTransferResp handleTransferConfigPlan(final PipeTransferConfigPlanReq req) throws IOException { return new TPipeTransferResp( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java index c8c2890f5be..f8e91993eeb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java @@ -89,6 +89,7 @@ public class IoTDBConfigRegionAirGapSink extends IoTDBAirGapSink { Boolean.toString(shouldMarkAsPipeRequest)); params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF, Boolean.toString(skipIfNoPrivileges)); + appendPipeInfoToHandshakeParams(params); return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 731c2eb50f0..e237e7129bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -35,8 +35,10 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver; import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; +import org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeRegistry; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant; import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq; @@ -115,16 +117,19 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import com.google.common.util.concurrent.ListenableFuture; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Paths; import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -174,6 +179,8 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { // datanode (cluster B). private static final AtomicLong CONFIG_RECEIVER_ID_GENERATOR = new AtomicLong(0); protected final AtomicReference<String> configReceiverId = new AtomicReference<>(); + private final AtomicReference<String> configPipeReceiverRuntimeSessionKey = + new AtomicReference<>(); private final PipeTransferSliceReqHandler sliceReqHandler = new PipeTransferSliceReqHandler(); @@ -217,8 +224,10 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { TSStatusCode.PIPE_HANDSHAKE_ERROR.getStatusCode(), "The receiver memory is not enough to handle the handshake request from datanode.")); } - return handleTransferHandshakeV1( - PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)); + return recordDataNodeHandshakeIfSuccess( + handleTransferHandshakeV1( + PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)), + req); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); @@ -235,8 +244,10 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { TSStatusCode.PIPE_HANDSHAKE_ERROR.getStatusCode(), "The receiver memory is not enough to handle the handshake request from datanode.")); } - return handleTransferHandshakeV2( - PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req)); + return recordDataNodeHandshakeIfSuccess( + handleTransferHandshakeV2( + PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req)), + req); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); @@ -245,8 +256,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TABLET_INSERT_NODE: { try { - return handleTransferTabletInsertNode( - PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferTabletInsertNode( + PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() @@ -256,8 +268,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TABLET_INSERT_NODE_V2: { try { - return handleTransferTabletInsertNode( - PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferTabletInsertNode( + PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferTabletInsertNodeV2Timer(System.nanoTime() - startTime); @@ -266,7 +279,8 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TABLET_RAW: { try { - return handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferTabletRawTimer(System.nanoTime() - startTime); @@ -275,8 +289,8 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TABLET_RAW_V2: { try { - return handleTransferTabletRaw( - PipeTransferTabletRawReqV2.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferTabletRaw(PipeTransferTabletRawReqV2.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferTabletRawV2Timer(System.nanoTime() - startTime); @@ -285,8 +299,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TABLET_BINARY: { try { - return handleTransferTabletBinary( - PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferTabletBinary( + PipeTransferTabletBinaryReq.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferTabletBinaryTimer(System.nanoTime() - startTime); @@ -295,8 +310,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TABLET_BINARY_V2: { try { - return handleTransferTabletBinary( - PipeTransferTabletBinaryReqV2.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferTabletBinary( + PipeTransferTabletBinaryReqV2.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferTabletBinaryV2Timer(System.nanoTime() - startTime); @@ -305,8 +321,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TABLET_BATCH: { try { - return handleTransferTabletBatch( - PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferTabletBatch( + PipeTransferTabletBatchReq.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferTabletBatchTimer(System.nanoTime() - startTime); @@ -315,8 +332,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TABLET_BATCH_V2: { try { - return handleTransferTabletBatchV2( - PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferTabletBatchV2( + PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferTabletBatchV2Timer(System.nanoTime() - startTime); @@ -325,10 +343,11 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TS_FILE_PIECE: { try { - return handleTransferFilePiece( - PipeTransferTsFilePieceReq.fromTPipeTransferReq(req), - req instanceof AirGapPseudoTPipeTransferRequest, - true); + return recordDataNodeTransferIfSuccess( + handleTransferFilePiece( + PipeTransferTsFilePieceReq.fromTPipeTransferReq(req), + req instanceof AirGapPseudoTPipeTransferRequest, + true)); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferTsFilePieceTimer(System.nanoTime() - startTime); @@ -337,8 +356,8 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TS_FILE_SEAL: { try { - return handleTransferFileSealV1( - PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferTsFileSealTimer(System.nanoTime() - startTime); @@ -347,10 +366,11 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TS_FILE_PIECE_WITH_MOD: { try { - return handleTransferFilePiece( - PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), - req instanceof AirGapPseudoTPipeTransferRequest, - false); + return recordDataNodeTransferIfSuccess( + handleTransferFilePiece( + PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), + req instanceof AirGapPseudoTPipeTransferRequest, + false)); } finally { PipeDataNodeReceiverMetrics.getInstance() @@ -360,8 +380,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_TS_FILE_SEAL_WITH_MOD: { try { - return handleTransferFileSealV2( - PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferFileSealV2( + PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); @@ -370,7 +391,8 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_PLAN_NODE: { try { - return handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferSchemaPlanTimer(System.nanoTime() - startTime); @@ -379,10 +401,11 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_SCHEMA_SNAPSHOT_PIECE: { try { - return handleTransferFilePiece( - PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), - req instanceof AirGapPseudoTPipeTransferRequest, - false); + return recordDataNodeTransferIfSuccess( + handleTransferFilePiece( + PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), + req instanceof AirGapPseudoTPipeTransferRequest, + false)); } finally { PipeDataNodeReceiverMetrics.getInstance() @@ -392,8 +415,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case TRANSFER_SCHEMA_SNAPSHOT_SEAL: { try { - return handleTransferFileSealV2( - PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req)); + return recordDataNodeTransferIfSuccess( + handleTransferFileSealV2( + PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req))); } finally { PipeDataNodeReceiverMetrics.getInstance() @@ -409,7 +433,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { try { // Config requests will first be received by the DataNode receiver, // then transferred to ConfigNode receiver to execute. - return handleTransferConfigPlan(req); + return recordConfigNodeReceiverRuntimeIfSuccess(handleTransferConfigPlan(req), req); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferConfigPlanTimer(System.nanoTime() - startTime); @@ -760,6 +784,123 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { return receive(req.get()); } + private TPipeTransferResp recordDataNodeHandshakeIfSuccess( + final TPipeTransferResp resp, final TPipeTransferReq req) { + if (isSuccess(resp)) { + recordPipeReceiverHandshake( + PipeReceiverRuntimeRegistry.NODE_TYPE_DATA_NODE, + IOTDB_CONFIG.getDataNodeId(), + getProtocol(req)); + } + return resp; + } + + private TPipeTransferResp recordDataNodeTransferIfSuccess(final TPipeTransferResp resp) { + if (isSuccess(resp)) { + recordPipeReceiverTransfer(); + } else { + recordPipeReceiverRequest(); + } + return resp; + } + + private TPipeTransferResp recordConfigNodeReceiverRuntimeIfSuccess( + final TPipeTransferResp resp, final TPipeTransferReq req) { + if (!PipeRequestType.isValidatedRequestType(req.getType())) { + return resp; + } + + final PipeRequestType requestType = PipeRequestType.valueOf(req.getType()); + if (requestType == PipeRequestType.HANDSHAKE_CONFIGNODE_V1 + || requestType == PipeRequestType.HANDSHAKE_CONFIGNODE_V2) { + if (isSuccess(resp)) { + recordConfigNodeHandshake(req, requestType); + } + } else { + if (isSuccess(resp)) { + PipeReceiverRuntimeRegistry.getInstance() + .markTransfer(configPipeReceiverRuntimeSessionKey.get(), System.currentTimeMillis()); + } else { + PipeReceiverRuntimeRegistry.getInstance() + .markRequest(configPipeReceiverRuntimeSessionKey.get()); + } + } + return resp; + } + + private void recordConfigNodeHandshake( + final TPipeTransferReq req, final PipeRequestType requestType) { + final String protocol = getProtocol(req); + final String sessionKey = + String.format( + "%s-%s-%s-%s", + PipeReceiverRuntimeRegistry.NODE_TYPE_CONFIG_NODE, -1, protocol, getConfigReceiverId()); + final String oldSessionKey = configPipeReceiverRuntimeSessionKey.getAndSet(sessionKey); + if (!Objects.equals(oldSessionKey, sessionKey)) { + PipeReceiverRuntimeRegistry.getInstance().deregister(oldSessionKey); + } + + final Map<String, String> params = + requestType == PipeRequestType.HANDSHAKE_CONFIGNODE_V2 + ? parseHandshakeV2Params(req) + : Collections.emptyMap(); + PipeReceiverRuntimeRegistry.getInstance() + .registerOrUpdateSession( + sessionKey, + PipeReceiverRuntimeRegistry.NODE_TYPE_CONFIG_NODE, + -1, + protocol, + getSenderHost(), + parseSenderPort(getSenderPort()), + params.getOrDefault(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username), + params.getOrDefault( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, + PipeReceiverRuntimeRegistry.UNKNOWN), + params.get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_NAME), + parsePipeCreationTime( + params.get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_CREATION_TIME)), + System.currentTimeMillis()); + } + + private static Map<String, String> parseHandshakeV2Params(final TPipeTransferReq req) { + final Map<String, String> params = new HashMap<>(); + if (req.getBody() == null) { + return params; + } + final ByteBuffer body = req.getBody().duplicate(); + body.rewind(); + final int size = ReadWriteIOUtils.readInt(body); + for (int i = 0; i < size; ++i) { + params.put(ReadWriteIOUtils.readString(body), ReadWriteIOUtils.readString(body)); + } + return params; + } + + private static String getProtocol(final TPipeTransferReq req) { + return req instanceof AirGapPseudoTPipeTransferRequest + ? PipeReceiverRuntimeRegistry.PROTOCOL_AIR_GAP + : PipeReceiverRuntimeRegistry.PROTOCOL_THRIFT; + } + + private static int parseSenderPort(final String senderPort) { + try { + return Integer.parseInt(senderPort); + } catch (final Exception e) { + return -1; + } + } + + private static long parsePipeCreationTime(final String pipeCreationTime) { + if (pipeCreationTime == null) { + return Long.MIN_VALUE; + } + try { + return Long.parseLong(pipeCreationTime); + } catch (final NumberFormatException e) { + return Long.MIN_VALUE; + } + } + /** * For {@link InsertRowsStatement} and {@link InsertMultiTabletsStatement}, the returned {@link * TSStatus} will use sub-status to record the endpoint for redirection. Each sub-status records @@ -1157,6 +1298,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { @Override public synchronized void handleExit() { + PipeReceiverRuntimeRegistry.getInstance() + .deregister(configPipeReceiverRuntimeSessionKey.getAndSet(null)); + if (Objects.nonNull(configReceiverId.get())) { try { ClusterConfigTaskExecutor.getInstance().handlePipeConfigClientExit(configReceiverId.get()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java index 5c38c3a8540..69efcf39fb2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java @@ -319,6 +319,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF, Boolean.toString(skipIfNoPrivileges)); + appendPipeInfoToHandshakeParams(params); client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs()); client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataNodeAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataNodeAirGapSink.java index b593df56612..9e05c240a15 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataNodeAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataNodeAirGapSink.java @@ -72,6 +72,7 @@ public abstract class IoTDBDataNodeAirGapSink extends IoTDBAirGapSink { Boolean.toString(shouldMarkAsPipeRequest)); params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF, Boolean.toString(skipIfNoPrivileges)); + appendPipeInfoToHandshakeParams(params); return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index b8b169b1f6a..a96b3d58c36 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -172,6 +172,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { shouldMarkAsPipeRequest, false, skipIfNoPrivileges); + clientManager.setPipeInfo(pipeName, creationTime); transferTsFileClientManager = new IoTDBDataNodeAsyncClientManager( @@ -188,6 +189,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { shouldMarkAsPipeRequest, isSplitTSFileBatchModeEnabled, skipIfNoPrivileges); + transferTsFileClientManager.setPipeInfo(pipeName, creationTime); if (isTabletBatchModeEnabled) { tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java index 18f15eea8f3..45c3670cd16 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java @@ -207,6 +207,10 @@ public class DatasetHeaderFactory { return new DatasetHeader(ColumnHeaderConstant.showQueriesColumnHeaders, false); } + public static DatasetHeader getShowReceiversHeader() { + return new DatasetHeader(ColumnHeaderConstant.showReceiversColumnHeaders, false); + } + public static DatasetHeader getShowDiskUsageHeader() { return new DatasetHeader(ColumnHeaderConstant.showDiskUsageColumnHeaders, true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowReceiversOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowReceiversOperator.java new file mode 100644 index 00000000000..c9cf62317f5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowReceiversOperator.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source; + +import org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeRegistry; +import org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeSnapshot; +import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils; +import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; + +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class ShowReceiversOperator implements SourceOperator { + + private final OperatorContext operatorContext; + private final PlanNodeId sourceId; + + private TsBlock tsBlock; + private boolean hasConsumed; + + private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(ShowReceiversOperator.class); + + public ShowReceiversOperator(OperatorContext operatorContext, PlanNodeId sourceId) { + this.operatorContext = operatorContext; + this.sourceId = sourceId; + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() { + TsBlock res = tsBlock; + hasConsumed = true; + tsBlock = null; + return res; + } + + @Override + public boolean hasNext() { + if (hasConsumed) { + return false; + } + if (tsBlock == null) { + tsBlock = buildTsBlock(); + } + return true; + } + + @Override + public boolean isFinished() { + return hasConsumed; + } + + @Override + public void close() { + // do nothing + } + + @Override + public long calculateMaxPeekMemory() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } + + @Override + public long calculateMaxReturnSize() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public PlanNodeId getSourceId() { + return sourceId; + } + + private TsBlock buildTsBlock() { + final List<TSDataType> outputDataTypes = + DatasetHeaderFactory.getShowReceiversHeader().getRespDataTypes(); + final TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes); + final TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder(); + final ColumnBuilder[] columnBuilders = builder.getValueColumnBuilders(); + final long currentTime = + TimestampPrecisionUtils.convertToCurrPrecision( + System.currentTimeMillis(), TimeUnit.MILLISECONDS); + + for (PipeReceiverRuntimeSnapshot snapshot : + PipeReceiverRuntimeRegistry.getInstance().snapshot()) { + timeColumnBuilder.writeLong(currentTime); + columnBuilders[0].writeBinary(BytesUtils.valueOf(snapshot.getReceiverNodeType())); + columnBuilders[1].writeInt(snapshot.getReceiverNodeId()); + columnBuilders[2].writeBinary(BytesUtils.valueOf(snapshot.getProtocol())); + columnBuilders[3].writeBinary(BytesUtils.valueOf(snapshot.getSenderAddress())); + columnBuilders[4].writeBinary(BytesUtils.valueOf(snapshot.getSenderPorts())); + columnBuilders[5].writeInt(snapshot.getConnectionCount()); + columnBuilders[6].writeInt(snapshot.getPipeCount()); + columnBuilders[7].writeBinary(BytesUtils.valueOf(snapshot.getPipeIds())); + columnBuilders[8].writeBinary(BytesUtils.valueOf(snapshot.getUserName())); + columnBuilders[9].writeBinary(BytesUtils.valueOf(snapshot.getSenderClusterId())); + columnBuilders[10].writeBinary( + BytesUtils.valueOf(formatTime(snapshot.getLastHandshakeTime()))); + columnBuilders[11].writeBinary( + BytesUtils.valueOf(formatTime(snapshot.getLastTransferTime()))); + columnBuilders[12].writeLong(snapshot.getRequestNum()); + builder.declarePosition(); + } + return builder.build(); + } + + private static String formatTime(long timestampInMillis) { + return timestampInMillis <= 0 + ? PipeReceiverRuntimeRegistry.UNKNOWN + : DateTimeUtils.convertLongToDate(timestampInMillis, "ms"); + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index ca9c09ecb95..e84348851ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -28,12 +28,15 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry; import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp; import org.apache.iotdb.commons.audit.UserEntity; +import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta; +import org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeRegistry; +import org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeSnapshot; import org.apache.iotdb.commons.queryengine.common.ConnectionInfo; import org.apache.iotdb.commons.queryengine.common.SqlDialect; import org.apache.iotdb.commons.queryengine.plan.relational.function.TableBuiltinTableFunction; @@ -184,6 +187,8 @@ public class InformationSchemaContentSupplierFactory { return new RegionSupplier(dataTypes, userEntity); case InformationSchema.PIPES: return new PipeSupplier(dataTypes, userEntity.getUsername()); + case InformationSchema.RECEIVERS: + return new ReceiversSupplier(dataTypes, userEntity); case InformationSchema.PIPE_PLUGINS: return new PipePluginSupplier(dataTypes, userEntity); case InformationSchema.TOPICS: @@ -710,6 +715,50 @@ public class InformationSchemaContentSupplierFactory { } } + private static class ReceiversSupplier extends TsBlockSupplier { + private final Iterator<PipeReceiverRuntimeSnapshot> iterator; + + private ReceiversSupplier(final List<TSDataType> dataTypes, final UserEntity userEntity) { + super(dataTypes); + accessControl.checkMissingPrivileges( + userEntity.getUsername(), Collections.singletonList(PrivilegeType.USE_PIPE), userEntity); + iterator = PipeReceiverRuntimeRegistry.getInstance().snapshot().iterator(); + } + + @Override + protected void constructLine() { + final PipeReceiverRuntimeSnapshot snapshot = iterator.next(); + columnBuilders[0].writeBinary(BytesUtils.valueOf(snapshot.getReceiverNodeType())); + columnBuilders[1].writeInt(snapshot.getReceiverNodeId()); + columnBuilders[2].writeBinary(BytesUtils.valueOf(snapshot.getProtocol())); + columnBuilders[3].writeBinary(BytesUtils.valueOf(snapshot.getSenderAddress())); + columnBuilders[4].writeBinary(BytesUtils.valueOf(snapshot.getSenderPorts())); + columnBuilders[5].writeInt(snapshot.getConnectionCount()); + columnBuilders[6].writeInt(snapshot.getPipeCount()); + columnBuilders[7].writeBinary(BytesUtils.valueOf(snapshot.getPipeIds())); + columnBuilders[8].writeBinary(BytesUtils.valueOf(snapshot.getUserName())); + columnBuilders[9].writeBinary(BytesUtils.valueOf(snapshot.getSenderClusterId())); + writeTimestamp(columnBuilders[10], snapshot.getLastHandshakeTime()); + writeTimestamp(columnBuilders[11], snapshot.getLastTransferTime()); + columnBuilders[12].writeLong(snapshot.getRequestNum()); + resultBuilder.declarePosition(); + } + + private static void writeTimestamp(ColumnBuilder columnBuilder, long timestampInMillis) { + if (timestampInMillis <= 0) { + columnBuilder.appendNull(); + return; + } + columnBuilder.writeLong( + TimestampPrecisionUtils.convertToCurrPrecision(timestampInMillis, TimeUnit.MILLISECONDS)); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + } + private static class PipePluginSupplier extends TsBlockSupplier { private final Iterator<PipePluginMeta> iterator; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index be20d218e6a..d5539bfe93d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -60,6 +60,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; @@ -488,6 +489,7 @@ public class Analysis implements IAnalysis { return (dataPartition != null && !dataPartition.isEmpty()) || (schemaPartition != null && !schemaPartition.isEmpty()) || statement instanceof ShowQueriesStatement + || statement instanceof ShowReceiversStatement || statement instanceof ShowDiskUsageStatement || (statement instanceof QueryStatement && ((QueryStatement) statement).isAggregationQuery()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 22d5b0518ac..27a9778802a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -145,6 +145,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatemen import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.TransformToExpressionVisitor; import org.apache.iotdb.rpc.RpcUtils; @@ -3822,6 +3823,34 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> return analysis; } + @Override + public Analysis visitShowReceivers( + ShowReceiversStatement showReceiversStatement, MPPQueryContext context) { + Analysis analysis = new Analysis(); + analysis.setRealStatement(showReceiversStatement); + analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowReceiversHeader()); + analysis.setVirtualSource(true); + + List<TDataNodeLocation> allReadableDataNodeLocations = getReadableDataNodeLocations(); + if (allReadableDataNodeLocations.isEmpty()) { + throw new StatementAnalyzeException(DataNodeQueryMessages.NO_RUNNING_DATANODES); + } + analysis.setReadableDataNodeLocations(allReadableDataNodeLocations); + + Set<Expression> sourceExpressions = new HashSet<>(); + for (ColumnHeader columnHeader : analysis.getRespDatasetHeader().getColumnHeaders()) { + sourceExpressions.add( + TimeSeriesOperand.constructColumnHeaderExpression( + columnHeader.getColumnName(), columnHeader.getColumnType())); + } + analysis.setSourceExpressions(sourceExpressions); + sourceExpressions.forEach(expression -> analyzeExpressionType(analysis, expression)); + + analysis.setMergeOrderParameter(new OrderByParameter(showReceiversStatement.getSortItemList())); + + return analysis; + } + @Override public Analysis visitShowDiskUsage( ShowDiskUsageStatement showDiskUsageStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 49f79a6a923..feca397e749 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -250,6 +250,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentSqlDialectS import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement; @@ -4412,6 +4413,11 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { return showPipesStatement; } + @Override + public Statement visitShowReceivers(IoTDBSqlParser.ShowReceiversContext ctx) { + return new ShowReceiversStatement(); + } + @Override public Statement visitCreateTopic(IoTDBSqlParser.CreateTopicContext ctx) { final CreateTopicStatement createTopicStatement = new CreateTopicStatement(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 26d361c1f21..36e522e5d13 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -90,6 +90,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanN import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowReceiversNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; @@ -1328,6 +1329,44 @@ public class LogicalPlanBuilder { return this; } + public LogicalPlanBuilder planShowReceivers(Analysis analysis) { + List<TDataNodeLocation> dataNodeLocations = analysis.getReadableDataNodeLocations(); + if (dataNodeLocations.size() == 1) { + this.root = + planSingleShowReceivers(dataNodeLocations.get(0)) + .planSort(analysis.getMergeOrderParameter()) + .getRoot(); + } else { + List<String> outputColumns = new ArrayList<>(); + MergeSortNode mergeSortNode = + new MergeSortNode( + context.getQueryId().genPlanNodeId(), + analysis.getMergeOrderParameter(), + outputColumns); + + dataNodeLocations.forEach( + dataNodeLocation -> + mergeSortNode.addChild( + this.planSingleShowReceivers(dataNodeLocation) + .planSort(analysis.getMergeOrderParameter()) + .getRoot())); + outputColumns.addAll(mergeSortNode.getChildren().get(0).getOutputColumnNames()); + this.root = mergeSortNode; + } + + ColumnHeaderConstant.showReceiversColumnHeaders.forEach( + columnHeader -> + context + .getTypeProvider() + .setTreeModelType(columnHeader.getColumnName(), columnHeader.getColumnType())); + return this; + } + + private LogicalPlanBuilder planSingleShowReceivers(TDataNodeLocation dataNodeLocation) { + this.root = new ShowReceiversNode(context.getQueryId().genPlanNodeId(), dataNodeLocation); + return this; + } + public LogicalPlanBuilder planShowDiskUsage(Analysis analysis, PartialPath pathPattern) { List<TDataNodeLocation> dataNodeLocations = analysis.getReadableDataNodeLocations(); if (dataNodeLocations.size() == 1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index 2c9304ce0b9..f04dc1c9263 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -92,6 +92,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement; import org.apache.iotdb.db.schemaengine.SchemaEngineMode; import org.apache.tsfile.enums.TSDataType; @@ -1017,6 +1018,12 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte return planBuilder.getRoot(); } + @Override + public PlanNode visitShowReceivers( + ShowReceiversStatement showReceiversStatement, MPPQueryContext context) { + return new LogicalPlanBuilder(analysis, context).planShowReceivers(analysis).getRoot(); + } + @Override public PlanNode visitShowDiskUsage( ShowDiskUsageStatement showDiskUsageStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 327dbecdda2..6e32f55b732 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -140,6 +140,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesAggregati import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.ShowDiskUsageOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.ShowQueriesOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.ShowReceiversOperator; import org.apache.iotdb.db.queryengine.execution.operator.window.ConditionWindowParameter; import org.apache.iotdb.db.queryengine.execution.operator.window.CountWindowParameter; import org.apache.iotdb.db.queryengine.execution.operator.window.SessionWindowParameter; @@ -224,6 +225,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggre import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowReceiversNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor; @@ -2478,6 +2480,19 @@ public class OperatorTreeGenerator implements PlanVisitor<Operator, LocalExecuti node.getAllowedUsername()); } + @Override + public Operator visitShowReceivers(ShowReceiversNode node, LocalExecutionPlanContext context) { + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + ShowReceiversOperator.class.getSimpleName()); + + return new ShowReceiversOperator(operatorContext, node.getPlanNodeId()); + } + @Override public Operator visitShowDiskUsage(ShowDiskUsageNode node, LocalExecutionPlanContext context) { OperatorContext operatorContext = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index 4e34d361471..cd9a9fe44c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -39,6 +39,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesSta import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement; import org.apache.tsfile.read.common.Path; import org.apache.tsfile.utils.Pair; @@ -159,6 +160,7 @@ public class SimpleFragmentParallelPlanner extends AbstractFragmentParallelPlann if (analysis.getTreeStatement() instanceof QueryStatement || analysis.getTreeStatement() instanceof ExplainAnalyzeStatement || analysis.getTreeStatement() instanceof ShowQueriesStatement + || analysis.getTreeStatement() instanceof ShowReceiversStatement || analysis.getTreeStatement() instanceof ShowDiskUsageStatement || (analysis.getTreeStatement() instanceof ShowTimeSeriesStatement && (((ShowTimeSeriesStatement) analysis.getTreeStatement()).isOrderByHeat() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java index 2a9b9b6905c..59de11d98d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/DataNodePlanNodeDeserializer.java @@ -109,6 +109,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggre import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowReceiversNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; @@ -415,6 +416,8 @@ public class DataNodePlanNodeDeserializer extends CommonPlanNodeDeserializer { return ShowDiskUsageNode.deserialize(buffer); case 108: return CollectNode.deserialize(buffer); + case 110: + return ShowReceiversNode.deserialize(buffer); case 902: return CreateOrUpdateTableDeviceNode.deserialize(buffer); case 903: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 54ee9d17a94..418f46ec865 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -108,6 +108,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanN import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowReceiversNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; @@ -348,6 +349,10 @@ public interface PlanVisitor<R, C> extends ICoreQueryPlanVisitor<R, C> { return visitPlan(node, context); } + default R visitShowReceivers(ShowReceiversNode node, C context) { + return visitPlan(node, context); + } + default R visitShowDiskUsage(ShowDiskUsageNode node, C context) { return visitPlan(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowReceiversNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowReceiversNode.java new file mode 100644 index 00000000000..1cf76707c56 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowReceiversNode.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.IPlanVisitor; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; + +import com.google.common.collect.ImmutableList; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; + +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.showReceiversColumnHeaders; + +public class ShowReceiversNode extends VirtualSourceNode { + + public static final List<String> SHOW_RECEIVERS_HEADER_COLUMNS = + showReceiversColumnHeaders.stream() + .map(ColumnHeader::getColumnName) + .collect(ImmutableList.toImmutableList()); + + public ShowReceiversNode(PlanNodeId id, TDataNodeLocation dataNodeLocation) { + super(id, dataNodeLocation); + } + + @Override + public List<PlanNode> getChildren() { + return ImmutableList.of(); + } + + @Override + public void addChild(PlanNode child) { + throw new UnsupportedOperationException("no child is allowed for ShowReceiversNode"); + } + + @Override + public PlanNodeType getType() { + return PlanNodeType.SHOW_RECEIVERS; + } + + @Override + public PlanNode clone() { + return new ShowReceiversNode(getPlanNodeId(), getDataNodeLocation()); + } + + @Override + public int allowedChildCount() { + return NO_CHILD_ALLOWED; + } + + @Override + public List<String> getOutputColumnNames() { + return SHOW_RECEIVERS_HEADER_COLUMNS; + } + + @Override + public <R, C> R accept(IPlanVisitor<R, C> visitor, C context) { + return ((PlanVisitor<R, C>) visitor).visitShowReceivers(this, context); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.SHOW_RECEIVERS.serialize(byteBuffer); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.SHOW_RECEIVERS.serialize(stream); + } + + public static ShowReceiversNode deserialize(ByteBuffer byteBuffer) { + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new ShowReceiversNode(planNodeId, null); + } + + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode()); + } + + @Override + public String toString() { + return "ShowReceiversNode-" + this.getPlanNodeId(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java index d0b3cb330e1..2d3f6cea910 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java @@ -123,6 +123,7 @@ public class DataNodeLocationSupplierFactory { case InformationSchema.CONNECTIONS: case InformationSchema.CURRENT_QUERIES: case InformationSchema.QUERIES_COSTS_HISTOGRAM: + case InformationSchema.RECEIVERS: return getReadableDataNodeLocations(); case InformationSchema.DATABASES: case InformationSchema.TABLES: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java index fd9757c5f89..7f0a0e93dfa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java @@ -159,6 +159,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentSqlDialectS import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement; @@ -848,6 +849,12 @@ public class TreeAccessCheckVisitor extends StatementVisitor<TSStatus, TreeAcces return StatusUtils.OK; } + @Override + public TSStatus visitShowReceivers( + ShowReceiversStatement statement, TreeAccessCheckContext context) { + return checkPipeManagement(context.setAuditLogOperation(AuditLogOperation.QUERY), () -> ""); + } + @Override public TSStatus visitDropPipe(DropPipeStatement statement, TreeAccessCheckContext context) { return checkPipeManagement( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 25d68eba6be..22a9caf548e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -1722,6 +1722,17 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { limit); } + @Override + public Node visitShowReceiversStatement(RelationalSqlParser.ShowReceiversStatementContext ctx) { + return new ShowStatement( + getLocation(ctx), + InformationSchema.RECEIVERS, + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty()); + } + @Override public Node visitKillQueryStatement(RelationalSqlParser.KillQueryStatementContext ctx) { if (ctx.queryId == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java index b40c6444816..32f4c301601 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java @@ -201,4 +201,5 @@ public enum StatementType { SHOW_EXTERNAL_SERVICE, SHOW_DISK_USAGE, + SHOW_RECEIVERS, } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index 847e850c521..e9a64c5981c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -147,6 +147,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentSqlDialectS import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement; @@ -543,6 +544,10 @@ public abstract class StatementVisitor<R, C> { return visitStatement(showQueriesStatement, context); } + public R visitShowReceivers(ShowReceiversStatement showReceiversStatement, C context) { + return visitStatement(showReceiversStatement, context); + } + public R visitShowDiskUsage(ShowDiskUsageStatement showDiskUsageStatement, C context) { return visitStatement(showDiskUsageStatement, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowReceiversStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowReceiversStatement.java new file mode 100644 index 00000000000..1e8a87fd92a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowReceiversStatement.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.sys; + +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowStatement; + +import com.google.common.collect.ImmutableList; + +import java.util.List; + +public class ShowReceiversStatement extends ShowStatement { + + private static final List<SortItem> DEFAULT_SORT_ITEMS = + ImmutableList.of( + new SortItem(ColumnHeaderConstant.RECEIVER_NODE_TYPE, Ordering.ASC), + new SortItem(ColumnHeaderConstant.RECEIVER_NODE_ID, Ordering.ASC), + new SortItem(ColumnHeaderConstant.PROTOCOL, Ordering.ASC), + new SortItem(ColumnHeaderConstant.SENDER_ADDRESS, Ordering.ASC), + new SortItem(ColumnHeaderConstant.RECEIVER_USER_NAME, Ordering.ASC)); + + public ShowReceiversStatement() { + this.statementType = StatementType.SHOW_RECEIVERS; + } + + @Override + public boolean isQuery() { + return true; + } + + @Override + public <R, C> R accept(StatementVisitor<R, C> visitor, C context) { + return visitor.visitShowReceivers(this, context); + } + + public List<SortItem> getSortItemList() { + return DEFAULT_SORT_ITEMS; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java index b98f34a2484..06851d8c996 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java @@ -65,6 +65,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogica import org.apache.iotdb.db.queryengine.plan.statement.sys.AuthorStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowReceiversStatement; import org.apache.iotdb.isession.template.TemplateNode; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq; @@ -181,6 +182,16 @@ public class StatementGeneratorTest { "show queries order by a", ZonedDateTime.now().getOffset())); } + @Test + public void testShowReceivers() { + final Statement showReceivers = + StatementGenerator.createStatement("show receivers", ZonedDateTime.now().getOffset()); + Assert.assertTrue(showReceivers instanceof ShowReceiversStatement); + Assert.assertEquals( + new SortItem("ReceiverNodeType", Ordering.ASC), + ((ShowReceiversStatement) showReceivers).getSortItemList().get(0)); + } + @Test public void testRawDataQuery() throws IllegalPathException { TSRawDataQueryReq req = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java index 5597a6ca8e6..97fbb5cc502 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.node.PlanNodeDeserializeHelp import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowReceiversNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableDiskUsageInformationSchemaTableScanNode; import org.apache.tsfile.enums.TSDataType; @@ -117,6 +118,16 @@ public class SourceNodeSerdeTest { assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node); } + @Test + public void testShowReceiversNode() { + ShowReceiversNode node = new ShowReceiversNode(new PlanNodeId("test"), null); + + ByteBuffer byteBuffer = ByteBuffer.allocate(2048); + node.serialize(byteBuffer); + byteBuffer.flip(); + assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node); + } + @Test public void testTableDiskUsageInformationTableScanNode() throws IllegalPathException { List<Symbol> symbols = Arrays.asList(new Symbol("database"), new Symbol("size_in_bytes")); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowReceiversTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowReceiversTest.java new file mode 100644 index 00000000000..318a84bd483 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowReceiversTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.informationschema; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester; + +import org.junit.Test; + +import java.util.Optional; + +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.infoSchemaTableScan; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output; + +public class ShowReceiversTest { + + private final PlanTester planTester = new PlanTester(); + + @Test + public void testShowReceiversRewrite() { + final LogicalQueryPlan logicalQueryPlan = planTester.createPlan("show receivers"); + assertPlan( + logicalQueryPlan, + output(infoSchemaTableScan("information_schema.receivers", Optional.empty()))); + } + + @Test + public void testSelectReceivers() { + final LogicalQueryPlan logicalQueryPlan = + planTester.createPlan("select * from information_schema.receivers"); + assertPlan( + logicalQueryPlan, + output(infoSchemaTableScan("information_schema.receivers", Optional.empty()))); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 8304161c9d0..fca1a857b0e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.i18n.PipeMessages; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; +import org.apache.iotdb.commons.pipe.receiver.runtime.PipeReceiverRuntimeRegistry; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; @@ -100,12 +101,21 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { protected final AtomicBoolean shouldMarkAsPipeRequest = new AtomicBoolean(true); protected final AtomicBoolean skipIfNoPrivileges = new AtomicBoolean(false); + protected String senderClusterId = PipeReceiverRuntimeRegistry.UNKNOWN; + protected String receiverPipeName; + protected long receiverPipeCreationTime = Long.MIN_VALUE; + private final AtomicReference<String> pipeReceiverRuntimeSessionKey = new AtomicReference<>(); + @Override public IoTDBSinkRequestVersion getVersion() { return IoTDBSinkRequestVersion.VERSION_1; } protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshakeV1Req req) { + senderClusterId = PipeReceiverRuntimeRegistry.UNKNOWN; + receiverPipeName = null; + receiverPipeCreationTime = Long.MIN_VALUE; + if (!CommonDescriptor.getInstance() .getConfig() .getTimestampPrecision() @@ -340,16 +350,28 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { req.getParams() .getOrDefault(PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF, "false"))); + final String pipeNameString = + req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_NAME); + final String pipeCreationTimeString = + req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_CREATION_TIME); + // Handle the handshake request as a v1 request. // Here we construct a fake "dataNode" request to valid from v1 validation logic, though // it may not require the actual type of the v1 request. - return handleTransferHandshakeV1( - new PipeTransferHandshakeV1Req() { - @Override - protected PipeRequestType getPlanType() { - return PipeRequestType.HANDSHAKE_DATANODE_V1; - } - }.convertToTPipeTransferReq(timestampPrecision)); + final TPipeTransferResp resp = + handleTransferHandshakeV1( + new PipeTransferHandshakeV1Req() { + @Override + protected PipeRequestType getPlanType() { + return PipeRequestType.HANDSHAKE_DATANODE_V1; + } + }.convertToTPipeTransferReq(timestampPrecision)); + if (isSuccess(resp)) { + senderClusterId = clusterIdFromHandshakeRequest; + receiverPipeName = pipeNameString; + receiverPipeCreationTime = parsePipeCreationTime(pipeCreationTimeString); + } + return resp; } protected abstract String getClusterId(); @@ -873,8 +895,72 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { final PipeTransferFileSealReqV2 req, final List<String> fileAbsolutePaths) throws IOException, IllegalPathException; + protected void recordPipeReceiverHandshake( + final String receiverNodeType, final int receiverNodeId, final String protocol) { + final String sessionKey = + String.format("%s-%s-%s-%s", receiverNodeType, receiverNodeId, protocol, receiverId.get()); + final String oldSessionKey = pipeReceiverRuntimeSessionKey.getAndSet(sessionKey); + if (!Objects.equals(oldSessionKey, sessionKey)) { + PipeReceiverRuntimeRegistry.getInstance().deregister(oldSessionKey); + } + PipeReceiverRuntimeRegistry.getInstance() + .registerOrUpdateSession( + sessionKey, + receiverNodeType, + receiverNodeId, + protocol, + getSenderHost(), + parseSenderPort(getSenderPort()), + username, + senderClusterId, + receiverPipeName, + receiverPipeCreationTime, + System.currentTimeMillis()); + } + + protected void recordPipeReceiverTransfer() { + PipeReceiverRuntimeRegistry.getInstance() + .markTransfer(pipeReceiverRuntimeSessionKey.get(), System.currentTimeMillis()); + } + + protected void recordPipeReceiverRequest() { + PipeReceiverRuntimeRegistry.getInstance().markRequest(pipeReceiverRuntimeSessionKey.get()); + } + + protected void clearPipeReceiverRuntime() { + PipeReceiverRuntimeRegistry.getInstance() + .deregister(pipeReceiverRuntimeSessionKey.getAndSet(null)); + } + + protected static boolean isSuccess(final TPipeTransferResp resp) { + return resp != null + && resp.getStatus() != null + && resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode(); + } + + private static int parseSenderPort(final String senderPort) { + try { + return Integer.parseInt(senderPort); + } catch (final Exception e) { + return -1; + } + } + + private static long parsePipeCreationTime(final String pipeCreationTime) { + if (pipeCreationTime == null) { + return Long.MIN_VALUE; + } + try { + return Long.parseLong(pipeCreationTime); + } catch (final NumberFormatException e) { + return Long.MIN_VALUE; + } + } + @Override public synchronized void handleExit() { + clearPipeReceiverRuntime(); + if (writingFileWriter != null) { try { writingFileWriter.close(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistry.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistry.java new file mode 100644 index 00000000000..4aaa24c8349 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistry.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.receiver.runtime; + +import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +public class PipeReceiverRuntimeRegistry { + + public static final String UNKNOWN = "Unknown"; + public static final String NODE_TYPE_DATA_NODE = "DataNode"; + public static final String NODE_TYPE_CONFIG_NODE = "ConfigNode"; + public static final String PROTOCOL_THRIFT = "thrift"; + public static final String PROTOCOL_AIR_GAP = "air_gap"; + + private static final PipeReceiverRuntimeRegistry INSTANCE = new PipeReceiverRuntimeRegistry(); + + private final ConcurrentMap<String, SessionRuntimeInfo> sessionInfoMap = + new ConcurrentHashMap<>(); + + private PipeReceiverRuntimeRegistry() {} + + public static PipeReceiverRuntimeRegistry getInstance() { + return INSTANCE; + } + + public void registerOrUpdateSession( + String connectionKey, + String receiverNodeType, + int receiverNodeId, + String protocol, + String senderAddress, + int senderPort, + String userName, + String senderClusterId, + String pipeName, + long pipeCreationTime, + long handshakeTime) { + if (isBlank(connectionKey)) { + return; + } + + sessionInfoMap.compute( + connectionKey, + (key, oldSession) -> { + final SessionRuntimeInfo session = + oldSession == null ? new SessionRuntimeInfo(connectionKey) : oldSession; + synchronized (session) { + session.receiverNodeType = normalize(receiverNodeType); + session.receiverNodeId = receiverNodeId; + session.protocol = normalize(protocol); + session.senderAddress = normalize(senderAddress); + session.senderPort = senderPort; + session.userName = normalize(userName); + session.senderClusterId = normalize(senderClusterId); + session.lastHandshakeTime = handshakeTime; + session.lastTransferTime = Math.max(session.lastTransferTime, handshakeTime); + session.requestNum.incrementAndGet(); + if (!isBlank(pipeName)) { + session.pipeIds.add(formatPipeId(pipeName, pipeCreationTime)); + } + } + return session; + }); + } + + public void markTransfer(String connectionKey, long transferTime) { + final SessionRuntimeInfo session = sessionInfoMap.get(connectionKey); + if (session == null) { + return; + } + synchronized (session) { + session.lastTransferTime = Math.max(session.lastTransferTime, transferTime); + session.requestNum.incrementAndGet(); + } + } + + public void markRequest(String connectionKey) { + final SessionRuntimeInfo session = sessionInfoMap.get(connectionKey); + if (session == null) { + return; + } + session.requestNum.incrementAndGet(); + } + + public void deregister(String connectionKey) { + if (!isBlank(connectionKey)) { + sessionInfoMap.remove(connectionKey); + } + } + + public List<PipeReceiverRuntimeSnapshot> snapshot() { + final Map<GroupKey, AggregatedRuntimeInfo> aggregatedInfoMap = new HashMap<>(); + for (SessionRuntimeInfo session : sessionInfoMap.values()) { + synchronized (session) { + final GroupKey groupKey = + new GroupKey( + session.receiverNodeType, + session.receiverNodeId, + session.protocol, + session.senderAddress, + session.userName); + AggregatedRuntimeInfo aggregatedInfo = aggregatedInfoMap.get(groupKey); + if (aggregatedInfo == null) { + aggregatedInfo = new AggregatedRuntimeInfo(groupKey); + aggregatedInfoMap.put(groupKey, aggregatedInfo); + } + aggregatedInfo.senderPorts.add(session.senderPort); + aggregatedInfo.connectionCount++; + aggregatedInfo.pipeIds.addAll(session.pipeIds); + aggregatedInfo.senderClusterIds.add(session.senderClusterId); + aggregatedInfo.lastHandshakeTime = + Math.max(aggregatedInfo.lastHandshakeTime, session.lastHandshakeTime); + aggregatedInfo.lastTransferTime = + Math.max(aggregatedInfo.lastTransferTime, session.lastTransferTime); + aggregatedInfo.requestNum += session.requestNum.get(); + } + } + + final List<AggregatedRuntimeInfo> aggregatedInfos = new ArrayList<>(aggregatedInfoMap.values()); + aggregatedInfos.sort(Comparator.comparing(AggregatedRuntimeInfo::getGroupKey)); + + final List<PipeReceiverRuntimeSnapshot> snapshots = new ArrayList<>(aggregatedInfos.size()); + for (AggregatedRuntimeInfo aggregatedInfo : aggregatedInfos) { + snapshots.add(aggregatedInfo.toSnapshot()); + } + return snapshots; + } + + public void clear() { + sessionInfoMap.clear(); + } + + private static String normalize(String value) { + return isBlank(value) ? UNKNOWN : value; + } + + private static boolean isBlank(String value) { + return value == null || value.trim().isEmpty(); + } + + private static String formatPipeId(String pipeName, long pipeCreationTime) { + if (pipeCreationTime < 0) { + return pipeName + "@" + UNKNOWN; + } + return pipeName + "@" + DateTimeUtils.convertLongToDate(pipeCreationTime, "ms"); + } + + private static class SessionRuntimeInfo { + private final String connectionKey; + private String receiverNodeType = UNKNOWN; + private int receiverNodeId = -1; + private String protocol = UNKNOWN; + private String senderAddress = UNKNOWN; + private int senderPort = -1; + private String userName = UNKNOWN; + private String senderClusterId = UNKNOWN; + private long lastHandshakeTime; + private long lastTransferTime; + private final AtomicLong requestNum = new AtomicLong(); + private final TreeSet<String> pipeIds = new TreeSet<>(); + + private SessionRuntimeInfo(String connectionKey) { + this.connectionKey = connectionKey; + } + + @Override + public int hashCode() { + return Objects.hash(connectionKey); + } + } + + private static class GroupKey implements Comparable<GroupKey> { + private final String receiverNodeType; + private final int receiverNodeId; + private final String protocol; + private final String senderAddress; + private final String userName; + + private GroupKey( + String receiverNodeType, + int receiverNodeId, + String protocol, + String senderAddress, + String userName) { + this.receiverNodeType = receiverNodeType; + this.receiverNodeId = receiverNodeId; + this.protocol = protocol; + this.senderAddress = senderAddress; + this.userName = userName; + } + + @Override + public int compareTo(GroupKey other) { + int result = receiverNodeType.compareTo(other.receiverNodeType); + if (result != 0) { + return result; + } + result = Integer.compare(receiverNodeId, other.receiverNodeId); + if (result != 0) { + return result; + } + result = protocol.compareTo(other.protocol); + if (result != 0) { + return result; + } + result = senderAddress.compareTo(other.senderAddress); + if (result != 0) { + return result; + } + return userName.compareTo(other.userName); + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (!(object instanceof GroupKey)) { + return false; + } + final GroupKey groupKey = (GroupKey) object; + return receiverNodeId == groupKey.receiverNodeId + && Objects.equals(receiverNodeType, groupKey.receiverNodeType) + && Objects.equals(protocol, groupKey.protocol) + && Objects.equals(senderAddress, groupKey.senderAddress) + && Objects.equals(userName, groupKey.userName); + } + + @Override + public int hashCode() { + return Objects.hash(receiverNodeType, receiverNodeId, protocol, senderAddress, userName); + } + } + + private static class AggregatedRuntimeInfo { + private final GroupKey groupKey; + private final TreeSet<Integer> senderPorts = new TreeSet<>(); + private final TreeSet<String> pipeIds = new TreeSet<>(); + private final TreeSet<String> senderClusterIds = new TreeSet<>(); + private int connectionCount; + private long lastHandshakeTime; + private long lastTransferTime; + private long requestNum; + + private AggregatedRuntimeInfo(GroupKey groupKey) { + this.groupKey = groupKey; + } + + private GroupKey getGroupKey() { + return groupKey; + } + + private PipeReceiverRuntimeSnapshot toSnapshot() { + return new PipeReceiverRuntimeSnapshot( + groupKey.receiverNodeType, + groupKey.receiverNodeId, + groupKey.protocol, + groupKey.senderAddress, + joinIntegerSet(senderPorts), + connectionCount, + pipeIds.size(), + pipeIds.isEmpty() ? UNKNOWN : joinStringSet(pipeIds), + groupKey.userName, + senderClusterIds.isEmpty() ? UNKNOWN : joinStringSet(senderClusterIds), + lastHandshakeTime, + lastTransferTime, + requestNum); + } + } + + private static String joinIntegerSet(TreeSet<Integer> values) { + final StringJoiner joiner = new StringJoiner(","); + for (Integer value : values) { + joiner.add(String.valueOf(value)); + } + return joiner.toString(); + } + + private static String joinStringSet(TreeSet<String> values) { + final StringJoiner joiner = new StringJoiner(";"); + for (String value : values) { + joiner.add(value); + } + return joiner.toString(); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeSnapshot.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeSnapshot.java new file mode 100644 index 00000000000..58a56333286 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeSnapshot.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.receiver.runtime; + +public class PipeReceiverRuntimeSnapshot { + + private final String receiverNodeType; + private final int receiverNodeId; + private final String protocol; + private final String senderAddress; + private final String senderPorts; + private final int connectionCount; + private final int pipeCount; + private final String pipeIds; + private final String userName; + private final String senderClusterId; + private final long lastHandshakeTime; + private final long lastTransferTime; + private final long requestNum; + + public PipeReceiverRuntimeSnapshot( + String receiverNodeType, + int receiverNodeId, + String protocol, + String senderAddress, + String senderPorts, + int connectionCount, + int pipeCount, + String pipeIds, + String userName, + String senderClusterId, + long lastHandshakeTime, + long lastTransferTime, + long requestNum) { + this.receiverNodeType = receiverNodeType; + this.receiverNodeId = receiverNodeId; + this.protocol = protocol; + this.senderAddress = senderAddress; + this.senderPorts = senderPorts; + this.connectionCount = connectionCount; + this.pipeCount = pipeCount; + this.pipeIds = pipeIds; + this.userName = userName; + this.senderClusterId = senderClusterId; + this.lastHandshakeTime = lastHandshakeTime; + this.lastTransferTime = lastTransferTime; + this.requestNum = requestNum; + } + + public String getReceiverNodeType() { + return receiverNodeType; + } + + public int getReceiverNodeId() { + return receiverNodeId; + } + + public String getProtocol() { + return protocol; + } + + public String getSenderAddress() { + return senderAddress; + } + + public String getSenderPorts() { + return senderPorts; + } + + public int getConnectionCount() { + return connectionCount; + } + + public int getPipeCount() { + return pipeCount; + } + + public String getPipeIds() { + return pipeIds; + } + + public String getUserName() { + return userName; + } + + public String getSenderClusterId() { + return senderClusterId; + } + + public long getLastHandshakeTime() { + return lastHandshakeTime; + } + + public long getLastTransferTime() { + return lastTransferTime; + } + + public long getRequestNum() { + return requestNum; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java index 1f76f5d2453..9d7c85d975a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java @@ -22,12 +22,14 @@ package org.apache.iotdb.commons.pipe.sink.client; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.SocketTimeoutException; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -51,6 +53,9 @@ public abstract class IoTDBClientManager { protected final boolean shouldMarkAsPipeRequest; protected final boolean skipIfNoPrivileges; + protected volatile String pipeName; + protected volatile long pipeCreationTime = Long.MIN_VALUE; + // This flag indicates whether the receiver supports mods transferring if // it is a DataNode receiver. The flag is useless for configNode receiver. protected boolean supportModsIfIsDataNodeReceiver = true; @@ -89,6 +94,21 @@ public abstract class IoTDBClientManager { return supportModsIfIsDataNodeReceiver; } + public void setPipeInfo(final String pipeName, final long pipeCreationTime) { + this.pipeName = pipeName; + this.pipeCreationTime = pipeCreationTime; + } + + protected void appendPipeInfoToHandshakeParams(final Map<String, String> params) { + if (pipeName == null) { + return; + } + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_NAME, pipeName); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_CREATION_TIME, + String.valueOf(pipeCreationTime)); + } + public void adjustTimeoutIfNecessary(Throwable e) { do { if (e instanceof SocketTimeoutException || e instanceof TimeoutException) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java index ff62dbf477b..e92d93ca64b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java @@ -249,6 +249,7 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen params.put( PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF, Boolean.toString(skipIfNoPrivileges)); + appendPipeInfoToHandshakeParams(params); // Try to handshake by PipeTransferHandshakeV2Req. TPipeTransferResp resp = client.pipeTransfer(buildHandshakeV2Req(params)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferHandshakeConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferHandshakeConstant.java index 710fca828e9..3f084a73dc8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferHandshakeConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferHandshakeConstant.java @@ -29,6 +29,8 @@ public class PipeTransferHandshakeConstant { public static final String HANDSHAKE_KEY_USERNAME = "username"; public static final String HANDSHAKE_KEY_CLI_HOSTNAME = "cliHostname"; public static final String HANDSHAKE_KEY_PASSWORD = "password"; + public static final String HANDSHAKE_KEY_PIPE_NAME = "pipeName"; + public static final String HANDSHAKE_KEY_PIPE_CREATION_TIME = "pipeCreationTime"; public static final String HANDSHAKE_KEY_VALIDATE_TSFILE = "validateTsFile"; public static final String HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST = "markAsPipeRequest"; public static final String HANDSHAKE_KEY_SKIP_IF = "skipIf"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index b5662aeec2c..9a1fef61014 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -30,6 +30,7 @@ import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressorConfig; import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressorFactory; import org.apache.iotdb.commons.pipe.sink.limiter.GlobalRPCRateLimiter; import org.apache.iotdb.commons.pipe.sink.limiter.PipeEndPointRateLimiter; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq; import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.metrics.type.Histogram; @@ -190,6 +191,8 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent private final AtomicLong totalCompressedSize = new AtomicLong(0); protected String attributeSortedString; protected String sinkTaskId; + protected String pipeName; + protected long creationTime = Long.MIN_VALUE; protected Timer compressionTimer; protected boolean isRealtimeFirst; @@ -396,6 +399,8 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent (PipeTaskSinkRuntimeEnvironment) environment; attributeSortedString = sinkEnvironment.getAttributeSortedString(); sinkTaskId = sinkEnvironment.getSinkTaskId(); + pipeName = sinkEnvironment.getPipeName(); + creationTime = sinkEnvironment.getCreationTime(); } nodeUrls.clear(); @@ -621,6 +626,16 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent return totalUncompressedSize.get(); } + protected void appendPipeInfoToHandshakeParams(final Map<String, String> params) { + if (pipeName == null) { + return; + } + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_NAME, pipeName); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_PIPE_CREATION_TIME, + String.valueOf(creationTime)); + } + public void rateLimitIfNeeded( final String pipeName, final long creationTime, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java index 6d3c05d3b4f..064b5960245 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java @@ -143,6 +143,7 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink { loadTsFileValidation, shouldMarkAsPipeRequest, skipIfNoPrivileges); + clientManager.setPipeInfo(pipeName, creationTime); } protected abstract IoTDBSyncClientManager constructClient( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java index 71eb2238f15..790b1f9c192 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -142,6 +142,7 @@ public enum PlanNodeType { SHOW_DISK_USAGE((short) 107), TREE_COLLECT((short) 108), LOAD_TSFILE_OBJECT_PIECE((short) 109), + SHOW_RECEIVERS((short) 110), CREATE_OR_UPDATE_TABLE_DEVICE((short) 902), TABLE_DEVICE_QUERY_SCAN((short) 903), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index 186f7daa684..9109e417163 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -197,6 +197,21 @@ public class ColumnHeaderConstant { public static final String REMAINING_EVENT_COUNT = "RemainingEventCount"; public static final String ESTIMATED_REMAINING_SECONDS = "EstimatedRemainingSeconds"; + // column names for show receivers + public static final String RECEIVER_NODE_TYPE = "ReceiverNodeType"; + public static final String RECEIVER_NODE_ID = "ReceiverNodeId"; + public static final String PROTOCOL = "Protocol"; + public static final String SENDER_ADDRESS = "SenderAddress"; + public static final String SENDER_PORTS = "SenderPorts"; + public static final String CONNECTION_COUNT = "ConnectionCount"; + public static final String PIPE_COUNT = "PipeCount"; + public static final String PIPE_IDS = "PipeIDs"; + public static final String RECEIVER_USER_NAME = "UserName"; + public static final String SENDER_CLUSTER_ID = "SenderClusterId"; + public static final String LAST_HANDSHAKE_TIME = "LastHandshakeTime"; + public static final String LAST_TRANSFER_TIME = "LastTransferTime"; + public static final String REQUEST_NUM = "RequestNum"; + // column names for select into public static final String SOURCE_DEVICE = "SourceDevice"; public static final String SOURCE_COLUMN = "SourceColumn"; @@ -270,6 +285,19 @@ public class ColumnHeaderConstant { public static final String ESTIMATED_REMAINING_SECONDS_TABLE_MODEL = "estimated_remaining_seconds"; + public static final String RECEIVER_NODE_TYPE_TABLE_MODEL = "receiver_node_type"; + public static final String RECEIVER_NODE_ID_TABLE_MODEL = "receiver_node_id"; + public static final String PROTOCOL_TABLE_MODEL = "protocol"; + public static final String SENDER_ADDRESS_TABLE_MODEL = "sender_address"; + public static final String SENDER_PORTS_TABLE_MODEL = "sender_ports"; + public static final String CONNECTION_COUNT_TABLE_MODEL = "connection_count"; + public static final String PIPE_COUNT_TABLE_MODEL = "pipe_count"; + public static final String PIPE_IDS_TABLE_MODEL = "pipe_ids"; + public static final String SENDER_CLUSTER_ID_TABLE_MODEL = "sender_cluster_id"; + public static final String LAST_HANDSHAKE_TIME_TABLE_MODEL = "last_handshake_time"; + public static final String LAST_TRANSFER_TIME_TABLE_MODEL = "last_transfer_time"; + public static final String REQUEST_NUM_TABLE_MODEL = "request_num"; + public static final String PLUGIN_NAME_TABLE_MODEL = "plugin_name"; public static final String PLUGIN_TYPE_TABLE_MODEL = "plugin_type"; public static final String CLASS_NAME_TABLE_MODEL = "class_name"; @@ -664,6 +692,22 @@ public class ColumnHeaderConstant { new ColumnHeader(CLIENT_IP_TREE_MODEL, TSDataType.STRING), new ColumnHeader(TIMEOUT, TSDataType.INT64)); + public static final List<ColumnHeader> showReceiversColumnHeaders = + ImmutableList.of( + new ColumnHeader(RECEIVER_NODE_TYPE, TSDataType.TEXT), + new ColumnHeader(RECEIVER_NODE_ID, TSDataType.INT32), + new ColumnHeader(PROTOCOL, TSDataType.TEXT), + new ColumnHeader(SENDER_ADDRESS, TSDataType.TEXT), + new ColumnHeader(SENDER_PORTS, TSDataType.TEXT), + new ColumnHeader(CONNECTION_COUNT, TSDataType.INT32), + new ColumnHeader(PIPE_COUNT, TSDataType.INT32), + new ColumnHeader(PIPE_IDS, TSDataType.TEXT), + new ColumnHeader(RECEIVER_USER_NAME, TSDataType.TEXT), + new ColumnHeader(SENDER_CLUSTER_ID, TSDataType.TEXT), + new ColumnHeader(LAST_HANDSHAKE_TIME, TSDataType.TEXT), + new ColumnHeader(LAST_TRANSFER_TIME, TSDataType.TEXT), + new ColumnHeader(REQUEST_NUM, TSDataType.INT64)); + public static final List<ColumnHeader> showDiskUsageColumnHeaders = ImmutableList.of( new ColumnHeader(DATABASE, TSDataType.TEXT), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java index c4001ecbb1e..fd91c0f8921 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java @@ -47,6 +47,7 @@ public class InformationSchema { public static final String COLUMNS = "columns"; public static final String REGIONS = "regions"; public static final String PIPES = "pipes"; + public static final String RECEIVERS = "receivers"; public static final String PIPE_PLUGINS = "pipe_plugins"; public static final String TOPICS = "topics"; public static final String SUBSCRIPTIONS = "subscriptions"; @@ -221,6 +222,41 @@ public class InformationSchema { ColumnHeaderConstant.ESTIMATED_REMAINING_SECONDS_TABLE_MODEL, TSDataType.DOUBLE)); schemaTables.put(PIPES, pipeTable); + final TsTable receiversTable = new TsTable(RECEIVERS); + receiversTable.addColumnSchema( + new TagColumnSchema( + ColumnHeaderConstant.RECEIVER_NODE_TYPE_TABLE_MODEL, TSDataType.STRING)); + receiversTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.RECEIVER_NODE_ID_TABLE_MODEL, TSDataType.INT32)); + receiversTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.PROTOCOL_TABLE_MODEL, TSDataType.STRING)); + receiversTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.SENDER_ADDRESS_TABLE_MODEL, TSDataType.STRING)); + receiversTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.SENDER_PORTS_TABLE_MODEL, TSDataType.STRING)); + receiversTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.CONNECTION_COUNT_TABLE_MODEL, TSDataType.INT32)); + receiversTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.PIPE_COUNT_TABLE_MODEL, TSDataType.INT32)); + receiversTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.PIPE_IDS_TABLE_MODEL, TSDataType.STRING)); + receiversTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.USER_NAME, TSDataType.STRING)); + receiversTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.SENDER_CLUSTER_ID_TABLE_MODEL, TSDataType.STRING)); + receiversTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.LAST_HANDSHAKE_TIME_TABLE_MODEL, TSDataType.TIMESTAMP)); + receiversTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.LAST_TRANSFER_TIME_TABLE_MODEL, TSDataType.TIMESTAMP)); + receiversTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.REQUEST_NUM_TABLE_MODEL, TSDataType.INT64)); + schemaTables.put(RECEIVERS, receiversTable); + final TsTable pipePluginTable = new TsTable(PIPE_PLUGINS); pipePluginTable.addColumnSchema( new TagColumnSchema(ColumnHeaderConstant.PLUGIN_NAME_TABLE_MODEL, TSDataType.STRING)); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistryTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistryTest.java new file mode 100644 index 00000000000..4445c8e9f58 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/runtime/PipeReceiverRuntimeRegistryTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.receiver.runtime; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class PipeReceiverRuntimeRegistryTest { + + private final PipeReceiverRuntimeRegistry registry = PipeReceiverRuntimeRegistry.getInstance(); + + @Before + public void setUp() { + registry.clear(); + } + + @After + public void tearDown() { + registry.clear(); + } + + @Test + public void testAggregateAndSortSnapshots() { + registry.registerOrUpdateSession( + "data-1", + PipeReceiverRuntimeRegistry.NODE_TYPE_DATA_NODE, + 2, + PipeReceiverRuntimeRegistry.PROTOCOL_THRIFT, + "127.0.0.1", + 9001, + "root", + "cluster-a", + "pipe-a", + 1, + 100); + registry.markTransfer("data-1", 200); + registry.markRequest("data-1"); + registry.registerOrUpdateSession( + "data-2", + PipeReceiverRuntimeRegistry.NODE_TYPE_DATA_NODE, + 2, + PipeReceiverRuntimeRegistry.PROTOCOL_THRIFT, + "127.0.0.1", + 9002, + "root", + "cluster-b", + "pipe-b", + 2, + 150); + registry.registerOrUpdateSession( + "config-1", + PipeReceiverRuntimeRegistry.NODE_TYPE_CONFIG_NODE, + 1, + PipeReceiverRuntimeRegistry.PROTOCOL_AIR_GAP, + "127.0.0.2", + 9003, + "root", + PipeReceiverRuntimeRegistry.UNKNOWN, + null, + Long.MIN_VALUE, + 300); + + final List<PipeReceiverRuntimeSnapshot> snapshots = registry.snapshot(); + + assertEquals(2, snapshots.size()); + assertEquals( + PipeReceiverRuntimeRegistry.NODE_TYPE_CONFIG_NODE, snapshots.get(0).getReceiverNodeType()); + assertEquals( + PipeReceiverRuntimeRegistry.NODE_TYPE_DATA_NODE, snapshots.get(1).getReceiverNodeType()); + + final PipeReceiverRuntimeSnapshot dataSnapshot = snapshots.get(1); + assertEquals(2, dataSnapshot.getConnectionCount()); + assertEquals("9001,9002", dataSnapshot.getSenderPorts()); + assertEquals(2, dataSnapshot.getPipeCount()); + assertTrue(dataSnapshot.getPipeIds().contains("pipe-a@")); + assertTrue(dataSnapshot.getPipeIds().contains("pipe-b@")); + assertEquals("cluster-a;cluster-b", dataSnapshot.getSenderClusterId()); + assertEquals(150, dataSnapshot.getLastHandshakeTime()); + assertEquals(200, dataSnapshot.getLastTransferTime()); + assertEquals(4, dataSnapshot.getRequestNum()); + } + + @Test + public void testDeregisterSession() { + registry.registerOrUpdateSession( + "data-1", + PipeReceiverRuntimeRegistry.NODE_TYPE_DATA_NODE, + 1, + PipeReceiverRuntimeRegistry.PROTOCOL_THRIFT, + "127.0.0.1", + 9001, + "root", + "cluster-a", + "pipe-a", + 1, + 100); + + assertEquals(1, registry.snapshot().size()); + + registry.deregister("data-1"); + + assertTrue(registry.snapshot().isEmpty()); + } +} diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index f984e825fc7..e7a02f269ab 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -102,6 +102,7 @@ statement | startPipeStatement | stopPipeStatement | showPipesStatement + | showReceiversStatement | createPipePluginStatement | dropPipePluginStatement | showPipePluginsStatement @@ -519,6 +520,10 @@ showPipesStatement : SHOW ((PIPE pipeName=identifier) | PIPES (WHERE (CONNECTOR | SINK) USED BY pipeName=identifier)?) ; +showReceiversStatement + : SHOW RECEIVERS + ; + createPipePluginStatement : CREATE PIPEPLUGIN (IF NOT EXISTS)? pluginName=identifier AS className=string uriClause ; @@ -1504,7 +1509,7 @@ nonReserved | OBJECT | OF | OFFSET | OMIT | ONE | ONLY | OPTION | ORDINALITY | OUTPUT | OVER | OVERFLOW | PARTITION | PARTITIONS | PASSING | PAST | PATH | PATTERN | PER | PERIOD | PERMUTE | PIPE | PIPEPLUGIN | PIPEPLUGINS | PIPES | PLAN | POSITION | PRECEDING | PRECISION | PRIVILEGES | PREVIOUS | PROCESSLIST | PROCESSOR | PROPERTIES | PRUNE | QUERIES | QUERY | QUOTES - | RANGE | READ | READONLY | RECONSTRUCT | REFRESH | REGION | REGIONID | REGIONS | REMOVE | RENAME | REPAIR | REPEAT | REPEATABLE | REPLACE | RESET | RESPECT | RESTRICT | RETURN | RETURNING | RETURNS | REVOKE | ROLE | ROLES | ROLLBACK | ROOT | ROW | ROWS | RPR_FIRST | RPR_LAST | RUNNING + | RANGE | READ | READONLY | RECEIVERS | RECONSTRUCT | REFRESH | REGION | REGIONID | REGIONS | REMOVE | RENAME | REPAIR | REPEAT | REPEATABLE | REPLACE | RESET | RESPECT | RESTRICT | RETURN | RETURNING | RETURNS | REVOKE | ROLE | ROLES | ROLLBACK | ROOT | ROW | ROWS | RPR_FIRST | RPR_LAST | RUNNING | SERIESSLOTID | SERVICE | SERVICES | SCALAR | SCHEMA | SCHEMAS | SECOND | SECURITY | SEEK | SERIALIZABLE | SESSION | SET | SETS | SECURITY | SHOW | SINK | SOME | SOURCE | START | STATS | STOP | SUBSCRIPTION | SUBSCRIPTIONS | SUBSET | SUBSTRING | SYSTEM | TABLES | TABLESAMPLE | TAG | TAGS | TEXT | TEXT_STRING | TIES | TIME | TIMEPARTITION | TIMER | TIMER_XL | TIMESERIES | TIMESLOTID | TIMESTAMP | TO | TOPIC | TOPICS | TRAILING | TRANSACTION | TRUNCATE | TRY_CAST | TYPE @@ -1794,6 +1799,7 @@ QUOTES: 'QUOTES'; RANGE: 'RANGE'; READ: 'READ'; READONLY: 'READONLY'; +RECEIVERS: 'RECEIVERS'; RECONSTRUCT: 'RECONSTRUCT'; RECURSIVE: 'RECURSIVE'; REFRESH: 'REFRESH';
