Piotr Nowojski created FLINK-39639:
--------------------------------------
Summary: Split assignment accepted after source finished reading
leads to silent data loss
Key: FLINK-39639
URL: https://issues.apache.org/jira/browse/FLINK-39639
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 2.0.1
Reporter: Piotr Nowojski
h2. Summary
When an {{OperatorCoordinator}} sends a split assignment
({{{}AddSplitEvent{}}}) to a {{SourceOperator}} that has concurrently finished
reading (reached {{{}END_OF_INPUT{}}}), the event is silently
accepted and the splits are added to the {{SourceReader}} — but they will never
be processed. The coordinator receives no error signal, so it believes the
assignment succeeded. This is silent
data loss.
h2. Root Cause
There is a window between the source operator finishing and the mailbox being
quiesced during which coordinator events are still accepted and processed:
# {{SourceReader.pollNext()}} returns {{END_OF_INPUT}}.
# {{StreamTask.processInput()}} calls {{endData(DRAIN)}} → {{operatorChain.finishOperators()}} → {{SourceOperator.finish()}}.
# The mailbox loop suspends, but the mailbox remains in the {{OPEN}} state.
# A coordinator event (split assignment) arrives via RPC and is enqueued into the mailbox by {{StreamTask.dispatchOperatorEvent()}}.
# {{afterInvoke()}} runs a second mailbox loop ({{{}StreamTask.java{}}} line 1074) that processes the queued event.
# {{SourceOperator.handleOperatorEvent()}} calls {{sourceReader.addSplits()}} with no guard on the operator's {{operatingMode}}; the split is accepted even though the operator is in {{DATA_FINISHED}} mode.

The only protection against late events is the {{RejectedExecutionException}}
catch in {{{}StreamTask.dispatchOperatorEvent(){}}}, but that only fires after
{{mailboxProcessor.prepareClose()}} (line 1085), which happens after the second
mailbox loop has already drained.
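To make the window concrete, here is a minimal, self-contained sketch of the sequence above. This is plain Java, not Flink code: the names ({{ToyMailbox}}, {{ToySourceOperator}}, etc.) are invented for illustration and only model the relevant behavior — an operator that has finished reading, a mailbox that stays open and only rejects mail once quiesced, and a second drain loop that delivers a late event anyway.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of the race in FLINK-39639; all names are invented, not Flink APIs.
public class MailboxRaceSketch {

    enum MailboxState { OPEN, QUIESCED }

    static class ToyMailbox {
        MailboxState state = MailboxState.OPEN;
        final Queue<Runnable> mail = new ArrayDeque<>();

        void put(Runnable event) {
            // Models dispatchOperatorEvent(): only a QUIESCED mailbox rejects mail.
            if (state == MailboxState.QUIESCED) {
                throw new java.util.concurrent.RejectedExecutionException("mailbox closed");
            }
            mail.add(event);
        }

        void drain() {
            Runnable event;
            while ((event = mail.poll()) != null) {
                event.run();
            }
        }
    }

    static class ToySourceOperator {
        boolean dataFinished = false;
        final List<String> acceptedSplits = new ArrayList<>();

        // Models handleOperatorEvent(): no guard on the operating mode.
        void handleAddSplit(String splitId) {
            acceptedSplits.add(splitId);
        }
    }

    public static void main(String[] args) {
        ToyMailbox mailbox = new ToyMailbox();
        ToySourceOperator operator = new ToySourceOperator();

        // Steps 1-3: the reader hits END_OF_INPUT, finish() runs, the mailbox
        // loop suspends, but the mailbox itself stays OPEN.
        operator.dataFinished = true;

        // Step 4: a late split assignment arrives via RPC; the OPEN mailbox accepts it.
        mailbox.put(() -> operator.handleAddSplit("late-split-1"));

        // Steps 5-6: the second mailbox loop in afterInvoke() drains the queue,
        // and the split is accepted even though data is finished.
        mailbox.drain();

        // Only now is the mailbox quiesced (prepareClose()) -- too late to help.
        mailbox.state = MailboxState.QUIESCED;

        System.out.println("dataFinished=" + operator.dataFinished
                + " acceptedSplits=" + operator.acceptedSplits);
    }
}
```

Running the sketch shows the split landing in {{acceptedSplits}} after {{dataFinished}} is already true, with no exception raised anywhere on the path.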
h2. Impact
In batch (bounded) execution, if a {{SplitEnumerator}} assigns splits
concurrently with a reader reaching end-of-input, the assigned splits are lost
without any error. The enumerator believes
the assignment succeeded. This can occur when:
- Multiple subtasks finish at different times and the enumerator redistributes
work
- The enumerator performs lazy/deferred split discovery and assigns splits
just as the reader finishes its current work
h2. Reproducer
A unit test using {{StreamTaskMailboxTestHarness}} demonstrates the issue
without any timing dependencies:
{code:java}
/**
 * Verifies that a coordinator event (split assignment) dispatched after the source has
 * finished reading (END_OF_INPUT) is silently accepted and the splits are added to the
 * reader, even though they will never be processed — demonstrating a race condition
 * between coordinator event delivery and operator lifecycle.
 */
@Test
void testSplitAssignmentAfterSourceFinished() throws Exception {
    SplitAssignmentTrackingSource testSource =
            new SplitAssignmentTrackingSource(Boundedness.BOUNDED);
    SourceOperatorFactory<Integer> sourceOperatorFactory =
            new SourceOperatorFactory<>(testSource, WatermarkStrategy.noWatermarks());

    try (StreamTaskMailboxTestHarness<Integer> testHarness =
            new StreamTaskMailboxTestHarnessBuilder<>(
                            SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                    .setCollectNetworkEvents()
                    .setupOutputForSingletonOperatorChain(sourceOperatorFactory, OPERATOR_ID)
                    .build()) {

        SplitAssignmentTrackingReader reader =
                (SplitAssignmentTrackingReader)
                        ((SourceOperator) testHarness.getStreamTask().mainOperator)
                                .getSourceReader();

        // Assign an initial split with some records so the source has work to do.
        MockSourceSplit initialSplit = new MockSourceSplit(0, 0, 2);
        initialSplit.addRecord(100);
        initialSplit.addRecord(200);
        AddSplitEvent<MockSourceSplit> initialAssignment =
                new AddSplitEvent<>(
                        Collections.singletonList(initialSplit),
                        new MockSourceSplitSerializer());
        testHarness
                .getStreamTask()
                .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(initialAssignment));

        // Process all records. The source will reach END_OF_INPUT, triggering
        // endData(), which calls finish() on the operator. The mailbox loop
        // then suspends.
        reader.markAvailable();
        testHarness.processAll();

        // At this point:
        // - The source reader has returned END_OF_INPUT
        // - SourceOperator.finish() has been called
        // - The mailbox is suspended but still OPEN (not quiesced)
        assertThat(reader.hasFinishedReading).isTrue();
        assertThat(reader.splitsAddedAfterFinish).isEmpty();

        // Now simulate a coordinator sending a late split assignment. This can
        // happen in practice when the enumerator assigns splits concurrently
        // with the reader finishing.
        MockSourceSplit lateSplit = new MockSourceSplit(1, 0);
        lateSplit.addRecord(999);
        AddSplitEvent<MockSourceSplit> lateAssignment =
                new AddSplitEvent<>(
                        Collections.singletonList(lateSplit),
                        new MockSourceSplitSerializer());
        testHarness
                .getStreamTask()
                .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(lateAssignment));

        // Drive the task through afterInvoke(), which runs a second mailbox
        // loop that will process the queued split assignment event.
        testHarness.finishProcessing();

        // The late split was accepted by the reader even though the source had
        // already finished reading. This split will never be processed — it
        // represents lost data.
        assertThat(reader.splitsAddedAfterFinish)
                .as(
                        "Split assignment was accepted after the source finished reading, "
                                + "leading to potential data loss")
                .hasSize(1);
        assertThat(reader.splitsAddedAfterFinish.get(0).splitId()).isEqualTo("1");
    }
}

private static class SplitAssignmentTrackingSource extends MockSource {
    private static final long serialVersionUID = 1L;

    SplitAssignmentTrackingSource(Boundedness boundedness) {
        super(boundedness, 1);
    }

    @Override
    public SourceReader<Integer, MockSourceSplit> createReader(
            SourceReaderContext readerContext) {
        return new SplitAssignmentTrackingReader();
    }
}

private static class SplitAssignmentTrackingReader extends MockSourceReader {
    volatile boolean hasFinishedReading = false;
    volatile boolean hasClosed = false;
    final List<MockSourceSplit> splitsAddedAfterFinish = new ArrayList<>();
    final List<MockSourceSplit> splitsAddedAfterClose = new ArrayList<>();

    @Override
    public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
        InputStatus status = super.pollNext(sourceOutput);
        if (status == InputStatus.END_OF_INPUT) {
            hasFinishedReading = true;
        }
        return status;
    }

    @Override
    public void close() throws Exception {
        hasClosed = true;
        super.close();
    }

    @Override
    public void addSplits(List<MockSourceSplit> splits) {
        if (hasFinishedReading) {
            splitsAddedAfterFinish.addAll(splits);
        }
        if (hasClosed) {
            splitsAddedAfterClose.addAll(splits);
        }
        super.addSplits(splits);
    }
}{code}
Full test added in {{{}SourceOperatorStreamTaskTest{}}}.
h2. Possible Fixes
- {{SourceOperator.handleOperatorEvent()}} could check {{operatingMode}} and
reject events when in {{DATA_FINISHED}} (or any post-reading state). The
coordinator would then receive an error and
could handle it (e.g., reassign the split to another subtask).
- {{StreamTask.dispatchOperatorEvent()}} could check whether operators have
been finished before enqueueing the event.
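The first option could look roughly like the following simplified sketch. This is not the actual Flink code: the enum, field, and method names merely mirror the shape of {{SourceOperator}} for illustration. The key point is that rejecting the event with an exception surfaces an error to the coordinator instead of silently swallowing the splits, so the enumerator gets a chance to reassign them.

```java
import java.util.Arrays;
import java.util.List;

// Simplified sketch of the proposed guard; names only loosely mirror
// SourceOperator internals and are not the real Flink API.
public class GuardedHandleEventSketch {

    enum OperatingMode { READING, DATA_FINISHED }

    static OperatingMode operatingMode = OperatingMode.READING;

    // A guarded handleOperatorEvent(): refuse split assignments once the
    // operator has stopped reading, so the failure is visible upstream.
    static void handleAddSplits(List<String> splits) {
        if (operatingMode == OperatingMode.DATA_FINISHED) {
            throw new IllegalStateException(
                    "Split assignment received after the source finished reading: " + splits);
        }
        // ... here the real operator would call sourceReader.addSplits(splits) ...
    }

    public static void main(String[] args) {
        // Accepted while still reading.
        handleAddSplits(Arrays.asList("split-0"));

        // After END_OF_INPUT the same call fails loudly instead of losing data.
        operatingMode = OperatingMode.DATA_FINISHED;
        try {
            handleAddSplits(Arrays.asList("late-split-1"));
            System.out.println("late assignment accepted (bug)");
        } catch (IllegalStateException e) {
            System.out.println("late assignment rejected: " + e.getMessage());
        }
    }
}
```

With a guard like this, the error would propagate back through the operator event RPC, and the coordinator/enumerator could treat the splits as unassigned rather than believing the assignment succeeded.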
--
This message was sent by Atlassian Jira
(v8.20.10#820010)