Just to confirm, I ran a test this morning using a large file with only a
single file split that took over 10 minutes to process. My checkpoints
still executed as expected every minute with minimal latency (sub-second).
Note, though, that I ran this with 1.15.4.

What evidence do you have that this operator is causing the timeout?
Typically this operator should handle the checkpoint barrier quickly and
proceed with reading (ignoring alignment). Early on I suspected a similar
issue, but it turned out to be other checkpoint alignment issues.

Darin

On Wed, Dec 6, 2023 at 4:13 AM Prabhu Joseph <prabhujose.ga...@gmail.com>
wrote:

> Thanks, Darin, for the details.
>
> Below is the stack trace of the problematic StreamTask. It has not processed
> the priority event (the checkpoint barrier) from the mailbox and has stayed
> in this state throughout the checkpoint timeout interval, reading and
> collecting all the records from the split. I'm not sure why the executor was
> idle during that period, even though I could see that the JobManager received
> the checkpoint acknowledgement message from the upstream source task. The
> Flink version is 1.16.
>
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:400)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:364)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$$Lambda$1199/0x00007f3d2b07a858.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:86)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:466)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$1827/0x00007f3d2482c058.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:239)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:213)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$1825/0x00007f3d2482a900.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:186)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:152)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:600)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:559)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$765/0x00007f3d326ee058.runDefaultAction(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>     at org.apache.flink.runtime.taskmanager.Task$$Lambda$1637/0x00007f3d2a9b1470.run(Unknown Source)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(java.base@11.0.21/Thread.java:829)
>
> On Wed, Dec 6, 2023 at 8:11 AM Darin Amos <darin.a...@instacart.com.invalid>
> wrote:
>
> > I apologize, I was a little off with my description. It's been a while
> > since I looked at this code, but I have refreshed my memory.
> >
> > The line I referred to earlier was correct, though. This operator only
> > processes records in a file split while the operator is idle, meaning there
> > are no more incoming file splits. After every read it checks whether there
> > are any incoming file splits before continuing to read from the split. If
> > there is indeed a new inbound file split, the loop exits and the operator
> > re-queues itself to continue processing records later. You can see that
> > here
> > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L361>.
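To make the loop described above concrete, here is a toy, single-threaded Java model. It is a sketch under stated assumptions, not Flink's code: `IdleCheckReadLoopSketch`, its inner `ToyMailbox`, and the `record-N` / `checkpoint@N` log entries are hypothetical, and "idle" is modeled simply as "no other mail is queued". The read action emits one record at a time, stops as soon as other mail (here a simulated checkpoint) is waiting, and re-queues itself, so the checkpoint runs between two records instead of after the whole split.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: none of these classes are Flink APIs.
final class IdleCheckReadLoopSketch {

    /** Hypothetical stand-in for a task mailbox: a FIFO of actions run by a single thread. */
    static final class ToyMailbox {
        private final Deque<Runnable> mails = new ArrayDeque<>();

        void put(Runnable mail) {
            mails.addLast(mail);
        }

        /** Assumption: "idle" means no other mail is waiting. */
        boolean isIdle() {
            return mails.isEmpty();
        }

        /** Runs queued mails one at a time until none are left. */
        void drain() {
            while (!mails.isEmpty()) {
                mails.pollFirst().run();
            }
        }
    }

    /**
     * Reads a split of splitSize records. A simulated checkpoint mail is enqueued once
     * checkpointArrivesAfter records have been read. Returns the order in which things happened.
     */
    static List<String> run(int splitSize, int checkpointArrivesAfter) {
        ToyMailbox mailbox = new ToyMailbox();
        List<String> log = new ArrayList<>();
        int[] offset = {0}; // index of the next record in the split
        Runnable[] readAction = new Runnable[1];

        readAction[0] = () -> {
            while (offset[0] < splitSize) {
                log.add("record-" + offset[0]);
                offset[0]++;
                if (offset[0] == checkpointArrivesAfter) {
                    // Simulated barrier: logs the offset at which the snapshot is taken.
                    mailbox.put(() -> log.add("checkpoint@" + offset[0]));
                }
                if (!mailbox.isIdle()) {
                    break; // other mail is waiting, so stop reading after this record
                }
            }
            if (offset[0] < splitSize) {
                mailbox.put(readAction[0]); // re-queue to continue reading later
            }
        };

        mailbox.put(readAction[0]);
        mailbox.drain();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2));
    }
}
```

With `run(5, 2)` the log is `[record-0, record-1, checkpoint@2, record-2, record-3, record-4]`: the checkpoint is handled right after the second record rather than after the whole split.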
> >
> > When the loop is interrupted by a checkpoint barrier, snapshotState(...) is
> > called and the reader grabs the state from the provided format. In the
> > normal case the state is simply the split offset (the current progress
> > indicator). In more complex scenarios you can create your own format class
> > and provide whatever serializable state you desire. In my case we store
> > additional metadata about the progress of the reader.
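A minimal sketch of the kind of custom state described above, assuming the state only has to be `Serializable`. `ReaderStateSketch`, `ToyFormat`, and the `lastEmitted` field are hypothetical; `getCurrentState()` is named after the method on Flink's `CheckpointableInputFormat`, but that interface is not used here so the example compiles without Flink. Java serialization stands in for however the checkpoint actually persists the state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical example types; not Flink classes.
final class ReaderStateSketch {

    /** Custom reader state: the split offset plus one piece of extra progress metadata. */
    static final class ReaderState implements Serializable {
        private static final long serialVersionUID = 1L;
        final long offset;        // the "normal case" state: position within the split
        final String lastEmitted; // hypothetical extra metadata about the reader's progress

        ReaderState(long offset, String lastEmitted) {
            this.offset = offset;
            this.lastEmitted = lastEmitted;
        }
    }

    /** Toy format reading from an in-memory split and tracking its own progress. */
    static final class ToyFormat {
        private final String[] split;
        private int offset;
        private String lastEmitted;

        ToyFormat(String[] split) {
            this.split = split;
        }

        String nextRecord() {
            lastEmitted = split[offset++];
            return lastEmitted;
        }

        /** What a snapshot would capture; named after CheckpointableInputFormat#getCurrentState. */
        ReaderState getCurrentState() {
            return new ReaderState(offset, lastEmitted);
        }
    }

    /** Serializes and deserializes the state, standing in for persisting it in a checkpoint. */
    static ReaderState roundTrip(ReaderState state) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(state);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ReaderState) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not round-trip reader state", e);
        }
    }

    public static void main(String[] args) {
        ToyFormat format = new ToyFormat(new String[] {"a", "b", "c", "d"});
        format.nextRecord();
        format.nextRecord();
        ReaderState restored = roundTrip(format.getCurrentState());
        System.out.println(restored.offset + " " + restored.lastEmitted);
    }
}
```

Reading two records and snapshotting yields an offset of 2 and a `lastEmitted` of `b`, both of which survive the serialization round trip.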
> >
> > On state restore, the operator calls loadSplit, which calls *reopen* on
> > the format rather than open, passing the checkpointed state so you can
> > continue from where you left off. You can see that here
> > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L407-L411>.
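The restore path can be sketched the same way. This assumes, as described above, that restoring means calling `reopen` with the checkpointed state instead of `open`; `RestoreSketch`, its `ToyFormat`, and `loadSplit(split, restoredState)` are hypothetical simplifications, with the state reduced to a plain offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simplification of the restore path; not Flink's API.
final class RestoreSketch {

    /** Toy format over an in-memory split whose checkpoint state is just the offset. */
    static final class ToyFormat {
        private final List<String> split;
        private int offset;

        ToyFormat(List<String> split) {
            this.split = split;
        }

        /** Fresh start: read the split from the beginning. */
        void open() {
            offset = 0;
        }

        /** Restore: continue from the checkpointed offset instead of the beginning. */
        void reopen(int checkpointedOffset) {
            offset = checkpointedOffset;
        }

        boolean reachedEnd() {
            return offset >= split.size();
        }

        String nextRecord() {
            return split.get(offset++);
        }

        int getCurrentState() {
            return offset;
        }
    }

    /** Calls reopen when restored state exists, otherwise open (restoredState may be null). */
    static ToyFormat loadSplit(List<String> split, Integer restoredState) {
        ToyFormat format = new ToyFormat(split);
        if (restoredState != null) {
            format.reopen(restoredState);
        } else {
            format.open();
        }
        return format;
    }

    static List<String> readAll(ToyFormat format) {
        List<String> records = new ArrayList<>();
        while (!format.reachedEnd()) {
            records.add(format.nextRecord());
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> split = Arrays.asList("a", "b", "c", "d");
        ToyFormat first = loadSplit(split, null);
        first.nextRecord();
        first.nextRecord();
        int checkpoint = first.getCurrentState(); // two records consumed before the snapshot
        // ... failure and restart ...
        System.out.println(readAll(loadSplit(split, checkpoint)));
    }
}
```

Reading `a` and `b`, checkpointing offset 2, and then restoring yields only `[c, d]`; with no restored state the format starts over and reads all four records.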
> >
> > Cheers
> >
> > Darin
> >
> >
> >
> >
> > On Tue, Dec 5, 2023 at 8:08 PM Darin Amos <darin.a...@instacart.com>
> > wrote:
> >
> > > The way I understand this loop is that the ContinuousFileReaderOperator
> > > only processes records in the background while the operator is idle, i.e.
> > > while it's not receiving any records.
> > >
> > > At the very bottom of that loop here
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L358C5-L358C6>
> > > it exits if the executor is no longer idle, i.e. there are incoming records.
> > >
> > > If you look here
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L407>,
> > > the operator supports checkpointable input splits, meaning it'll save its
> > > place within a file split. This would only be possible if the reader can be
> > > interrupted in the middle of a split. I have written custom splits that do
> > > exactly this.
> > >
> > > Darin
> > >
> > > On Tue, Dec 5, 2023 at 11:31 AM Prabhu Joseph <prabhujose.ga...@gmail.com>
> > > wrote:
> > >
> > >> This is the loop (code reference
> > >> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L346>),
> > >> where it fetches all records from the split, and only then does the
> > >> MailboxProcessor get control to check other mail. This loop was
> > >> introduced here
> > >> <https://github.com/apache/flink/commit/1a69cb9fce629b0c458f5ea514d9ac8de008687f>.
> > >>
> > >>
> > >>
> > >>
> > >> On Tue, Dec 5, 2023 at 9:00 PM Darin Amos <darin.a...@instacart.com.invalid>
> > >> wrote:
> > >>
> > >> > I thought for sure this was already the existing behavior with this
> > >> > operator. Does it not check the mailbox executor after every record read?
> > >> >
> > >> > On Tue, Dec 5, 2023 at 6:48 AM Prabhu Joseph (Jira) <j...@apache.org>
> > >> > wrote:
> > >> >
> > >> > > Prabhu Joseph created FLINK-33753:
> > >> > > -------------------------------------
> > >> > >
> > >> > >              Summary: ContinuousFileReaderOperator consume records as mini batch
> > >> > >                  Key: FLINK-33753
> > >> > >                  URL: https://issues.apache.org/jira/browse/FLINK-33753
> > >> > >              Project: Flink
> > >> > >           Issue Type: Improvement
> > >> > >     Affects Versions: 1.18.0
> > >> > >             Reporter: Prabhu Joseph
> > >> > >
> > >> > >
> > >> > > The ContinuousFileReaderOperator reads and collects the records from a
> > >> > > split in a loop. If the split size is large, the loop takes more time,
> > >> > > and the mailbox executor won't have a chance to process the checkpoint
> > >> > > barrier. This leads to the checkpoint timing out.
> > >> > > ContinuousFileReaderOperator could be improved to consume the records in
> > >> > > mini batches, similar to Hudi's StreamReadOperator
> > >> > > (https://issues.apache.org/jira/browse/HUDI-2485).
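The mini-batch idea proposed in the issue above can be illustrated with a similar toy model. This is a sketch of the general technique under stated assumptions, not Hudi's StreamReadOperator or a Flink patch: `MiniBatchReadSketch` and its `batchSize` parameter are hypothetical. Each mail reads at most `batchSize` records and then re-queues itself regardless of whether the mailbox looks idle, so a pending checkpoint waits for at most one batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of mini-batch reading; the class and batchSize are hypothetical, not Flink or Hudi APIs.
final class MiniBatchReadSketch {

    /**
     * Reads a split of splitSize records, at most batchSize records per mail. A simulated
     * checkpoint mail is enqueued once checkpointArrivesAfter records have been read.
     */
    static List<String> run(int splitSize, int batchSize, int checkpointArrivesAfter) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Deque<Runnable> mailbox = new ArrayDeque<>(); // FIFO of actions run by one thread
        List<String> log = new ArrayList<>();
        int[] offset = {0}; // index of the next record in the split
        Runnable[] readBatch = new Runnable[1];

        readBatch[0] = () -> {
            int readInThisBatch = 0;
            // This sketch omits any idleness check: the batch size alone bounds how long a mail runs.
            while (offset[0] < splitSize && readInThisBatch < batchSize) {
                log.add("record-" + offset[0]);
                offset[0]++;
                readInThisBatch++;
                if (offset[0] == checkpointArrivesAfter) {
                    // Simulated barrier: logs the offset at which the snapshot is taken.
                    mailbox.addLast(() -> log.add("checkpoint@" + offset[0]));
                }
            }
            if (offset[0] < splitSize) {
                mailbox.addLast(readBatch[0]); // yield: pending mail runs before the next batch
            }
        };

        mailbox.addLast(readBatch[0]);
        while (!mailbox.isEmpty()) {
            mailbox.pollFirst().run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(6, 3, 1)); // checkpoint handled after the first batch
        System.out.println(run(6, 6, 1)); // one batch covers the split: checkpoint only at the end
    }
}
```

With `run(6, 3, 1)` the checkpoint is logged as `checkpoint@3`, after the first batch; with `run(6, 6, 1)` the single batch covers the whole split and the checkpoint is logged only as `checkpoint@6`, which roughly mirrors the behavior described in the issue. The trade-off is one extra mail per batch, so `batchSize` bounds checkpoint latency at the cost of more mailbox round trips.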
> > >> > >
> > >> > >
> > >> > >
> > >> > > --
> > >> > > This message was sent by Atlassian Jira
> > >> > > (v8.20.10#820010)
> > >> > >
> > >> >
> > >>
> > >
> >
>
