Piotr Nowojski created FLINK-39639:
--------------------------------------

             Summary: Split assignment accepted after source finished reading 
leads to silent data loss
                 Key: FLINK-39639
                 URL: https://issues.apache.org/jira/browse/FLINK-39639
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Task
    Affects Versions: 2.0.1
            Reporter: Piotr Nowojski


h2. Summary

When an {{OperatorCoordinator}} sends a split assignment ({{{}AddSplitEvent{}}}) to a {{SourceOperator}} that has concurrently finished reading (reached {{{}END_OF_INPUT{}}}), the event is silently accepted and the splits are added to the {{SourceReader}}, but they will never be processed. The coordinator receives no error signal, so it believes the assignment succeeded. The result is silent data loss.
h2. Root Cause

There is a window between the source operator finishing and the mailbox being quiesced during which coordinator events are still accepted and processed:
 # {{SourceReader.pollNext()}} returns {{END_OF_INPUT}}.
 # {{StreamTask.processInput()}} calls {{endData(DRAIN)}} → {{operatorChain.finishOperators()}} → {{SourceOperator.finish()}}.
 # The mailbox loop suspends, but the mailbox remains in the {{OPEN}} state.
 # A coordinator event (split assignment) arrives via RPC and is enqueued into the mailbox by {{StreamTask.dispatchOperatorEvent()}}.
 # {{afterInvoke()}} runs a second mailbox loop ({{{}StreamTask.java{}}} line 1074) that processes the queued event.
 # {{SourceOperator.handleOperatorEvent()}} calls {{sourceReader.addSplits()}} with no guard on the operator's {{operatingMode}}; the split is accepted even though the operator is in {{DATA_FINISHED}} mode.

The only protection against late events is the {{RejectedExecutionException}} catch in {{{}StreamTask.dispatchOperatorEvent(){}}}, but that only fires after {{mailboxProcessor.prepareClose()}} (line 1085), which happens after the second mailbox loop has already drained.
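The sequence above can be illustrated with a minimal, self-contained model. This is a sketch, not Flink code: the class and method names ({{MailboxRaceSketch}}, {{handleSplitAssignment}}) are hypothetical stand-ins for the real mailbox, {{SourceOperator}}, and event-dispatch machinery. It shows only the shape of the race: the queue keeps accepting and draining events after the operator has already finished reading.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Minimal model of the race window: the mailbox stays open and a second
// drain pass still delivers events queued after the operator finished.
public class MailboxRaceSketch {
    enum OperatingMode { READING, DATA_FINISHED }

    static class Operator {
        OperatingMode mode = OperatingMode.READING;
        final List<String> acceptedSplits = new ArrayList<>();

        // Mirrors the unguarded handleOperatorEvent(): no check on mode.
        void handleSplitAssignment(String splitId) {
            acceptedSplits.add(splitId);
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator();
        Queue<Runnable> mailbox = new ArrayDeque<>(); // remains OPEN throughout

        // Steps 1-3: reader hits END_OF_INPUT, finish() runs, main loop suspends.
        op.mode = OperatingMode.DATA_FINISHED;

        // Step 4: a late split assignment arrives via RPC and is enqueued anyway.
        mailbox.add(() -> op.handleSplitAssignment("late-split-1"));

        // Steps 5-6: the second drain loop runs and delivers the queued event.
        while (!mailbox.isEmpty()) {
            mailbox.poll().run();
        }

        // The split was accepted after DATA_FINISHED; it will never be read.
        System.out.println("mode=" + op.mode + " accepted=" + op.acceptedSplits);
    }
}
```

Running the model prints {{mode=DATA_FINISHED accepted=[late-split-1]}}: the assignment succeeds from the sender's point of view, yet nothing will ever consume the split.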
h2. Impact

In batch (bounded) execution, if a {{SplitEnumerator}} assigns splits concurrently with a reader reaching end-of-input, the assigned splits are lost without any error, and the enumerator believes the assignment succeeded. This can occur when:
 - Multiple subtasks finish at different times and the enumerator redistributes work
 - The enumerator performs lazy/deferred split discovery and assigns splits just as the reader finishes its current work

h2. Reproducer

A unit test using {{StreamTaskMailboxTestHarness}} demonstrates the issue 
without any timing dependencies:
{code:java}
/**
 * Verifies that a coordinator event (split assignment) dispatched after the source has finished
 * reading (END_OF_INPUT) is silently accepted and the splits are added to the reader, even
 * though they will never be processed, demonstrating a race condition between coordinator event
 * delivery and operator lifecycle.
 */
@Test
void testSplitAssignmentAfterSourceFinished() throws Exception {
    SplitAssignmentTrackingSource testSource =
            new SplitAssignmentTrackingSource(Boundedness.BOUNDED);
    SourceOperatorFactory<Integer> sourceOperatorFactory =
            new SourceOperatorFactory<>(testSource, WatermarkStrategy.noWatermarks());

    try (StreamTaskMailboxTestHarness<Integer> testHarness =
            new StreamTaskMailboxTestHarnessBuilder<>(
                            SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                    .setCollectNetworkEvents()
                    .setupOutputForSingletonOperatorChain(sourceOperatorFactory, OPERATOR_ID)
                    .build()) {

        SplitAssignmentTrackingReader reader =
                (SplitAssignmentTrackingReader)
                        ((SourceOperator) testHarness.getStreamTask().mainOperator)
                                .getSourceReader();

        // Assign an initial split with some records so the source has work to do.
        MockSourceSplit initialSplit = new MockSourceSplit(0, 0, 2);
        initialSplit.addRecord(100);
        initialSplit.addRecord(200);
        AddSplitEvent<MockSourceSplit> initialAssignment =
                new AddSplitEvent<>(
                        Collections.singletonList(initialSplit),
                        new MockSourceSplitSerializer());
        testHarness
                .getStreamTask()
                .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(initialAssignment));

        // Process all records. The source will reach END_OF_INPUT, triggering endData(),
        // which calls finish() on the operator. The mailbox loop then suspends.
        reader.markAvailable();
        testHarness.processAll();

        // At this point:
        // - The source reader has returned END_OF_INPUT
        // - SourceOperator.finish() has been called
        // - The mailbox is suspended but still OPEN (not quiesced)
        assertThat(reader.hasFinishedReading).isTrue();
        assertThat(reader.splitsAddedAfterFinish).isEmpty();

        // Now simulate a coordinator sending a late split assignment. This can happen in
        // practice when the enumerator assigns splits concurrently with the reader finishing.
        MockSourceSplit lateSplit = new MockSourceSplit(1, 0);
        lateSplit.addRecord(999);
        AddSplitEvent<MockSourceSplit> lateAssignment =
                new AddSplitEvent<>(
                        Collections.singletonList(lateSplit),
                        new MockSourceSplitSerializer());
        testHarness
                .getStreamTask()
                .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(lateAssignment));

        // Drive the task through afterInvoke(), which runs a second mailbox loop that will
        // process the queued split assignment event.
        testHarness.finishProcessing();

        // The late split was accepted by the reader even though the source had already
        // finished reading. This split will never be processed; it represents lost data.
        assertThat(reader.splitsAddedAfterFinish)
                .as(
                        "Split assignment was accepted after the source finished reading, "
                                + "leading to potential data loss")
                .hasSize(1);
        assertThat(reader.splitsAddedAfterFinish.get(0).splitId()).isEqualTo("1");
    }
}

private static class SplitAssignmentTrackingSource extends MockSource {
    private static final long serialVersionUID = 1L;

    SplitAssignmentTrackingSource(Boundedness boundedness) {
        super(boundedness, 1);
    }

    @Override
    public SourceReader<Integer, MockSourceSplit> createReader(
            SourceReaderContext readerContext) {
        return new SplitAssignmentTrackingReader();
    }
}

private static class SplitAssignmentTrackingReader extends MockSourceReader {
    volatile boolean hasFinishedReading = false;
    volatile boolean hasClosed = false;
    final List<MockSourceSplit> splitsAddedAfterFinish = new ArrayList<>();
    final List<MockSourceSplit> splitsAddedAfterClose = new ArrayList<>();

    @Override
    public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
        InputStatus status = super.pollNext(sourceOutput);
        if (status == InputStatus.END_OF_INPUT) {
            hasFinishedReading = true;
        }
        return status;
    }

    @Override
    public void close() throws Exception {
        hasClosed = true;
        super.close();
    }

    @Override
    public void addSplits(List<MockSourceSplit> splits) {
        if (hasFinishedReading) {
            splitsAddedAfterFinish.addAll(splits);
        }
        if (hasClosed) {
            splitsAddedAfterClose.addAll(splits);
        }
        super.addSplits(splits);
    }
}{code}
Full test added in {{{}SourceOperatorStreamTaskTest{}}}.
h2. Possible Fixes
 - {{SourceOperator.handleOperatorEvent()}} could check {{operatingMode}} and reject events when in {{DATA_FINISHED}} (or any post-reading state). The coordinator would then receive an error and could handle it (e.g., reassign the split to another subtask).
 - {{StreamTask.dispatchOperatorEvent()}} could check whether operators have been finished before enqueueing the event.
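The first option can be sketched in a few lines. Again this is a hypothetical, simplified model rather than a patch against {{SourceOperator}}: the point is only that a mode check turns silent acceptance into a propagatable error.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the first proposed fix: reject split assignments once the
// operator has left the reading state, so the sender gets an error
// instead of a silently dropped split. Names here are illustrative.
public class GuardedOperatorSketch {
    enum OperatingMode { READING, DATA_FINISHED }

    OperatingMode operatingMode = OperatingMode.READING;
    final List<String> splits = new ArrayList<>();

    void handleSplitAssignment(String splitId) {
        if (operatingMode == OperatingMode.DATA_FINISHED) {
            // Surfacing an exception here would propagate back to the
            // coordinator, which could reassign the split elsewhere.
            throw new IllegalStateException(
                    "Split assignment received after source finished: " + splitId);
        }
        splits.add(splitId);
    }

    public static void main(String[] args) {
        GuardedOperatorSketch op = new GuardedOperatorSketch();
        op.handleSplitAssignment("split-0"); // accepted while READING

        op.operatingMode = OperatingMode.DATA_FINISHED;
        try {
            op.handleSplitAssignment("split-1"); // now rejected loudly
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println("accepted=" + op.splits);
    }
}
```

Here {{split-0}} is accepted while reading and {{split-1}} is rejected with an explicit error, which is the behavior the fix would give the coordinator to react to.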



--
This message was sent by Atlassian Jira
(v8.20.10#820010)