Thanks, Darin, for the details.

Below is the stack trace of the problematic StreamTask. It did not process
the priority event (the checkpoint barrier) from the mailbox and stayed in
this state for the entire checkpoint timeout interval, reading and collecting
all the records from the split. I am not sure why the executor was idle
during that period, even though I could see that the JobManager received the
checkpoint acknowledgement from the upstream source task. The Flink version
is 1.16.

    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:400)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:364)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$$Lambda$1199/0x00007f3d2b07a858.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:86)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:466)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$1827/0x00007f3d2482c058.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:239)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:213)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$1825/0x00007f3d2482a900.run(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:152)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:600)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:559)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$765/0x00007f3d326ee058.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task$$Lambda$1637/0x00007f3d2a9b1470.run(Unknown Source)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(java.base@11.0.21/Thread.java:829)
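
Reading the trace bottom-up: the task is inside StreamTask.endData, finishing the operator chain; ContinuousFileReaderOperator.finish() hands control back to the mailbox through MailboxExecutorImpl.yield(), and the mail that yield() runs is the reader's processRecord action. The following is a simplified, single-threaded Java model of that nesting, not Flink code: FinishDrainModel, ReaderState, yieldOnce() and the FIFO mailbox are hypothetical stand-ins, and for simplicity each mail reads one record, whereas the real loop keeps reading while the executor reports idle. It only illustrates that, in this model, the task thread does not leave finish() until the split has been fully read.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded model of the nesting in the stack trace:
// finish() -> yield() -> processRecord(). None of these names are Flink APIs.
class FinishDrainModel {
    enum ReaderState { READING, FINISHING, FINISHED }

    final Deque<Runnable> mailbox = new ArrayDeque<>();
    ReaderState state = ReaderState.READING;
    int remainingRecords;
    int emitted;
    int yields;

    FinishDrainModel(int recordsInSplit) {
        this.remainingRecords = recordsInSplit;
    }

    // Stand-in for MailboxExecutor.yield(): run exactly one pending mail.
    void yieldOnce() {
        Runnable mail = mailbox.pollFirst();
        if (mail == null) {
            throw new IllegalStateException("nothing to yield to");
        }
        yields++;
        mail.run();
    }

    // Stand-in for the reader's mail: emit one record, then re-enqueue itself
    // until the split is exhausted, at which point the reader is finished.
    void processRecord() {
        if (remainingRecords == 0) {
            state = ReaderState.FINISHED;
            return;
        }
        remainingRecords--;
        emitted++;
        mailbox.addLast(this::processRecord);
    }

    // Stand-in for finish(): keep yielding until the reader reaches a terminal
    // state, so the calling thread stays here until the split is fully read.
    void finish() {
        state = ReaderState.FINISHING;
        mailbox.addLast(this::processRecord);
        while (state != ReaderState.FINISHED) {
            yieldOnce();
        }
    }

    public static void main(String[] args) {
        FinishDrainModel model = new FinishDrainModel(5);
        model.finish();
        System.out.println("emitted=" + model.emitted + " yields=" + model.yields);
    }
}
```

Running main prints emitted=5 yields=6: five mails that each emit a record, plus one that observes the end of the split.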

On Wed, Dec 6, 2023 at 8:11 AM Darin Amos <darin.a...@instacart.com.invalid> wrote:

> I apologize, I was a little off with my description; it's been a while
> since I looked at this code, but I have refreshed my memory.
>
> The line I referred to earlier was correct, though. This operator only
> processes records in a file split while the operator is idle, meaning there
> are no more incoming file splits. After every read it checks whether there
> are any incoming file splits before continuing to read from the split. If
> there is a new inbound file split, the loop exits and the operator re-queues
> itself to continue processing records later. You can see that here
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L361>.
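
The idle check described above can be modeled in a few lines of Java. This is a hypothetical sketch, not the operator's code: the FIFO mailbox, isIdle() (here simply "no other mail is queued") and the string log are assumptions made for illustration. run(3, false) shows the case where the idle check never sees another mail, so a single mail reads the whole split; run(3, true) queues a stand-in barrier mail behind the read action, and the loop yields to it after one record.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of "keep reading while idle, otherwise re-enqueue".
// The mailbox and isIdle() are simplified stand-ins, not Flink classes.
class IdleReadLoop {
    final Deque<Runnable> mailbox = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();
    private final int totalRecords;
    private int nextRecord;

    IdleReadLoop(int totalRecords) {
        this.totalRecords = totalRecords;
    }

    // "Idle" here simply means no other mail is waiting.
    boolean isIdle() {
        return mailbox.isEmpty();
    }

    // Read records until the split ends or another mail shows up; in the
    // latter case, re-enqueue this action so the waiting mail runs first.
    void processRecord() {
        do {
            if (nextRecord == totalRecords) {
                log.add("split done");
                return;
            }
            log.add("record " + nextRecord++);
        } while (isIdle());
        mailbox.addLast(this::processRecord);
    }

    // Run mails in FIFO order until none are left.
    void runMailbox() {
        Runnable mail;
        while ((mail = mailbox.pollFirst()) != null) {
            mail.run();
        }
    }

    // Queue the read action, optionally with a "barrier" mail behind it,
    // then run the mailbox and return what happened, in order.
    static List<String> run(int records, boolean barrierQueued) {
        IdleReadLoop loop = new IdleReadLoop(records);
        loop.mailbox.addLast(loop::processRecord);
        if (barrierQueued) {
            loop.mailbox.addLast(() -> loop.log.add("barrier"));
        }
        loop.runMailbox();
        return loop.log;
    }

    public static void main(String[] args) {
        System.out.println(run(3, false)); // whole split read by one mail
        System.out.println(run(3, true));  // barrier runs after the first record
    }
}
```

In this model, whether the barrier gets through depends on the idle check seeing it as pending; the sketch does not explain why, in the reported case, the executor kept reporting idle.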
>
> When the loop is interrupted by a checkpoint barrier, snapshotState(...) is
> called and the reader grabs the state from the provided format. In the
> normal case the state is simply the split offset (the current progress
> indicator); in more complex scenarios you can create your own format class
> and provide whatever serializable state you like. In my case we store
> additional metadata about the progress of the reader.
>
> On state restore, the operator calls loadSplit, which calls *reopen* on the
> format rather than open, passing the checkpointed state so you can continue
> from where you left off. You can see that here
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L407-L411>.
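
A minimal Java sketch of the snapshot and restore flow described in the two paragraphs above, assuming a format whose only state is the offset of the next record. CheckpointableFormat mirrors the shape of Flink's CheckpointableInputFormat (getCurrentState() and reopen(split, state)) but is a local stand-in so the example runs without Flink on the classpath; LineFormat, loadSplit and readAll are hypothetical helpers, and the "split" is just an in-memory list of lines.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of snapshot/restore for a checkpointable format.
// CheckpointableFormat only mirrors the shape of Flink's CheckpointableInputFormat.
class CheckpointableFormatDemo {

    interface CheckpointableFormat<S, T extends Serializable> {
        T getCurrentState();
        void reopen(S split, T state);
    }

    // Reads lines of an in-memory "split"; its state is the next line offset.
    static class LineFormat implements CheckpointableFormat<List<String>, Long> {
        private List<String> split;
        private long offset;

        void open(List<String> split) {
            this.split = split;
            this.offset = 0L;
        }

        boolean reachedEnd() {
            return offset >= split.size();
        }

        String nextRecord() {
            return split.get((int) offset++);
        }

        @Override
        public Long getCurrentState() {
            return offset;
        }

        @Override
        public void reopen(List<String> split, Long state) {
            this.split = split;
            this.offset = state;
        }
    }

    // Stand-in for loadSplit: reopen from checkpointed state if there is any,
    // otherwise open the split from the beginning.
    static void loadSplit(LineFormat format, List<String> split, Long restoredState) {
        if (restoredState != null) {
            format.reopen(split, restoredState);
        } else {
            format.open(split);
        }
    }

    // Drain the remaining records of the current split.
    static List<String> readAll(LineFormat format) {
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            out.add(format.nextRecord());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> split = List.of("a", "b", "c", "d", "e");

        // Read two records, then "snapshot" the format's state as a barrier would.
        LineFormat before = new LineFormat();
        loadSplit(before, split, null);
        before.nextRecord();
        before.nextRecord();
        Long checkpointed = before.getCurrentState();

        // After a restart, a fresh format resumes from the checkpointed offset.
        LineFormat after = new LineFormat();
        loadSplit(after, split, checkpointed);
        System.out.println(checkpointed + " " + readAll(after)); // 2 [c, d, e]
    }
}
```

A custom format that tracks richer progress, as described above, would return its own Serializable state object from getCurrentState() instead of a Long.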
>
> Cheers
>
> Darin
>
> On Tue, Dec 5, 2023 at 8:08 PM Darin Amos <darin.a...@instacart.com> wrote:
>
> > The way I understand this loop is that the ContinuousFileReaderOperator
> > only processes records in the background while the operator is idle, i.e.
> > while it's not receiving any records.
> >
> > At the very bottom of that loop here
> > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L358C5-L358C6>,
> > it exits if the executor is no longer idle, i.e. there are incoming
> > records.
> >
> > If you look here
> > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L407>,
> > you can see that the operator supports checkpointable input splits, meaning
> > it saves its place within a file split. This would only be possible if the
> > reader can be interrupted in the middle of a split. I have written custom
> > splits that do exactly this.
> >
> > Darin
> >
> > On Tue, Dec 5, 2023 at 11:31 AM Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
> >
> >> This is the loop (code reference
> >> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L346>),
> >> where it fetches all records from the split, and only then does the
> >> MailboxProcessor get control to check other mail. This loop was
> >> introduced here
> >> <https://github.com/apache/flink/commit/1a69cb9fce629b0c458f5ea514d9ac8de008687f>.
> >>
> >>
> >>
> >> On Tue, Dec 5, 2023 at 9:00 PM Darin Amos <darin.a...@instacart.com.invalid> wrote:
> >>
> >> > I thought for sure this was already the existing behavior of this
> >> > operator. Does it not check the mailbox executor after every record
> >> > read?
> >> >
> >> > On Tue, Dec 5, 2023 at 6:48 AM Prabhu Joseph (Jira) <j...@apache.org> wrote:
> >> >
> >> > > Prabhu Joseph created FLINK-33753:
> >> > > -------------------------------------
> >> > >
> >> > >              Summary: ContinuousFileReaderOperator consume records as mini batch
> >> > >                  Key: FLINK-33753
> >> > >                  URL: https://issues.apache.org/jira/browse/FLINK-33753
> >> > >              Project: Flink
> >> > >           Issue Type: Improvement
> >> > >     Affects Versions: 1.18.0
> >> > >             Reporter: Prabhu Joseph
> >> > >
> >> > >
> >> > > The ContinuousFileReaderOperator reads and collects the records from a
> >> > > split in a loop. If the split is large, the loop takes more time, and
> >> > > the mailbox executor does not get a chance to process the checkpoint
> >> > > barrier. This leads to checkpoints timing out.
> >> > > ContinuousFileReaderOperator could be improved to consume the records
> >> > > in a mini batch, similar to Hudi's StreamReadOperator
> >> > > (https://issues.apache.org/jira/browse/HUDI-2485).
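
The mini-batch proposal can be sketched in Java, assuming a single-threaded FIFO mailbox: each mail reads at most batchSize records and then re-enqueues itself, regardless of any idle check, so a mail queued behind it waits for at most one batch. MiniBatchReader, processBatch and batchSize are hypothetical names; this is neither Flink's nor Hudi's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the mini-batch idea in FLINK-33753: each mail reads at
// most batchSize records, then re-enqueues itself, so a mail queued behind it
// (e.g. a checkpoint barrier) waits for at most one batch. Not Flink code.
class MiniBatchReader {
    final Deque<Runnable> mailbox = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();
    private final int totalRecords;
    private final int batchSize;
    private int nextRecord;

    MiniBatchReader(int totalRecords, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.totalRecords = totalRecords;
        this.batchSize = batchSize;
    }

    // Read up to batchSize records; if the split is not finished, go to the
    // back of the queue instead of continuing.
    void processBatch() {
        for (int i = 0; i < batchSize; i++) {
            if (nextRecord == totalRecords) {
                log.add("split done");
                return;
            }
            log.add("record " + nextRecord++);
        }
        mailbox.addLast(this::processBatch);
    }

    // Queue the read action with a "barrier" mail behind it, then run all mails
    // in FIFO order (Flink's real mailbox also has priorities; ignored here).
    static List<String> run(int records, int batchSize) {
        MiniBatchReader reader = new MiniBatchReader(records, batchSize);
        reader.mailbox.addLast(reader::processBatch);
        reader.mailbox.addLast(() -> reader.log.add("barrier"));
        Runnable mail;
        while ((mail = reader.mailbox.pollFirst()) != null) {
            mail.run();
        }
        return reader.log;
    }

    public static void main(String[] args) {
        // Unbounded batch: the barrier only runs after the whole split.
        System.out.println(run(5, Integer.MAX_VALUE));
        // Batch of 2: the barrier runs after the first two records.
        System.out.println(run(5, 2));
    }
}
```

With an unbounded batch the stand-in barrier runs only after the whole split, which mirrors the reported timeout; with batchSize = 2 it runs after the first two records. A larger batch means fewer re-enqueues but a longer worst-case wait for the barrier.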
> >> > >
> >> > >
> >> > >
> >> > >
> >> >
> >>
> >
>
