DanielCarter-stack commented on PR #10238:
URL: https://github.com/apache/seatunnel/pull/10238#issuecomment-3834906786
### Issue 1: `Collectors.toMap` missing merge function, duplicate mappings throw exception
**Location**: `StarRocksBeReadClient.java:123-160`
**Problem Description**:
The code uses `Collectors.toMap(KeyMapper, ValueMapper)` without providing a
merge function. If a user configures multiple mappings for the same host in
`be_host_port_mapping` configuration (possibly due to accidental configuration
error), it will throw `IllegalStateException`, and the error message is not
user-friendly.
**Reproduction Configuration**:
```hocon
be_host_port_mapping = [
{ host_port = "be1:9060", ip_port = "192.168.1.1:31088" },
{ host_port = "be1:9060", ip_port = "192.168.1.2:31088" } # duplicate host
]
```
**Exception**:
```
java.lang.IllegalStateException: Duplicate key be1
```
**Impact Scope**:
- Direct impact: `StarRocksBeReadClient` constructor
- Indirect impact: `StarRocksSourceReader` cannot be initialized, job
startup fails
- Impact surface: Single Connector
**Severity**: MAJOR
**Improvement Suggestion**:
```java
private static Map<String, Pair<String, Integer>> formatBeHostPortMapping(
        SourceConfig sourceConfig) {
    return sourceConfig.getBeHostPortMapping().stream()
            .collect(
                    Collectors.toMap(
                            mapping -> extractHost(mapping),
                            mapping -> parseAccessiblePort(mapping),
                            (existing, duplicate) -> {
                                // The merge function receives only the two values, not the
                                // duplicated host, so the host itself cannot be logged here
                                log.warn(
                                        "Duplicate host in be_host_port_mapping. "
                                                + "Using first mapping: '{}', ignoring: '{}'",
                                        existing,
                                        duplicate);
                                return existing; // Or throw a more user-friendly exception
                            }));
}
```
**Rationale**: Provide clear error messages or handling strategies to avoid
user confusion.
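The difference between the two `Collectors.toMap` overloads can be checked in isolation. This is a minimal, self-contained sketch rather than connector code: `MappingEntry` and `DuplicateMappingDemo` are hypothetical names, the host is assumed to be the text before the colon in `host_port`, and plain strings stand in for `Pair<String, Integer>`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical stand-in for one be_host_port_mapping entry. */
class MappingEntry {
    final String hostPort; // e.g. "be1:9060"
    final String ipPort; // e.g. "192.168.1.1:31088"

    MappingEntry(String hostPort, String ipPort) {
        this.hostPort = hostPort;
        this.ipPort = ipPort;
    }

    String host() {
        return hostPort.split(":")[0].trim();
    }
}

class DuplicateMappingDemo {
    /** Two-argument toMap: a repeated host makes the collector throw IllegalStateException. */
    static Map<String, String> strict(List<MappingEntry> entries) {
        return entries.stream().collect(Collectors.toMap(MappingEntry::host, e -> e.ipPort));
    }

    /** Three-argument toMap: the merge function resolves the clash; here the first entry wins. */
    static Map<String, String> keepFirst(List<MappingEntry> entries) {
        return entries.stream()
                .collect(
                        Collectors.toMap(
                                MappingEntry::host,
                                e -> e.ipPort,
                                (existing, duplicate) -> {
                                    // Only the two values are passed in, not the clashing key
                                    System.err.println(
                                            "Duplicate host mapping, keeping '" + existing
                                                    + "', ignoring '" + duplicate + "'");
                                    return existing;
                                }));
    }
}
```

With the two entries from the reproduction configuration, `strict` throws `IllegalStateException`, while `keepFirst` returns a single entry mapping `be1` to `192.168.1.1:31088`.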
---
### Issue 2: Port validation incomplete, port range and empty port not checked
**Location**: `StarRocksBeReadClient.java:143-160`
**Problem Description**:
The code only checks whether the port can be parsed as `int`, without
checking:
1. Whether the port is empty (e.g., `be_host:`)
2. Whether the port is within valid range (1-65535)
3. Whether the port is negative or zero
**Reproduction Configuration**:
```hocon
be_host_port_mapping = [
{ host_port = "be1:9060", ip_port = "192.168.1.1:0" }      # port is 0
{ host_port = "be2:9060", ip_port = "192.168.1.2:70000" }  # port out of range
{ host_port = "be3:9060", ip_port = "192.168.1.3:abc" }    # non-numeric port
{ host_port = "be4:9060", ip_port = "192.168.1.4:" }       # empty port
]
```
**Potential Risks**:
- Risk 1: When the port is 0 or out of range, the `TSocket` connection fails,
but the error message is unclear
- Risk 2: Configuration errors are not caught during initialization; they are
only reported when the client connects
**Impact Scope**:
- Direct impact: `StarRocksBeReadClient.formatBeHostPortMapping`
- Indirect impact: Configuration errors only discovered at runtime
- Impact surface: Single Connector
**Severity**: MAJOR
**Improvement Suggestion**:
```java
try {
    int port = Integer.parseInt(accessIpInfo[1].trim());
    if (port <= 0 || port > 65535) {
        log.error(
                "Invalid port number: '{}' in ip_port '{}'. Port must be between 1 and 65535",
                port,
                mapping.getIpPort());
        throw new StarRocksConnectorException(
                StarRocksConnectorErrorCode.HOST_MAPPING_ILLEGAL,
                String.format(
                        "Invalid port number: %s in ip_port '%s'. Port must be between 1 and 65535",
                        port, mapping.getIpPort()));
    }
    return Pair.of(accessIpInfo[0].trim(), port);
} catch (NumberFormatException e) {
    log.error(
            "The port '{}' in ip_port '{}' is not a valid number",
            accessIpInfo[1],
            mapping.getIpPort());
    throw new StarRocksConnectorException(
            StarRocksConnectorErrorCode.HOST_MAPPING_ILLEGAL,
            String.format(
                    "The port '%s' in ip_port '%s' is not a valid number",
                    accessIpInfo[1], mapping.getIpPort()),
            e);
}
```
**Rationale**: Validate port validity in advance, provide clear error
messages, reduce configuration error troubleshooting costs.
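A standalone sketch of the checks listed above, assuming `IllegalArgumentException` as a stand-in for `StarRocksConnectorException`; `PortValidation` is a hypothetical name. One detail worth knowing: `String.split(":")` drops trailing empty strings, so `"192.168.1.4:"` splits into a single element and is rejected by a length check with a generic message. Splitting with a negative limit (`split(":", -1)`) keeps the empty port so it can be reported specifically.

```java
/** Hypothetical helper; IllegalArgumentException stands in for StarRocksConnectorException. */
class PortValidation {
    /** Parses the port of an "ip:port" string and checks that it lies in 1..65535. */
    static int parsePort(String ipPort) {
        // A negative limit keeps trailing empty strings: "1.2.3.4:" -> ["1.2.3.4", ""]
        String[] parts = ipPort.split(":", -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException(
                    String.format("Invalid ip_port '%s'. Expected format 'ip:port'", ipPort));
        }
        String portText = parts[1].trim();
        if (portText.isEmpty()) {
            throw new IllegalArgumentException(
                    String.format("Port is empty in ip_port '%s'", ipPort));
        }
        int port;
        try {
            port = Integer.parseInt(portText);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    String.format(
                            "The port '%s' in ip_port '%s' is not a valid number",
                            portText, ipPort),
                    e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException(
                    String.format(
                            "Invalid port %d in ip_port '%s'. Port must be between 1 and 65535",
                            port, ipPort));
        }
        return port;
    }
}
```

Each of the four entries in the reproduction configuration is rejected with its own message, and a well-formed value such as `192.168.1.1:31088` yields `31088`.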
---
### Issue 3: Error messages do not contain specific configuration values, making it difficult to locate problems
**Location**: `StarRocksBeReadClient.java:132-135, 146-149`
**Problem Description**:
When the configuration format is incorrect, the exception message only
states "can't be null", but does not display the actual configured value,
making it impossible for users to quickly identify which configuration item has
a problem.
**Current Error Message**:
```java
throw new StarRocksConnectorException(
StarRocksConnectorErrorCode.HOST_MAPPING_ILLEGAL,
"The be host and be_port can't be null");
```
**Improved Error Message Example**:
```
Config 'be_host_port_mapping' parse failed: Invalid host_port value 'be-host:' in mapping entry. Expected format 'host:port', but got empty port.
```
**Impact Scope**:
- Direct impact: All users using `be_host_port_mapping`
- Indirect impact: Increased troubleshooting time
- Impact surface: Single Connector
**Severity**: MAJOR
**Improvement Suggestion**:
```java
// Lines 132-135
if (StringUtils.isBlank(mapping.getHostPort())
        || (hostInfo = mapping.getHostPort().split(":")).length != 2) {
    String actualValue = mapping.getHostPort();
    log.error("Invalid host_port configuration: '{}'. Expected format 'host:port'", actualValue);
    throw new StarRocksConnectorException(
            StarRocksConnectorErrorCode.HOST_MAPPING_ILLEGAL,
            String.format(
                    "Invalid host_port configuration: '%s'. Expected format 'host:port'",
                    actualValue));
}
// Similar handling for lines 146-149
if (StringUtils.isBlank(mapping.getIpPort())
        || (accessIpInfo = mapping.getIpPort().split(":")).length != 2) {
    String actualValue = mapping.getIpPort();
    log.error("Invalid ip_port configuration: '{}'. Expected format 'ip:port'", actualValue);
    throw new StarRocksConnectorException(
            StarRocksConnectorErrorCode.HOST_MAPPING_ILLEGAL,
            String.format(
                    "Invalid ip_port configuration: '%s'. Expected format 'ip:port'",
                    actualValue));
}
```
**Rationale**: Error messages should contain sufficient context information
(Who, What, Where, Why) to help users quickly locate and fix problems.
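A compact, self-contained sketch of a parser whose messages name the field, echo the raw value, and state the expected format, in the spirit of the improved message example. `MappingFieldParser` is hypothetical, and `IllegalArgumentException` again stands in for `StarRocksConnectorException`.

```java
/** Hypothetical helper; IllegalArgumentException stands in for StarRocksConnectorException. */
class MappingFieldParser {
    /**
     * Splits a "left:right" value from one mapping entry. On failure the message names the
     * field (host_port or ip_port), echoes the raw value, and states the expected format.
     */
    static String[] split(String field, String value, String expectedFormat) {
        // Negative limit so that "be-host:" yields an empty right-hand part instead of one element
        String[] parts = value == null ? new String[0] : value.split(":", -1);
        if (parts.length != 2 || parts[0].trim().isEmpty() || parts[1].trim().isEmpty()) {
            throw new IllegalArgumentException(
                    String.format(
                            "Config 'be_host_port_mapping' parse failed: invalid %s value '%s'."
                                    + " Expected format '%s'",
                            field, value, expectedFormat));
        }
        return new String[] {parts[0].trim(), parts[1].trim()};
    }
}
```

For `split("host_port", "be-host:", "host:port")` the user sees the field name and the offending value `'be-host:'` directly in the exception message.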
---
### Issue 4: Changing log level from debug to info may cause excessive log volume
**Location**: `StarRocksBeReadClient.java:70`
**Problem Description**:
Changing the log level for parsing BE addresses from `debug` to `info` will
generate a large amount of logs in scenarios with many BE nodes, affecting log
readability and storage.
**Original Code**:
```java
log.debug("Parse StarRocks BE address: '{}'.", beNodeInfo);
```
**After Modification**:
```java
log.info("Parse StarRocks BE address: '{}'.", beNodeInfo);
```
**Scenario Analysis**:
- Assume the cluster has 10 BE nodes
- A client is created once per BE node and then cached
- The addresses are logged again on every job restart
- With multiple parallel Source readers, each one logs the same addresses
**Potential Risks**:
- Log files grow rapidly
- Critical information gets buried
- Affects log search and analysis
**Impact Scope**:
- Direct impact: Log system
- Indirect impact: Operations troubleshooting efficiency
- Impact surface: Single Connector
**Severity**: MINOR
**Improvement Suggestion**:
```java
// Option 1: Keep debug level
log.debug("Parse StarRocks BE address: '{}'.", beNodeInfo);
// Option 2: Only use info when applying mapping
if (beHostPortMapping.containsKey(hostPort[0].trim())) {
    log.info(
            "BE address mapping applied: {}:{} -> {}:{}",
            hostPort[0].trim(), hostPort[1].trim(), ip, port);
} else {
    log.debug("Parse StarRocks BE address: '{}'.", beNodeInfo);
}
```
**Rationale**: Routine operations should use debug level, important events
(such as configuration mapping taking effect) should use info level.
---
### Issue 5: SourceConfig field declaration order change may affect serialization compatibility
**Location**: `SourceConfig.java:34-45`
**Problem Description**:
The PR moves the field declarations from after the constructor to before it.
Java serialization generally tolerates field reordering, but there are
potential risks in Checkpoint/Savepoint recovery scenarios.
**Original Order**:
```java
public SourceConfig(ReadonlyConfig config) { ... }
private int maxRetries = ...;
private int requestTabletSize = ...;
// ... 8 fields
private Map<String, String> sourceOptionProps = new HashMap<>();
```
**Modified Order**:
```java
private int maxRetries = ...;
// ... 11 fields (including beHostPortMapping)
private Map<String, String> sourceOptionProps = new HashMap<>();
public SourceConfig(ReadonlyConfig config) { ... }
```
**Potential Risks**:
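- `StarRocksConfig` and `SourceConfig` implement `Serializable`
- `serialVersionUID` is not explicitly declared, so the JVM computes a default
value from the class structure (class name, modifiers, interfaces, and member
signatures)
- The default computation sorts fields by name, so reordering declarations
alone does not change it, but adding the new `beHostPortMapping` field does
- If serialized data from an older version exists, deserializing it may then
fail with `InvalidClassException`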
**Trace Call Chain**:
- `SourceConfig` itself is not directly serialized in `StarRocksSourceSplit`
- But if `SourceConfig` is serialized elsewhere (such as in cache, state),
there will be risks
**Impact Scope**:
- Direct impact: Serialization/deserialization
- Indirect impact: Checkpoint recovery (low probability)
- Impact surface: Single Connector
**Severity**: MAJOR
**Improvement Suggestion**:
```java
// Option 1: Add an explicit serialVersionUID and keep the field order unchanged
public class SourceConfig extends StarRocksConfig {
    // To stay readable against data written by earlier releases, use the value the
    // old class computed (e.g. reported by the JDK `serialver` tool) rather than 1L
    private static final long serialVersionUID = 1L;
    // Keep original field order
    public SourceConfig(ReadonlyConfig config) { ... }
    private int maxRetries = ...;
    // ...
    // Add new fields at the end
    private List<BeHostPortMapping> beHostPortMapping = new ArrayList<>();
}
// Option 2: If SourceConfig is confirmed not to participate in serialization,
// document this with a comment
/**
 * Note: This class is not directly serialized in checkpoints.
 * Field reordering is safe as only the configuration values are used.
 */
public class SourceConfig extends StarRocksConfig {
    // ...
}
```
**Rationale**: Explicitly declaring `serialVersionUID` is a best practice
for serialization classes, avoiding deserialization issues caused by field
changes.
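A small, self-contained illustration of pinning the stream version. `PinnedConfig` and `SerialVersionDemo` are hypothetical names; `ObjectStreamClass.lookup(...).getSerialVersionUID()` reports the value the serialization runtime uses, which is the declared constant when one exists. Once the value is pinned, adding a field in a later release no longer changes it, and when a stream written before the field existed is read, the new field is left at its default value (`null`), because field initializers do not run during deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical config class with an explicitly pinned serialVersionUID. */
class PinnedConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    int maxRetries = 3;
    // Field added in a later version; the pinned serialVersionUID does not change.
    List<String> beHostPortMapping = new ArrayList<>();
}

class SerialVersionDemo {
    /** The serialVersionUID the serialization runtime uses for this class. */
    static long streamUid(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }

    /** Serializes and deserializes a value in memory. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

A unit test along these lines (asserting the UID and a successful round trip) is a cheap guard against accidentally dropping the constant later.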
---
### Issue 6: Typo in documentation configuration example title
**Location**: `docs/en/connectors/source/StarRocks.md:263`
**Problem Description**:
In the English documentation, the title of example 3 is written as
`be_fost_port_mapping`, should be `be_host_port_mapping`.
**Current Content**:
```markdown
## Example 3: Using 'be_fost_port_mapping' to obtain data
```
**Correct Content**:
```markdown
## Example 3: Using 'be_host_port_mapping' to obtain data
```
**Impact Scope**:
- Direct impact: Documentation accuracy
- Indirect impact: Users cannot find this example when searching for keywords
- Impact surface: Documentation readers
**Severity**: MINOR
**Improvement Suggestion**:
Fix the typo and check the rest of the document for similar typos.
**Rationale**: Documentation is the user's first point of contact, and typos
reduce the project's professionalism.
---
### Issue 7: `formatBeHostPortMapping` method complexity too high, poor readability and testability
**Location**: `StarRocksBeReadClient.java:121-161`
**Problem Description**:
The `formatBeHostPortMapping` method uses Stream API + Lambda, but the
Lambda body contains a large amount of validation logic and exception handling,
resulting in:
1. Method too long (40 lines)
2. Deep Lambda nesting
3. Validation logic cannot be tested separately
4. Return type `Map<String, Pair<String, Integer>>` not intuitive enough
**Current Code**:
```java
private static Map<String, Pair<String, Integer>> formatBeHostPortMapping(
        SourceConfig sourceConfig) {
    return sourceConfig.getBeHostPortMapping().stream()
            .collect(
                    Collectors.toMap(
                            mapping -> {
                                // 15 lines validation logic
                            },
                            mapping -> {
                                // 20 lines validation logic
                            }));
}
```
**Potential Risks**:
- Difficult to test validation logic separately
- Difficult to reuse validation logic
- Hard for newcomers to understand
**Impact Scope**:
- Direct impact: Code maintainability
- Indirect impact: Future extensions
- Impact surface: Single class
**Severity**: MINOR
**Improvement Suggestion**:
```java
/**
 * Validate and extract the host from the host_port field.
 * @throws StarRocksConnectorException if the format is invalid
 */
private static String extractHost(BeHostPortMapping mapping) {
    String hostPort = mapping.getHostPort();
    if (StringUtils.isBlank(hostPort)) {
        throw new StarRocksConnectorException(
                StarRocksConnectorErrorCode.HOST_MAPPING_ILLEGAL,
                "host_port cannot be blank");
    }
    String[] parts = hostPort.split(":");
    if (parts.length != 2) {
        throw new StarRocksConnectorException(
                StarRocksConnectorErrorCode.HOST_MAPPING_ILLEGAL,
                String.format("Invalid host_port format: '%s'. Expected 'host:port'", hostPort));
    }
    String host = parts[0].trim();
    if (StringUtils.isBlank(host)) {
        throw new StarRocksConnectorException(
                StarRocksConnectorErrorCode.HOST_MAPPING_ILLEGAL,
                String.format("Host cannot be empty in host_port: '%s'", hostPort));
    }
    return host;
}
/**
 * Validate and parse the accessible ip:port into a Pair.
 * @throws StarRocksConnectorException if the format is invalid
 */
private static Pair<String, Integer> parseAccessiblePort(BeHostPortMapping mapping) {
    String ipPort = mapping.getIpPort();
    // ... Similar validation logic
    return Pair.of(ip, port);
}
private static Map<String, Pair<String, Integer>> formatBeHostPortMapping(
        SourceConfig sourceConfig) {
    return sourceConfig.getBeHostPortMapping().stream()
            .collect(
                    Collectors.toMap(
                            // The helpers are static methods of this class, not of BeHostPortMapping
                            StarRocksBeReadClient::extractHost,
                            StarRocksBeReadClient::parseAccessiblePort,
                            (existing, duplicate) -> {
                                // The merge function receives only the two values, not the host key
                                log.warn(
                                        "Duplicate host in be_host_port_mapping. "
                                                + "Using first: '{}', ignoring: '{}'",
                                        existing,
                                        duplicate);
                                return existing;
                            }));
}
```
**Rationale**:
1. Extracting methods allows separate testing of validation logic
2. Improve code readability
3. Follow single responsibility principle
4. Facilitate future reuse
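A self-contained sketch of the extracted-method structure that runs outside the connector. Assumptions: `HostMapping` and `BeMappingParser` are hypothetical stand-ins for `BeHostPortMapping` and `StarRocksBeReadClient`, `Map.Entry<String, Integer>` replaces `Pair<String, Integer>`, `IllegalArgumentException` replaces `StarRocksConnectorException`, and the duplicate policy is keep-first as in Issue 1. Because each helper is a plain static method, it can be exercised directly in a unit test.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical stand-in for BeHostPortMapping. */
class HostMapping {
    final String hostPort;
    final String ipPort;

    HostMapping(String hostPort, String ipPort) {
        this.hostPort = hostPort;
        this.ipPort = ipPort;
    }
}

class BeMappingParser {
    /** Splits "a:b" into two trimmed, non-blank parts, naming the field on failure. */
    static String[] splitPair(String field, String value) {
        String[] parts = value == null ? new String[0] : value.split(":", -1);
        if (parts.length != 2 || parts[0].trim().isEmpty() || parts[1].trim().isEmpty()) {
            throw new IllegalArgumentException(
                    String.format("Invalid %s: '%s'. Expected format 'host:port'", field, value));
        }
        return new String[] {parts[0].trim(), parts[1].trim()};
    }

    /** Returns the BE host that the mapping applies to. */
    static String extractHost(HostMapping mapping) {
        return splitPair("host_port", mapping.hostPort)[0];
    }

    /** Returns the externally reachable (ip, port), validating the port range. */
    static Map.Entry<String, Integer> parseAccessiblePort(HostMapping mapping) {
        String[] parts = splitPair("ip_port", mapping.ipPort);
        int port;
        try {
            port = Integer.parseInt(parts[1]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port is not a number in ip_port: " + mapping.ipPort, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range 1-65535 in ip_port: " + mapping.ipPort);
        }
        return new SimpleImmutableEntry<>(parts[0], port);
    }

    /** Builds host -> (ip, port); on a duplicate host the first mapping wins. */
    static Map<String, Map.Entry<String, Integer>> formatBeHostPortMapping(List<HostMapping> mappings) {
        return mappings.stream()
                .collect(
                        Collectors.toMap(
                                BeMappingParser::extractHost,
                                BeMappingParser::parseAccessiblePort,
                                (existing, duplicate) -> existing));
    }
}
```

Each helper maps directly onto one row of the test matrix in Issue 8 (format errors, port range, duplicates), which is the main payoff of the extraction.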
---
### Issue 8: Missing unit tests, new functionality verification insufficient
**Location**: `seatunnel-connectors-v2/connector-starrocks/src/test/java/`
**Problem Description**:
The newly added `be_host_port_mapping` functionality has no unit tests. The
following scenarios are not verified:
1. Normal mapping (BE address in mapping table)
2. No mapping (not configured or BE address not in mapping table)
3. Format error (missing colon, multiple colons, blank string)
4. Invalid port (empty, non-numeric, out of range)
5. Duplicate mapping (same host configured with multiple mappings)
6. Empty configuration list
7. null configuration (the default value is non-null, but defensive code
should still handle it)
**Potential Risks**:
- Boundary condition bugs not discovered
- Easy to break functionality during refactoring
- Behavior does not meet expectations when users configure incorrectly
**Impact Scope**:
- Direct impact: Code quality
- Indirect impact: Online failure risk
- Impact surface: Single Connector
**Severity**: MAJOR
**Improvement Suggestion**:
Create test class `StarRocksBeReadClientTest.java`:
```java
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class StarRocksBeReadClientTest {
    @Test
    public void testBeHostPortMapping_Normal() {
        // Test normal mapping
    }
    @Test
    public void testBeHostPortMapping_NotInMapping() {
        // Test BE address not in mapping table
    }
    @Test
    public void testBeHostPortMapping_InvalidHostPortFormat() {
        // Test host_port format error: build the config with an invalid value and
        // create the client inside the lambda
        Assertions.assertThrows(StarRocksConnectorException.class, () -> { /* ... */ });
    }
    @Test
    public void testBeHostPortMapping_InvalidIpPortFormat() {
        // Test ip_port format error
        Assertions.assertThrows(StarRocksConnectorException.class, () -> { /* ... */ });
    }
    @Test
    public void testBeHostPortMapping_PortOutOfRange() {
        // Test port out of range
        Assertions.assertThrows(StarRocksConnectorException.class, () -> { /* ... */ });
    }
    @Test
    public void testBeHostPortMapping_DuplicateHost() {
        // Test duplicate host mapping. This expects an exception; if the keep-first
        // policy from Issue 1 is chosen, assert that the first mapping is kept instead
        Assertions.assertThrows(StarRocksConnectorException.class, () -> { /* ... */ });
    }
    @Test
    public void testBeHostPortMapping_EmptyMapping() {
        // Test empty mapping list
    }
}
```
**Rationale**:
1. Apache top-level projects require test coverage
2. New functionality must have test protection
3. Tests serve as documentation, helping to understand expected behavior
---
--
This is an automated message from the Apache Git Service.