Oscarcheng0312 opened a new pull request, #7792:
URL: https://github.com/apache/incubator-seata/pull/7792
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Please make sure you have read and understood the contributing
guidelines -->
- [ ] I have read the
[CONTRIBUTING.md](https://github.com/apache/incubator-seata/blob/2.x/CONTRIBUTING.md)
guidelines.
- [ ] I have registered the PR
[changes](https://github.com/apache/incubator-seata/tree/2.x/changes).
### Ⅰ. Describe what this PR did
# II. Client–Server Communication
During the heartbeat sending cycle, the client reports connection-pool
metrics (HikariCP, Druid) to the server at a fixed interval.
## 1. Client-side Heartbeat Reporting of Metrics
### 1) Modifications
`core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java`
- Added extension points `createMetricsMessage()` and
`shouldReportPoolInfo()`, used during the Writer Idle event to determine
whether connection-pool metrics should be sent.
- In the `IdleStateEvent.WRITER_IDLE_STATE_EVENT` branch, added reporting
logic: when `shouldReportPoolInfo()` is true, send the result of
`createMetricsMessage()`.
Example snippet:
```java
// On Writer Idle, send heartbeat and optionally send connection-pool metrics
AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(),
HeartbeatMessage.PING);
if (shouldReportPoolInfo()) {
AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(),
createMetricsMessage());
}
```
`core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java`
- Overrides `createMetricsMessage()` and `shouldReportPoolInfo()`,delegating
to the client-side processor.
- Adds `setEnableConnectionPoolMetrics(boolean)` and `setHttpPort(int)`,for
enabling metrics reporting and setting the client HTTP port.
- Registers the connection-pool metrics processor in `registerProcessor()` .
Example:
```java
@Override
public Object createMetricsMessage() {
return clientConnectionPoolMetricsProcessor.createMetricsMessage();
}
@Override
public boolean shouldReportPoolInfo() {
return clientConnectionPoolMetricsProcessor.shouldReportPoolInfo();
}
// Register processor
clientConnectionPoolMetricsProcessor = new
ClientConnectionPoolMetricsProcessor(applicationId);
super.registerProcessor(MessageType.TYPE_CONNECTION_POOL_METRICS,
clientConnectionPoolMetricsProcessor, null);
```
`core/src/main/java/org/apache/seata/core/protocol/MessageType.java`
- Added a new type code: `TYPE_CONNECTION_POOL_METRICS = 122`,used for
routing connection-pool metrics messages.
### 2) Additions
`core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientConnectionPoolMetricsProcessor.java`
- Client-side metrics processor: constructs
`ConnectionPoolMetricsMessage`((includes HikariCP and Druid metrics), and
controls report throttling (default 10 seconds).
- Responsible for generating `clientUrl`(`ip:port`), allowing the server to
identify the client address (for update operations).
Example:
```java
public Object createMetricsMessage() {
ConnectionPoolMetricsMessage msg = new ConnectionPoolMetricsMessage();
msg.setApplicationId(applicationId);
msg.setClientUrl(getClientHttpUrl());
msg.setDruidMetrics(DataSourceConnectionPoolCollector.collectAllDruidPoolMetrics());
msg.setHikariMetrics(DataSourceConnectionPoolCollector.collectAllHikariPoolMetrics());
msg.setSequenceNumber(sequence.incrementAndGet());
lastReportTime = System.currentTimeMillis();
msg.setTimestamp(lastReportTime);
return msg;
}
```
`core/src/main/java/org/apache/seata/core/protocol/ConnectionPoolMetricsMessage.java`
- Metrics message body: carries application ID, client URL, HikariCP/Druid
metrics list, sequence number, timestamp.
Uses type code `TYPE_CONNECTION_POOL_METRICS`。
## 2. Server-side Handling of Metrics
### 1)Modifications
`core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java`
- Registers the server-side metrics processor `registerProcessor()` in
`ServerConnectionPoolMetricsProcessor`,enabling reception and processing of
client reports.
### 2)Additions
`core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerConnectionPoolMetricsProcessor.java`
- Server-side processor: parses messages and dispatches them to
`ConnectionPoolInfoCache`,where metrics are aggregated by `poolName` across
multiple clients.
`core/src/main/java/org/apache/seata/core/rpc/processor/server/ConnectionPoolInfoCache.java`
- Server-side cache: stores HikariCP/Druid metrics, pool type, and client
URL grouped by pool name.
- Provides `refresh(PoolConfigUpdateRequest)` to update the configuration
snapshot in cache.
Auxiliary types:
- `core/src/main/java/org/apache/seata/core/protocol/PoolType.java`
- Enumeration of connection-pool types.
## 3.Serialization/Deserialization: Protobuf
### 1)Modifications
`serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/manager/ProtobufConvertManager.java`
**Purpose**: Protobuf Conversion Manager — responsible for registering and
managing converters for all message types.
- Maintains three core mapping tables.
- Provides registration and lookup for type converters.
- Supports bi-directional conversion (Java Model ↔ Protobuf Message).
```java
protobufConvertManager.convertorMap.put(
ConnectionPoolMetricsMessage.class.getName(), new
ConnectionPoolMetricsMessageConvertor());
protobufConvertManager.protoClazzMap.put(
ConnectionPoolMetricsMessageProto.getDescriptor().getFullName(),
ConnectionPoolMetricsMessageProto.class);
protobufConvertManager.reverseConvertorMap.put(
ConnectionPoolMetricsMessageProto.class.getName(), new
ConnectionPoolMetricsMessageConvertor());
```
### 2)Additions
`serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/convertor/ConnectionPoolMetricsMessageConvertor.java`
**Purpose**: Handles Protobuf serialization and deserialization of
connection-pool metrics messages.
```java
@Override
public ConnectionPoolMetricsMessageProto
convert2Proto(ConnectionPoolMetricsMessage msg) {
// Build protobuf message, handle null fields
builder.setApplicationId(msg.getApplicationId() == null ? "" :
msg.getApplicationId());
// Convert Hikari & Druid metrics
}
@Override
public ConnectionPoolMetricsMessage
convert2Model(ConnectionPoolMetricsMessageProto proto) {
// Restore Java object from protobuf message
// Handle conversion for list types
}
```
`serializer/seata-serializer-protobuf/src/main/resources/protobuf/org/apache/seata/protocol/transcation/connectionPoolMetricsMessage.proto`
**Purpose**: Defines the Protobuf schema for connection-pool metrics
messages.
## 4.Serialization/Deserialization: Seata Codec
### 1)Modifications
`serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MessageCodecFactory.java`
**Purpose:** Factory for Seata message codecs; creates corresponding codec
instances based on message type.
1. **New imports**:
- Added import for `ConnectionPoolMetricsMessage`
- Added import for `ConnectionPoolMetricsMessageCodec`
2. **getMessageCodec method**:
- Added support for `MessageType.TYPE_CONNECTION_POOL_METRICS`
- Returns instance of`ConnectionPoolMetricsMessageCodec`
3. **getMessage method**:
- Added support for `MessageType.TYPE_CONNECTION_POOL_METRICS`
- Returns new `ConnectionPoolMetricsMessage` instance
```java
case MessageType.TYPE_CONNECTION_POOL_METRICS:
msgCodec = new ConnectionPoolMetricsMessageCodec(version);
break;
case MessageType.TYPE_CONNECTION_POOL_METRICS:
abstractMessage = new ConnectionPoolMetricsMessage();
break;
```
------
### 2)Additions
`serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/ConnectionPoolMetricsMessageCodec.java`
**Purpose:** Implements binary encoding/decoding for
`ConnectionPoolMetricsMessage` supporting compact serialization of
connection-pool monitoring data.
1. **Message type support**:
- Implements `MessageSeataCodec`
- Handles only `ConnectionPoolMetricsMessage`
2. **Encoding**:
- Encodes metrics message into binary
- Serializes HikariCP/Druid metrics, SQL execution records, slow-SQL lists
3. **Decoding**:
- Decodes binary data back into `ConnectionPoolMetricsMessage`
- Supports list reconstruction for both pool types and SQL entries
```java
@Override
public Class<?> getMessageClassType() {
return ConnectionPoolMetricsMessage.class;
}
@Override
public <T> void encode(T t, ByteBuf out) {
// Binary encoding of connection-pool metrics
}
@Override
public <T> void decode(T t, ByteBuffer in) {
// Binary decoding of connection-pool metrics
}
```
## 5.Explanation of Key Method Changes
- Client sending timing (core loop):`AbstractNettyRemotingClient`
- `shouldReportPoolInfo()`:default false; subclasses determine reporting
based on switch and time window.
- `createMetricsMessage()`:default returns `HeartbeatMessage.PING`,RM
overrides to return `ConnectionPoolMetricsMessage`。
- RM client settings: `RmNettyRemotingClient`
- `setEnableConnectionPoolMetrics(boolean)`:
- `setHttpPort(int)`:
- `registerProcessor()`:Registers `ClientConnectionPoolMetricsProcessor`
for message construction & processing.
- Metrics collection: `DataSourceConnectionPoolCollector`
- `collectHikariMetrics(...)`:reads metrics via `HikariPoolMXBean` and
`HikariDataSource`
- `collectDruidMetrics(...)`:reads pool metrics from `DruidDataSource`and
injects SQL execution data.
- `updateConfig(poolName, request)`:updates pool config using reflection,
ensuring safety.
- Message construction & throttling: `ClientConnectionPoolMetricsProcessor`
- `createMetricsMessage()`:wraps application ID, client URL, metrics,
sequence, timestamp.
- `shouldReportPoolInfo()`:throttles based on last report time (default
every 10 seconds).
- Server-side processing chain: `ServerConnectionPoolMetricsProcessor` →
`ConnectionPoolInfoCache`
- Aggregates metrics by `poolName` and stores pool type + client URL.
- Protocol & serialization:
- `ConnectionPoolMetricsMessage` and
`MessageType.TYPE_CONNECTION_POOL_METRICS`
- Fully compatible with Netty/Grpc serialization paths.
---
## 6.Class Relationship Diagram
```text
┌──────────────────────┐ ┌──────────────────────────────────────┐
│ RmNettyRemotingClient│ uses │ ClientConnectionPoolMetricsProcessor │
└─────────┬────────────┘ └──────────────┬───────────────────────┘
│ │ builds
│ ▼
│ ConnectionPoolMetricsMessage
│ │ carries
│ ▼
heartbeat │ HikariConnectionPoolMetrics /
DruidConnectionPoolMetrics
│ ▲
│ │ collects
│ DataSourceConnectionPoolCollector
│ ▲
│ │ uses
│ SqlCollector(+Slow/Exec)
│
▼
┌────────────────────┐ ┌─────────────────────────────────────┐
│NettyRemotingServer │ uses │ ServerConnectionPoolMetricsProcessor│
└────────────────────┘ └─────────────────────────────────────┘
│ cache
▼
┌────────────────────────┐
│ ConnectionPoolInfoCache│
└────────────────────────┘
```
### Ⅴ. Special notes for reviews
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]