DanielCarter-stack commented on PR #10208:
URL: https://github.com/apache/seatunnel/pull/10208#issuecomment-3857334624
### Issue 1: Incomplete test for checkpoint recovery scenarios
**Location**:
`seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumeratorTest.java`
```java
@Test
void testResignalNoMoreSplitsAfterReaderReregister() {
    SourceSplitEnumerator<DummySplit, Serializable> sourceSplitEnumerator =
            Mockito.mock(SourceSplitEnumerator.class);
    SplitEnumeratorContext<SplitWrapper<DummySplit>> enumeratorContext =
            Mockito.mock(SplitEnumeratorContext.class);
    Mockito.when(enumeratorContext.currentParallelism()).thenReturn(2);
    Set<Integer> noMoreSplitsSignaledReaders = ConcurrentHashMap.newKeySet();
    noMoreSplitsSignaledReaders.add(0); // Manually added
    FlinkSourceEnumerator<DummySplit, Serializable> enumerator =
            new FlinkSourceEnumerator<>(
                    sourceSplitEnumerator, enumeratorContext, noMoreSplitsSignaledReaders);
    enumerator.addReader(0);
    Mockito.verify(enumeratorContext).signalNoMoreSplits(0);
}
```
**Related Context**:
- Test class: `FlinkSourceEnumeratorTest.java:58`
- Modified class: `FlinkSourceEnumerator.java:98`
- SeaTunnel Engine test: `SourceSplitEnumeratorTaskTest.java:109`
**Problem Description**:
The current test is too simple: it manually adds subtask 0 to the
`noMoreSplitsSignaledReaders` collection, which does not realistically simulate
the failure recovery scenario.
In a real failure recovery scenario:
1. `FlinkSource.restoreEnumerator()` is called.
2. A **new** `FlinkSourceEnumerator` instance is created, in which
`noMoreSplitsSignaledReaders` is an **empty collection**.
3. The readers re-register.
4. `run()` is executed again (because `isRun = false` on the new instance).
5. Inside `run()`, the call to `context.signalNoMoreSplits()` adds each
signaled subtask to the collection.
6. The signal is re-sent only if a reader registers again after that point.
The current test does not cover this end-to-end flow.
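To make the recovery flow concrete, here is a stdlib-only Java model of it. It is a hypothetical simplification, not the real `FlinkSourceEnumerator`: `ResignalModel`, `ModelEnumerator`, `RecordingContext`, and the rule that `run()` starts once every reader has registered are all assumptions for illustration. It shows the two behaviors the suggested tests target: a re-registering reader receives the signal again after `run()` has completed, and receives nothing before that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the recovery flow (steps 1-6).
// It is NOT the real FlinkSourceEnumerator; all names here are illustrative.
public class ResignalModel {

    /** Stands in for Flink's SplitEnumeratorContext and records every signal sent. */
    static final class RecordingContext {
        final List<Integer> signaled = new ArrayList<>();

        void signalNoMoreSplits(int subtask) {
            signaled.add(subtask);
        }
    }

    static final class ModelEnumerator {
        private final RecordingContext context;
        private final int parallelism;
        private final Set<Integer> registeredReaders = ConcurrentHashMap.newKeySet();
        // After restoreEnumerator() this is a fresh, empty collection (step 2).
        private final Set<Integer> noMoreSplitsSignaledReaders;
        // A new instance starts with isRun = false, so run() executes again (step 4).
        private boolean isRun = false;

        ModelEnumerator(RecordingContext context, int parallelism,
                        Set<Integer> noMoreSplitsSignaledReaders) {
            this.context = context;
            this.parallelism = parallelism;
            this.noMoreSplitsSignaledReaders = noMoreSplitsSignaledReaders;
        }

        void addReader(int subtask) {
            registeredReaders.add(subtask);
            // Re-send the terminal signal to a reader that already received it (step 6).
            if (noMoreSplitsSignaledReaders.contains(subtask)) {
                context.signalNoMoreSplits(subtask);
            }
            // Assumption: run() starts once every reader has registered.
            if (!isRun && registeredReaders.size() == parallelism) {
                isRun = true;
                run();
            }
        }

        // Stands in for a bounded source's run(): signal each reader and record it (step 5).
        private void run() {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                context.signalNoMoreSplits(subtask);
                noMoreSplitsSignaledReaders.add(subtask);
            }
        }
    }

    public static void main(String[] args) {
        // run() has completed, then reader 0 re-registers after a failover.
        RecordingContext afterRun = new RecordingContext();
        ModelEnumerator e1 = new ModelEnumerator(afterRun, 2, ConcurrentHashMap.newKeySet());
        e1.addReader(0);
        e1.addReader(1);           // all readers present: run() signals 0 and 1
        afterRun.signaled.clear(); // comparable to Mockito.clearInvocations
        e1.addReader(0);
        System.out.println("after run(): " + afterRun.signaled); // after run(): [0]

        // Only one of two readers registered, so run() has not executed yet.
        RecordingContext beforeRun = new RecordingContext();
        ModelEnumerator e2 = new ModelEnumerator(beforeRun, 2, ConcurrentHashMap.newKeySet());
        e2.addReader(0);
        e2.addReader(0);
        System.out.println("before run(): " + beforeRun.signaled); // before run(): []
    }
}
```

The model keeps the signaled set separate from the registered set on purpose: re-sending is driven only by "has this subtask already been told there are no more splits", which is what the real tests should assert.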
**Potential Risks**:
- **Risk 1**: The test cannot verify real failure recovery scenarios and may
miss edge cases
- **Risk 2**: If the `restoreEnumerator` logic changes in the future (e.g., it
starts restoring a non-empty collection), the test will not catch the
regression
- **Risk 3**: The test gives developers a false impression that failure
recovery scenarios have been fully tested
**Impact Scope**:
- **Direct Impact**: `FlinkSourceEnumeratorTest` test coverage is incomplete
- **Indirect Impact**: May not detect failure recovery logic bugs in time
- **Affected Area**: Flink Translation layer
**Severity**: Medium
**Improvement Suggestions**:
```java
@Test
void testResignalNoMoreSplitsAfterReaderReregister() {
    SourceSplitEnumerator<DummySplit, Serializable> sourceSplitEnumerator =
            Mockito.mock(SourceSplitEnumerator.class);
    SplitEnumeratorContext<SplitWrapper<DummySplit>> enumeratorContext =
            Mockito.mock(SplitEnumeratorContext.class);
    Mockito.when(enumeratorContext.currentParallelism()).thenReturn(2);
    // Start empty, as a freshly restored enumerator would
    Set<Integer> noMoreSplitsSignaledReaders = ConcurrentHashMap.newKeySet();
    FlinkSourceEnumerator<DummySplit, Serializable> enumerator =
            new FlinkSourceEnumerator<>(
                    sourceSplitEnumerator, enumeratorContext, noMoreSplitsSignaledReaders);
    // Register all readers so that run() executes.
    // A mocked SourceSplitEnumerator.run() does nothing by default; if the collection
    // is only populated when run() calls signalNoMoreSplits(), stub run() accordingly
    // (e.g. with Mockito.doAnswer) so the signal is actually recorded.
    enumerator.addReader(0);
    enumerator.addReader(1);
    // Clear recorded calls but keep the currentParallelism() stub
    // (Mockito.reset would also discard the stub)
    Mockito.clearInvocations(enumeratorContext);
    // Simulate reader re-registration (fault recovery scenario)
    enumerator.addReader(0);
    // Verify the signal was re-sent
    Mockito.verify(enumeratorContext).signalNoMoreSplits(0);
}

@Test
void testNoResignalWhenRunNotCompleted() {
    // Re-registration before run() completes should not re-send the signal
    SourceSplitEnumerator<DummySplit, Serializable> sourceSplitEnumerator =
            Mockito.mock(SourceSplitEnumerator.class);
    SplitEnumeratorContext<SplitWrapper<DummySplit>> enumeratorContext =
            Mockito.mock(SplitEnumeratorContext.class);
    Mockito.when(enumeratorContext.currentParallelism()).thenReturn(2);
    Set<Integer> noMoreSplitsSignaledReaders = ConcurrentHashMap.newKeySet();
    FlinkSourceEnumerator<DummySplit, Serializable> enumerator =
            new FlinkSourceEnumerator<>(
                    sourceSplitEnumerator, enumeratorContext, noMoreSplitsSignaledReaders);
    // Register only one of the two readers, so run() does not execute
    enumerator.addReader(0);
    // Clear recorded calls but keep the currentParallelism() stub
    Mockito.clearInvocations(enumeratorContext);
    // Reader re-registration
    enumerator.addReader(0);
    // Verify no signal was re-sent
    Mockito.verify(enumeratorContext, Mockito.never()).signalNoMoreSplits(Mockito.anyInt());
}
```
**Reason**: These test cases more realistically simulate failure recovery
scenarios and cover key boundary conditions (different behavior before and
after run() completes).
---
### Issue 3: `hasNoMoreSplitsSignaled` in the SeaTunnel Engine layer should not be `public`
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java:116-118`
```java
public boolean hasNoMoreSplitsSignaled(int subtaskIndex) {
    return noMoreSplitsSignaledReaders.contains(subtaskIndex);
}
```
**Related Context**:
- Caller: `SourceSplitEnumeratorTask.java:230` (the only call site)
- For comparison: the Flink layer does not expose an equivalent method;
`FlinkSourceEnumerator` accesses the collection directly
**Problem Description**:
The `hasNoMoreSplitsSignaled` method is declared `public`, but it is only
called from `SourceSplitEnumeratorTask`; there are no external callers.
Keeping it `public` may lead to:
1. External code incorrectly relying on an internal implementation detail
2. A larger API surface and higher maintenance cost
3. Future changes to this logic having to account for every potential caller
**Potential Risks**:
- **Risk 1**: External code may depend on this method, making future
refactoring difficult
- **Risk 2**: Violates the principle of least privilege, increasing
unnecessary coupling
**Impact Scope**:
- **Direct Impact**: API design of `SeaTunnelSplitEnumeratorContext`
- **Indirect Impact**: Potential external callers (if any)
- **Affected Area**: SeaTunnel Engine
**Severity**: Low (code works properly, but design is not rigorous enough)
**Improvement Suggestions**:
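```java
// Narrow the visibility to package-private (default): only classes in the
// same package as SeaTunnelSplitEnumeratorContext can call it.
boolean hasNoMoreSplitsSignaled(int subtaskIndex) {
    return noMoreSplitsSignaledReaders.contains(subtaskIndex);
}

// `private` is only an option if the check currently done in
// SourceSplitEnumeratorTask is moved into SeaTunnelSplitEnumeratorContext,
// because a private method cannot be called from another class.
```
Package-private is sufficient only if `SourceSplitEnumeratorTask` and
`SeaTunnelSplitEnumeratorContext` are in the same package. Since
`SeaTunnelSplitEnumeratorContext` lives under `server/task/context` (see the
location above), confirm where `SourceSplitEnumeratorTask` lives before
narrowing the modifier: Java treats `...server.task` and
`...server.task.context` as unrelated packages.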
**Reason**: Reducing unnecessary API exposure reduces coupling and improves
code maintainability and evolvability.
---
### Issue 4: Missing CHANGELOG update
**Location**: Project root directory `CHANGELOG.md` (or other changelog
files)
**Problem Description**:
Because this PR is a bug fix, it should add a CHANGELOG entry that explains
which problem was fixed and what it affects.
**Potential Risks**:
- **Risk 1**: Users are unaware of this fix and may still be affected by
this bug
- **Risk 2**: Without a changelog entry, users have no reference when
upgrading and cannot assess the impact
**Impact Scope**:
- **Direct Impact**: Transparency of version releases
- **Indirect Impact**: Users' upgrade decisions
- **Affected Area**: All users
**Severity**: Low (does not affect code functionality, but affects project
management)
**Improvement Suggestions**:
When merging, add an entry like this to the CHANGELOG:
```markdown
## [Version X.Y.Z] - 2024-XX-XX
### Bug Fixes
* **[Translation]** Fixed an issue where bounded source jobs could hang after
  TaskManager failover because `NoMoreSplitsEvent` was not re-sent to recovered
  readers. The framework now tracks which subtasks have received the terminal
  signal and re-sends it when readers re-register.
  (Affected connectors: JDBC, Iceberg, Hive, File and all other bounded sources)
```
**Reason**: Keeping the CHANGELOG updated helps users understand version
changes, assess upgrade impacts, and quickly locate issues.
---
### Issue 5: Missing integration tests or E2E tests for critical paths
**Location**: Test coverage
**Related Context**:
- Unit tests: `FlinkSourceEnumeratorTest.java`,
`SourceSplitEnumeratorTaskTest.java`
- E2E tests: `seatunnel-e2e/` directory
**Problem Description**:
The PR only contains unit tests and lacks integration tests or E2E tests to
verify real failure recovery scenarios.
While unit tests are useful, these ones:
1. Rely on Mockito mocks and cannot verify real Flink behavior
2. Use simplified scenarios that do not cover complex failure recovery
processes
3. Cannot verify interactions with other components (such as checkpointing
and the state backend)
**Potential Risks**:
- **Risk 1**: Unit tests pass, but there may be issues in actual scenarios
- **Risk 2**: Cannot capture Flink framework layer compatibility issues
- **Risk 3**: Regression risk (future code modifications may break this
functionality)
**Impact Scope**:
- **Direct Impact**: Completeness of test coverage
- **Indirect Impact**: Code quality and stability
- **Affected Area**: All users using the Flink engine
**Severity**: Medium (unit tests exist but are insufficient)
**Improvement Suggestions**:
It is recommended to add an integration test using a real Flink MiniCluster
to test failure recovery scenarios:
```java
// Pseudo-code example
@Test
void testFailoverWithBoundedSource() throws Exception {
    // 1. Start a Flink MiniCluster
    // 2. Submit a job with a bounded data source (e.g., JDBC)
    // 3. Wait for the job to run for a while
    // 4. Simulate a TaskManager failure (kill the process or trigger an exception)
    // 5. Wait for Flink to recover the job
    // 6. Verify the job completes normally (instead of hanging)

    // Alternatively, inject the fault from within the job:
    // 1. Create a Source that fails automatically
    // 2. Configure a Flink restart strategy for fault recovery
    // 3. Verify the job eventually completes
}
```
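The fault-injection variant can be illustrated without Flink by a small stdlib-only model. Everything here is hypothetical: `FailOnceModel`, `FailOnceSource`, `runWithRestarts`, and the assumption that every record is checkpointed immediately are simplifications, and the restart loop only stands in for a Flink restart strategy. A real test would run the job on a Flink MiniCluster instead; the model just shows the shape of the assertion, namely that a bounded source which fails exactly once must still let the job finish with every record emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of a "source that fails once" fault-injection test.
// The restart loop stands in for a restart strategy, and checkpointedOffset
// stands in for restored source state.
public class FailOnceModel {

    /** A bounded source that throws once, on its first attempt, when it reaches failAt. */
    static final class FailOnceSource {
        private final int totalRecords;
        private final int failAt;
        private boolean hasFailed = false;

        FailOnceSource(int totalRecords, int failAt) {
            this.totalRecords = totalRecords;
            this.failAt = failAt;
        }

        /** Emits records starting at startOffset; returns only when the source is exhausted. */
        void run(int startOffset, List<Integer> sink, int[] checkpointedOffset) {
            for (int i = startOffset; i < totalRecords; i++) {
                if (!hasFailed && i == failAt) {
                    hasFailed = true;
                    throw new IllegalStateException("injected failure at record " + i);
                }
                sink.add(i);
                // Simplification: every emitted record is checkpointed immediately.
                checkpointedOffset[0] = i + 1;
            }
        }
    }

    /** Re-runs the source from the last checkpointed offset, up to maxRestarts times. */
    static List<Integer> runWithRestarts(FailOnceSource source, int maxRestarts) {
        List<Integer> sink = new ArrayList<>();
        int[] checkpointedOffset = {0};
        int restarts = 0;
        while (true) {
            try {
                source.run(checkpointedOffset[0], sink, checkpointedOffset);
                return sink; // the bounded source finished, so the "job" completes
            } catch (IllegalStateException e) {
                if (++restarts > maxRestarts) {
                    throw e; // restart budget exhausted: the job fails
                }
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> out = runWithRestarts(new FailOnceSource(10, 4), 3);
        System.out.println(out); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

The duplicate-free output relies on the immediate-checkpoint simplification; with real periodic checkpoints, records emitted after the last completed checkpoint would be replayed after recovery.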
Alternatively, given the complexity and execution time of integration tests,
you could:
1. Add failure recovery scenarios to existing E2E tests (if any)
2. Perform manual testing before release
3. Document this fix and encourage users to verify it in their test
environments
**Reason**: Integration tests can provide higher confidence to ensure the
fix works effectively in real scenarios.
---