[ https://issues.apache.org/jira/browse/FLINK-39639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18088105#comment-18088105 ]

Spoorthi Basu commented on FLINK-39639:
---------------------------------------

Hi [~pnowojski], I reproduced this on current master with the test in the 
description. The root cause is that {{handleAddSplitsEvent}} has no 
{{DATA_FINISHED}} guard: a late {{AddSplitEvent}} reaches 
{{sourceReader.addSplits()}} but is never polled, since {{emitNext}} no longer 
drives the reader. The coordinator gets no signal and records the assignment as 
successful, which is the silent loss.
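
For illustration only, here is a deliberately simplified model of that sequence. It is hypothetical Java, not Flink code: {{OperatingMode}}, {{ModelReader}}, {{ModelSourceOperator}} and {{LateSplitDemo}} are invented names that loosely mimic {{SourceOperator}} and {{SourceReader}}. Once the operator is in {{DATA_FINISHED}}, the model stops driving the reader, so a split passed to the unguarded handler stays pending and no error is raised:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-ins, not Flink classes: they only mimic the roles of
// SourceOperator / SourceReader to show why a late split is never read.
enum OperatingMode { READING, DATA_FINISHED }

class ModelReader {
    final Queue<String> pendingSplits = new ArrayDeque<>();
    final List<String> emitted = new ArrayList<>();

    void addSplits(List<String> splits) {
        pendingSplits.addAll(splits);
    }

    /** Models reading every pending split until END_OF_INPUT. */
    void drain() {
        while (!pendingSplits.isEmpty()) {
            emitted.add(pendingSplits.poll());
        }
    }
}

class ModelSourceOperator {
    final ModelReader reader = new ModelReader();
    OperatingMode mode = OperatingMode.READING;

    /** Mimics emitNext(): the task only drives the reader while it is still READING. */
    void emitNext() {
        if (mode != OperatingMode.READING) {
            return;
        }
        reader.drain();
        mode = OperatingMode.DATA_FINISHED; // END_OF_INPUT -> endData() -> finish()
    }

    /** Unguarded, like the current handler: accepts splits in any mode, returns no signal. */
    void handleAddSplitsEvent(List<String> splits) {
        reader.addSplits(splits);
    }
}

class LateSplitDemo {
    public static void main(String[] args) {
        ModelSourceOperator op = new ModelSourceOperator();
        op.handleAddSplitsEvent(List.of("split-0"));
        op.emitNext(); // reads split-0 and reaches DATA_FINISHED
        op.handleAddSplitsEvent(List.of("split-1")); // late assignment: no exception
        op.emitNext(); // no-op, the reader is no longer driven
        // prints: emitted=[split-0] stranded=[split-1]
        System.out.println("emitted=" + op.reader.emitted + " stranded=" + op.reader.pendingSplits);
    }
}
```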

My fix rejects the assignment in {{DATA_FINISHED}} by failing the task. This 
doesn't lose data: the split is recorded in the assignment tracker before the 
event is sent ({{SourceCoordinatorContext.assignSplits}}), so failover 
returns it via {{subtaskReset}} -> {{getAndRemoveUncheckpointedAssignment}} -> 
{{addSplitsBack}} and it's reassigned.
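
The recovery argument can be sketched the same way. This is again a hypothetical model: {{ModelAssignmentTracker}}, {{ModelEnumerator}}, {{GuardedSourceOperator}} and {{RejectAndReassignDemo}} are invented, and their method names only echo {{getAndRemoveUncheckpointedAssignment}} and {{addSplitsBack}} without claiming to match the real Flink signatures. What it shows is the ordering: because the split is recorded before the event is sent, rejecting it in {{DATA_FINISHED}} still leaves a copy that can be handed back on failover:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical model, not Flink code: names only echo the tracker / failover path.
class ModelAssignmentTracker {
    // Splits sent to each subtask that no completed checkpoint covers yet.
    private final Map<Integer, List<String>> uncheckpointed = new HashMap<>();

    /** Recorded before the AddSplitEvent is sent, which is what makes the fix safe. */
    void recordAssignment(int subtask, List<String> splits) {
        uncheckpointed.computeIfAbsent(subtask, k -> new ArrayList<>()).addAll(splits);
    }

    /** Hands back and forgets the uncheckpointed splits of a failed subtask. */
    List<String> getAndRemoveUncheckpointedAssignment(int subtask) {
        List<String> splits = uncheckpointed.remove(subtask);
        return splits == null ? new ArrayList<>() : splits;
    }
}

class ModelEnumerator {
    final Queue<String> unassigned = new ArrayDeque<>();

    void addSplitsBack(List<String> splits) {
        unassigned.addAll(splits);
    }
}

class GuardedSourceOperator {
    boolean dataFinished;
    final List<String> readerSplits = new ArrayList<>();

    /** The proposed guard: a late assignment fails the task instead of being dropped. */
    void handleAddSplitsEvent(List<String> splits) {
        if (dataFinished) {
            throw new IllegalStateException("AddSplitEvent received in DATA_FINISHED: " + splits);
        }
        readerSplits.addAll(splits);
    }
}

class RejectAndReassignDemo {
    public static void main(String[] args) {
        ModelAssignmentTracker tracker = new ModelAssignmentTracker();
        ModelEnumerator enumerator = new ModelEnumerator();
        GuardedSourceOperator subtask0 = new GuardedSourceOperator();
        subtask0.dataFinished = true; // the reader has already reached END_OF_INPUT

        List<String> late = List.of("split-1");
        tracker.recordAssignment(0, late); // recorded first, then "sent"
        try {
            subtask0.handleAddSplitsEvent(late);
        } catch (IllegalStateException taskFailure) {
            // Failover: subtaskReset -> getAndRemoveUncheckpointedAssignment -> addSplitsBack
            enumerator.addSplitsBack(tracker.getAndRemoveUncheckpointedAssignment(0));
        }
        // prints: back in enumerator: [split-1]
        System.out.println("back in enumerator: " + enumerator.unassigned);
    }
}
```

The real failover path has more moving parts; the model only shows that the split survives the rejection because it was recorded first.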

It does change one existing test, {{testSourceCheckpointLastUnaligned}} from 
FLINK-18906: it drives the chained {{MockSource}} to {{END_OF_INPUT}} (no 
splits, so the reader finishes immediately) and asserts that a later split's 
records are dropped. A real reader only reaches {{END_OF_INPUT}} after 
no-more-splits, so that drop looks like the mock surfacing this same bug rather 
than intended behavior, in which case the test should expect the rejection. 
Before I change it: is the post-finish drop ever intentional, or is rejecting 
at {{DATA_FINISHED}} the right call across the board?
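
The difference between the mock and a real reader comes down to when {{END_OF_INPUT}} is returned. Below is a hypothetical sketch of the two policies: the {{Status}} enum only imitates Flink's {{InputStatus}}, and {{PolicyReader}}, {{EagerlyFinishingReader}}, {{NoMoreSplitsAwareReader}} and {{EndOfInputPolicyDemo}} are invented. They are neither {{MockSourceReader}} nor any real connector:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical imitation of Flink's InputStatus; not the real enum.
enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

abstract class PolicyReader {
    final Queue<Integer> records = new ArrayDeque<>();
    boolean noMoreSplits;

    void addSplit(int... splitRecords) {
        for (int r : splitRecords) {
            records.add(r);
        }
    }

    void notifyNoMoreSplits() {
        noMoreSplits = true;
    }

    /** Emits one record per call; once the queue is empty, defers to the end-of-input policy. */
    Status pollNext() {
        if (records.poll() != null) {
            return Status.MORE_AVAILABLE;
        }
        return whenEmpty();
    }

    abstract Status whenEmpty();
}

/** Like the chained mock in the test: no splits means it is finished at once. */
class EagerlyFinishingReader extends PolicyReader {
    @Override
    Status whenEmpty() {
        return Status.END_OF_INPUT;
    }
}

/** Finishes only after the enumerator has signalled no-more-splits; until then it waits. */
class NoMoreSplitsAwareReader extends PolicyReader {
    @Override
    Status whenEmpty() {
        return noMoreSplits ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
    }
}

class EndOfInputPolicyDemo {
    public static void main(String[] args) {
        PolicyReader mock = new EagerlyFinishingReader();
        PolicyReader real = new NoMoreSplitsAwareReader();
        // prints: mock=END_OF_INPUT real=NOTHING_AVAILABLE
        System.out.println("mock=" + mock.pollNext() + " real=" + real.pollNext());
        real.notifyNoMoreSplits();
        // prints: real after no-more-splits=END_OF_INPUT
        System.out.println("real after no-more-splits=" + real.pollNext());
    }
}
```

The eager policy can report {{END_OF_INPUT}} before the enumerator has said anything about further splits, which is the state the test relies on.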

Happy to open a PR once that's settled.

> Split assignment accepted after source finished reading leads to silent data 
> loss
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-39639
>                 URL: https://issues.apache.org/jira/browse/FLINK-39639
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 2.0.1
>            Reporter: Piotr Nowojski
>            Priority: Major
>
> h2. Summary
> When an {{OperatorCoordinator}} sends a split assignment ({{AddSplitEvent}})
> to a {{SourceOperator}} that has concurrently finished reading (reached
> {{END_OF_INPUT}}), the event is silently accepted and the splits are added to
> the {{SourceReader}}, but they will never be processed. The coordinator
> receives no error signal, so it believes the assignment succeeded. This is
> silent data loss.
> h2. Root Cause
> There is a window between the source operator finishing and the mailbox being 
> quiesced where coordinator events are still accepted and processed:
>  - {{SourceReader.pollNext()}} returns {{END_OF_INPUT}}.
>  - {{StreamTask.processInput()}} calls {{endData(DRAIN)}} → {{operatorChain.finishOperators()}} → {{SourceOperator.finish()}}.
>  - The mailbox loop suspends, but the mailbox remains in the {{OPEN}} state.
>  - A coordinator event (split assignment) arrives via RPC and is queued into the mailbox via {{StreamTask.dispatchOperatorEvent()}}.
>  - {{afterInvoke()}} runs a second mailbox loop ({{StreamTask.java}} line 1074) that processes the queued event.
>  - {{SourceOperator.handleOperatorEvent()}} calls {{sourceReader.addSplits()}} with no guard on the operator's {{operatingMode}}, so the split is accepted even though the operator is in {{DATA_FINISHED}} mode.
> The only protection against late events is the {{RejectedExecutionException}} 
> catch in {{StreamTask.dispatchOperatorEvent()}}, but that only fires 
> after {{mailboxProcessor.prepareClose()}} 
> (line 1085), which happens after the second mailbox loop has already drained.
> h2. Impact
> In batch (bounded) execution, if a {{SplitEnumerator}} assigns splits 
> concurrently with a reader reaching end-of-input, the assigned splits are 
> lost without any error. The enumerator believes 
> the assignment succeeded. This can occur when:
>  - Multiple subtasks finish at different times and the enumerator 
> redistributes work
>  - The enumerator performs lazy/deferred split discovery and assigns splits 
> just as the reader finishes its current work
> h2. Reproducer
> A unit test using {{StreamTaskMailboxTestHarness}} demonstrates the issue 
> without any timing dependencies:
> {code:java}
> /**
>  * Verifies that a coordinator event (split assignment) dispatched after the source has finished
>  * reading (END_OF_INPUT) is silently accepted and the splits are added to the reader, even
>  * though they will never be processed, demonstrating a race condition between coordinator event
>  * delivery and operator lifecycle.
>  */
> @Test
> void testSplitAssignmentAfterSourceFinished() throws Exception {
>     SplitAssignmentTrackingSource testSource =
>             new SplitAssignmentTrackingSource(Boundedness.BOUNDED);
>     SourceOperatorFactory<Integer> sourceOperatorFactory =
>             new SourceOperatorFactory<>(testSource, WatermarkStrategy.noWatermarks());
>     try (StreamTaskMailboxTestHarness<Integer> testHarness =
>             new StreamTaskMailboxTestHarnessBuilder<>(
>                             SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
>                     .setCollectNetworkEvents()
>                     .setupOutputForSingletonOperatorChain(sourceOperatorFactory, OPERATOR_ID)
>                     .build()) {
>         SplitAssignmentTrackingReader reader =
>                 (SplitAssignmentTrackingReader)
>                         ((SourceOperator) testHarness.getStreamTask().mainOperator)
>                                 .getSourceReader();
>         // Assign an initial split with some records so the source has work to do.
>         MockSourceSplit initialSplit = new MockSourceSplit(0, 0, 2);
>         initialSplit.addRecord(100);
>         initialSplit.addRecord(200);
>         AddSplitEvent<MockSourceSplit> initialAssignment =
>                 new AddSplitEvent<>(
>                         Collections.singletonList(initialSplit),
>                         new MockSourceSplitSerializer());
>         testHarness
>                 .getStreamTask()
>                 .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(initialAssignment));
>         // Process all records. The source will reach END_OF_INPUT, triggering endData(), which
>         // calls finish() on the operator. The mailbox loop then suspends.
>         reader.markAvailable();
>         testHarness.processAll();
>         // At this point:
>         // - The source reader has returned END_OF_INPUT
>         // - SourceOperator.finish() has been called
>         // - The mailbox is suspended but still OPEN (not quiesced)
>         assertThat(reader.hasFinishedReading).isTrue();
>         assertThat(reader.splitsAddedAfterFinish).isEmpty();
>         // Now simulate a coordinator sending a late split assignment. This can happen in
>         // practice when the enumerator assigns splits concurrently with the reader finishing.
>         MockSourceSplit lateSplit = new MockSourceSplit(1, 0);
>         lateSplit.addRecord(999);
>         AddSplitEvent<MockSourceSplit> lateAssignment =
>                 new AddSplitEvent<>(
>                         Collections.singletonList(lateSplit),
>                         new MockSourceSplitSerializer());
>         testHarness
>                 .getStreamTask()
>                 .dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(lateAssignment));
>         // Drive the task through afterInvoke(), which runs a second mailbox loop that will
>         // process the queued split assignment event.
>         testHarness.finishProcessing();
>         // The late split was accepted by the reader even though the source had already
>         // finished reading. This split will never be processed; it represents lost data.
>         assertThat(reader.splitsAddedAfterFinish)
>                 .as(
>                         "Split assignment was accepted after the source finished reading, "
>                                 + "leading to potential data loss")
>                 .hasSize(1);
>         assertThat(reader.splitsAddedAfterFinish.get(0).splitId()).isEqualTo("1");
>     }
> }
> private static class SplitAssignmentTrackingSource extends MockSource {
>     private static final long serialVersionUID = 1L;
>     SplitAssignmentTrackingSource(Boundedness boundedness) {
>         super(boundedness, 1);
>     }
>     @Override
>     public SourceReader<Integer, MockSourceSplit> createReader(
>             SourceReaderContext readerContext) {
>         return new SplitAssignmentTrackingReader();
>     }
> }
> private static class SplitAssignmentTrackingReader extends MockSourceReader {
>     volatile boolean hasFinishedReading = false;
>     volatile boolean hasClosed = false;
>     final List<MockSourceSplit> splitsAddedAfterFinish = new ArrayList<>();
>     final List<MockSourceSplit> splitsAddedAfterClose = new ArrayList<>();
>     @Override
>     public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
>         InputStatus status = super.pollNext(sourceOutput);
>         if (status == InputStatus.END_OF_INPUT) {
>             hasFinishedReading = true;
>         }
>         return status;
>     }
>     @Override
>     public void close() throws Exception {
>         hasClosed = true;
>         super.close();
>     }
>     @Override
>     public void addSplits(List<MockSourceSplit> splits) {
>         if (hasFinishedReading) {
>             splitsAddedAfterFinish.addAll(splits);
>         }
>         if (hasClosed) {
>             splitsAddedAfterClose.addAll(splits);
>         }
>         super.addSplits(splits);
>     }
> }{code}
> Full test added in {{SourceOperatorStreamTaskTest}}.
> h2. Possible Fixes
>  - {{SourceOperator.handleOperatorEvent()}} could check {{operatingMode}} and 
> reject events when in {{DATA_FINISHED}} (or any post-reading state). The 
> coordinator would then receive an error and
> could handle it (e.g., reassign the split to another subtask).
>  - {{StreamTask.dispatchOperatorEvent()}} could check whether operators have 
> been finished before enqueueing the event.
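
The Root Cause window and the second proposed fix can be modeled together in a few lines. This is a hypothetical, single-threaded sketch, not Flink code: {{ModelTask}} and {{MailboxWindowDemo}} are invented, their methods only echo the {{StreamTask}} steps listed in the issue, and a {{false}} return stands in for whatever rejection signal a real task would send back to the coordinator:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded model, not Flink code.
class ModelTask {
    enum MailboxState { OPEN, QUIESCED }

    private final boolean guardFinishedOperators; // true models the second proposed fix
    private final Queue<String> mailbox = new ArrayDeque<>();
    private MailboxState mailboxState = MailboxState.OPEN;
    private boolean operatorsFinished;
    final List<String> acceptedSplits = new ArrayList<>();

    ModelTask(boolean guardFinishedOperators) {
        this.guardFinishedOperators = guardFinishedOperators;
    }

    /** endData(DRAIN) -> finishOperators(): the operators finish, the mailbox stays OPEN. */
    void finishOperators() {
        operatorsFinished = true;
    }

    /** RPC side. Returns false when the event is not enqueued (the reply path is not modeled). */
    boolean dispatchOperatorEvent(String split) {
        if (mailboxState != MailboxState.OPEN) {
            return false; // stands in for the RejectedExecutionException after prepareClose()
        }
        if (guardFinishedOperators && operatorsFinished) {
            return false; // proposed fix: refuse events once the operators have finished
        }
        mailbox.add(split);
        return true;
    }

    /** afterInvoke(): drain the mailbox once more, then prepareClose(). */
    void afterInvoke() {
        while (!mailbox.isEmpty()) {
            acceptedSplits.add(mailbox.poll()); // unguarded addSplits()
        }
        mailboxState = MailboxState.QUIESCED;
    }
}

class MailboxWindowDemo {
    public static void main(String[] args) {
        // prints:
        // guard=false queuedInWindow=true accepted=[split-1] queuedAfterClose=false
        // guard=true queuedInWindow=false accepted=[] queuedAfterClose=false
        for (boolean guard : new boolean[] {false, true}) {
            ModelTask task = new ModelTask(guard);
            task.finishOperators();
            boolean queuedInWindow = task.dispatchOperatorEvent("split-1");
            task.afterInvoke();
            boolean queuedAfterClose = task.dispatchOperatorEvent("split-2");
            System.out.println("guard=" + guard + " queuedInWindow=" + queuedInWindow
                    + " accepted=" + task.acceptedSplits + " queuedAfterClose=" + queuedAfterClose);
        }
    }
}
```

In the model, an event dispatched between {{finishOperators()}} and {{prepareClose()}} is accepted without the guard and refused with it; anything dispatched after {{prepareClose()}} is refused either way.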



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
