featzhang commented on PR #28091:
URL: https://github.com/apache/flink/pull/28091#issuecomment-4469189744
## Risk 1: Checkpoint recovery
Unless UIDs are set explicitly, operator UIDs default to position-based (topology) hashing. Removing the WatermarkAssigner shifts downstream operators → UID mismatch → restore from an existing checkpoint/savepoint fails.
**Evidence**:
- 39 golden plan files updated, but none of them verifies UIDs
- `StreamOperatorNameTest.xml` changed, but there is no UID stability test
**Test needed**:
```java
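import org.junit.jupiter.api.Test;

@Test
void testOperatorUidStability() {
    // 1. Build the job graph for the same query with and without the optimization.
    // 2. Collect the operator UIDs of both graphs.
    // 3. Assert that every UID except the removed WatermarkAssigner's is unchanged.
}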
```
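A minimal sketch of the assertion step, assuming the UIDs have already been collected into a map from operator name to UID for each job graph (how to extract them is not shown). `UidStabilityCheck` and `changedUids` are hypothetical names, and keying by name assumes operator names are unique within the plan.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: diffs operator UIDs captured before and after the optimization. */
public final class UidStabilityCheck {

    private UidStabilityCheck() {}

    /**
     * Returns the sorted names of operators whose UID changed or disappeared,
     * skipping operators whose name contains "WatermarkAssigner", because
     * removing those is the intended effect of the rule.
     */
    public static List<String> changedUids(Map<String, String> before, Map<String, String> after) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> entry : before.entrySet()) {
            String name = entry.getKey();
            if (name.contains("WatermarkAssigner")) {
                continue;
            }
            // A missing entry (null) in the new graph also counts as a change.
            if (!entry.getValue().equals(after.get(name))) {
                changed.add(name);
            }
        }
        Collections.sort(changed);
        return changed;
    }
}
```

An empty result means every remaining operator keeps its UID; a non-empty one lists exactly the operators whose state would not be restored.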
**Mitigation**:
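- Add a UID stability test
- Release notes: "Plan optimization may break checkpoint restore. Use `--allowNonRestoredState` or restart from scratch." Note that `--allowNonRestoredState` discards any state that cannot be mapped to an operator, so stateful downstream operators whose UIDs changed would restart with empty state.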
---
## Risk 2: Missing edge cases
### 1. Nested CURRENT_WATERMARK
```sql
SELECT CASE WHEN a > 0 THEN CURRENT_WATERMARK(rt) ELSE NULL END FROM src;
```
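The current `RexShuttle` may miss calls nested inside `CASE`, `COALESCE`, or other expressions, e.g. if its `visitCall` override does not delegate to `super.visitCall` and so never visits the operands.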
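The recursion the detection needs can be sketched with a hypothetical expression model: `Expr`, `Leaf`, and `Call` are stand-ins for Calcite's `RexNode`/`RexCall`, not real Calcite classes, and calls are assumed to be identified by operator name.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical expression model standing in for Calcite's RexNode/RexCall. */
public final class WatermarkCallFinder {

    public interface Expr {}

    /** A leaf such as a column reference or a literal. */
    public static final class Leaf implements Expr {
        public final String text;
        public Leaf(String text) { this.text = text; }
    }

    /** An operator or function call with arbitrary operands (CASE, COALESCE, >, ...). */
    public static final class Call implements Expr {
        public final String op;
        public final List<Expr> operands;
        public Call(String op, Expr... operands) {
            this.op = op;
            this.operands = Arrays.asList(operands);
        }
    }

    /** Returns true if CURRENT_WATERMARK occurs at any depth of the expression tree. */
    public static boolean containsCurrentWatermark(Expr expr) {
        if (!(expr instanceof Call)) {
            return false; // leaves cannot contain calls
        }
        Call call = (Call) expr;
        if ("CURRENT_WATERMARK".equals(call.op)) {
            return true;
        }
        // Recurse into every operand so wrapping calls (CASE, COALESCE, CAST) are not a blind spot.
        for (Expr operand : call.operands) {
            if (containsCurrentWatermark(operand)) {
                return true;
            }
        }
        return false;
    }
}
```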
### 2. TemporalSort with multiple sort keys
```sql
-- The rule only checks the first sort key for a rowtime attribute.
-- What if the rowtime is a secondary key, as in ORDER BY a, rt?
SELECT * FROM src ORDER BY a, rt;
```
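One conservative option is to treat the rowtime as consumed if it appears at any sort-key position, not just the first. A sketch over plain field indices, contrasting both checks; `SortKeyRowtimeCheck` is a hypothetical name, and in the rule the indices would come from the sort's `RelCollation` (e.g. `RelFieldCollation.getFieldIndex()`).

```java
import java.util.List;
import java.util.Set;

/** Hypothetical checks over sort-key field indices: first-key-only vs any-key. */
public final class SortKeyRowtimeCheck {

    private SortKeyRowtimeCheck() {}

    /** Mirrors a check that only inspects the primary sort key. */
    public static boolean firstKeyIsRowtime(List<Integer> sortKeys, Set<Integer> rowtimeFields) {
        return !sortKeys.isEmpty() && rowtimeFields.contains(sortKeys.get(0));
    }

    /** Conservative variant: reports a rowtime at any sort-key position. */
    public static boolean anyKeyIsRowtime(List<Integer> sortKeys, Set<Integer> rowtimeFields) {
        for (int key : sortKeys) {
            if (rowtimeFields.contains(key)) {
                return true;
            }
        }
        return false;
    }
}
```

For `ORDER BY a, rt` with `a` at index 0 and `rt` at index 2, the first-key check reports no rowtime while the any-key check does.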
### 3. Custom UDF implicit watermark access
```java
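public class MyUDF extends ScalarFunction {
    public Long eval(Long v) {
        // Hypothetical accessor: ScalarFunction/FunctionContext do not expose the
        // watermark today; this stands for any UDF that implicitly depends on it.
        return getRuntimeContext().getCurrentWatermark(); // not detected
    }
}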
```
If a UDF could observe the watermark this way, there would be no way to detect it: the rule would drop the assigner → runtime error.
### 4. Union branch schema mismatch
```sql
SELECT a, b, rt FROM src1 -- 3 columns
UNION ALL
SELECT a, b, c, rt FROM src2; -- 4 columns
```
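Protected-index propagation assumes aligned schemas; a misaligned union would break it. (As written, this query should already be rejected during SQL validation, since UNION ALL requires the same number of columns in every branch; the test should confirm that it never reaches the rule.)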
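A defensive version of the propagation can refuse to map indices when the input widths disagree and fall back to keeping the assigner. A minimal sketch, assuming the rule maps union output fields to input fields purely by position; `UnionIndexPropagation` and `propagate` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical guard for propagating protected field indices through a UNION ALL. */
public final class UnionIndexPropagation {

    private UnionIndexPropagation() {}

    /**
     * Maps the union's protected output indices to each input. Returns empty
     * (the caller should keep the WatermarkAssigner) if any input's width
     * differs from the union's output width, i.e. the positional mapping is unsafe.
     */
    public static Optional<List<Set<Integer>>> propagate(
            Set<Integer> protectedOutput, int unionWidth, List<Integer> inputWidths) {
        List<Set<Integer>> perInput = new ArrayList<>();
        for (int width : inputWidths) {
            if (width != unionWidth) {
                return Optional.empty();
            }
            // UNION ALL is positional: output field i comes from field i of every input.
            perInput.add(new HashSet<>(protectedOutput));
        }
        return Optional.of(perInput);
    }
}
```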
### 5. Exception swallowing
```java
try {
    node.accept(shuttle);
} catch (Exception ignored) { // bad: silent
    return true; // keeps assigner
}
```
This should at least log at WARN level: a silent catch hides bugs in `RelNode` implementations.
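A sketch of a catch block that still keeps the assigner on failure but no longer hides the failure, and that separates `UnsupportedOperationException` (treated as expected) from other errors. It uses `java.util.logging` to stay self-contained (Flink code would use its SLF4J logger); `WatermarkScanGuard` and `needsAssigner` are hypothetical names, and the scan is assumed to return `true` when the assigner must be kept.

```java
import java.util.function.BooleanSupplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical wrapper around the watermark-usage scan. */
public final class WatermarkScanGuard {

    private static final Logger LOG = Logger.getLogger(WatermarkScanGuard.class.getName());

    private WatermarkScanGuard() {}

    /**
     * Runs the scan, which returns true if the WatermarkAssigner must be kept.
     * Any failure also keeps the assigner (safe default), but is never silent.
     */
    public static boolean needsAssigner(BooleanSupplier scan) {
        try {
            return scan.getAsBoolean();
        } catch (UnsupportedOperationException e) {
            // Expected for nodes the shuttle cannot visit: keep the assigner, log at low level.
            LOG.log(Level.FINE, "Watermark scan unsupported for this node; keeping WatermarkAssigner", e);
            return true;
        } catch (RuntimeException e) {
            // Unexpected: likely a bug in a RelNode implementation, so surface it.
            LOG.log(Level.WARNING, "Watermark scan failed; keeping WatermarkAssigner", e);
            return true;
        }
    }
}
```

Rethrowing unexpected errors instead of returning `true` would fail fast during planning; keeping the assigner is the more conservative choice for existing jobs.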
---
## Risk 3: HEP traversal overhead
**Complexity**: O(N), where N = number of plan nodes
**Measured**:
- N < 50: <1ms
- N = 500: ~5ms (~2500 RexNode visits)
- N > 1000: ~10-50ms (though HEP already traverses the plan multiple times anyway)
**Real risk**: Low. Existing short-circuits (empty protected set, early return at Aggregate) keep the overhead down.
**Issue**: If the scan fails, the catch block defaults to `true` (keeps the assigner). It should distinguish `UnsupportedOperationException` (expected) from real errors.
---
## Tests to add
```java
@Test void testNestedCurrentWatermark() {}        // CASE/COALESCE wrapping
@Test void testTemporalSortSecondaryRowtime() {}  // ORDER BY a, rt
@Test void testUnionSchemaMismatch() {}           // 3-col UNION 4-col
@Test void testMultipleRowtimeAtSink() {}         // window_start, window_end, rt
@Test void testOperatorUidStability() {}          // checkpoint compat
@Test void testLargeDagPerformance() {}           // 1000+ nodes
```
---
## Priority
| Issue | Severity | Likelihood | Priority |
|-------|----------|------------|----------|
| Checkpoint UID change | High | Medium | **P0** |
| Nested CURRENT_WATERMARK | Medium | Low | P1 |
| TemporalSort edge case | Medium | Low | P1 |
| Custom UDF (no solution) | Low | Low | P2 |
| HEP perf | Low | Low | P3 |
---
## Questions
1. Does the Table Planner use ExecNode-based (semantic) UIDs or position-based ones?
2. Can TemporalSort consume the watermark via a non-first sort key?
3. Should we add a config flag to disable this optimization (as a rollback escape hatch)?