Apologies, my description was a little off. It's been a while since I last
looked at this code, but I have refreshed my memory.

The line I referred to earlier was correct, though. This operator only
processes records from a file split while it is idle, meaning there are no
more incoming file splits waiting. After every record it reads, it checks
whether any new file splits have arrived before continuing with the current
split. If one has, the loop exits and the operator re-queues itself to
continue processing the remaining records later. You can see that here:
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L361>
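
To make the control flow concrete, here is a toy, single-threaded model of
that loop. It is not Flink's code: IdleReadLoopSketch, its queue-based
"mailbox", and the checkpointAfterRecords knob are all hypothetical. It only
mirrors the pattern described above: read one record, stop at the end of the
split, keep going only while no other mail is waiting, and otherwise re-queue
the read task so the pending mail (here a simulated checkpoint) runs first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model, not Flink code. All names are hypothetical stand-ins.
public class IdleReadLoopSketch {
    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();
    private final List<String> split;
    private final int checkpointAfterRecords; // when to inject a fake checkpoint mail
    private int offset = 0;

    IdleReadLoopSketch(List<String> split, int checkpointAfterRecords) {
        this.split = split;
        this.checkpointAfterRecords = checkpointAfterRecords;
    }

    // Stand-in for the executor's idle check: idle means no pending mail.
    private boolean isIdle() {
        return mailbox.isEmpty();
    }

    private void processRecords() {
        do {
            if (offset == split.size()) {
                return; // split fully processed, nothing to re-queue
            }
            String record = split.get(offset++);
            log.add("read " + record);
            if (offset == checkpointAfterRecords) {
                // Simulate a checkpoint request arriving in the middle of the split.
                mailbox.addLast(() -> log.add("checkpoint at offset " + offset));
            }
        } while (isIdle());
        // Other mail is waiting: yield now and continue this split later.
        mailbox.addLast(this::processRecords);
    }

    // Runs mail in FIFO order until the mailbox is drained.
    List<String> run() {
        mailbox.addLast(this::processRecords);
        while (!mailbox.isEmpty()) {
            mailbox.pollFirst().run();
        }
        return log;
    }

    public static void main(String[] args) {
        new IdleReadLoopSketch(List.of("r1", "r2", "r3", "r4"), 2)
                .run()
                .forEach(System.out::println);
    }
}
```

Running main prints the reads of r1 and r2, then the simulated checkpoint at
offset 2, then the reads of r3 and r4, i.e. the checkpoint does not have to
wait for the whole split to be consumed.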

When the loop is interrupted by a checkpoint barrier, snapshotState(...) is
called and the reader retrieves the current state from the provided format.
In the normal case the state is simply the split offset (the current progress
indicator); in more complex scenarios you can write your own format class and
provide whatever serializable state you need. In my case we store additional
metadata about the reader's progress.
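
As a rough sketch of a format with richer state, the class below keeps a
serializable progress record holding the offset plus one extra piece of
metadata. The names (ProgressStateSketch, LineFormat, Progress, and the
"skipped blank lines" counter) are hypothetical; getCurrentState() is only
loosely modeled on the method of that name in Flink's
CheckpointableInputFormat, and this is not an implementation of that
interface.

```java
import java.io.Serializable;
import java.util.List;

// Hypothetical sketch, not Flink code: a reader whose checkpoint state is
// richer than a bare offset.
public class ProgressStateSketch {

    // Serializable progress snapshot: the read offset plus extra metadata
    // (here, a hypothetical count of blank lines skipped so far).
    record Progress(long offset, long skippedBlankLines) implements Serializable {}

    static class LineFormat {
        private final List<String> lines;
        private int offset = 0;
        private long skippedBlankLines = 0;

        LineFormat(List<String> lines) {
            this.lines = lines;
        }

        boolean reachedEnd() {
            return offset >= lines.size();
        }

        // Returns the next non-blank line, or null once the split is exhausted.
        String nextRecord() {
            while (!reachedEnd()) {
                String line = lines.get(offset++);
                if (!line.isBlank()) {
                    return line;
                }
                skippedBlankLines++;
            }
            return null;
        }

        // What the operator would ask the format for when snapshotting state.
        Progress getCurrentState() {
            return new Progress(offset, skippedBlankLines);
        }
    }

    public static void main(String[] args) {
        LineFormat format = new LineFormat(List.of("a", "", "b", "c"));
        format.nextRecord(); // "a"
        format.nextRecord(); // skips "", returns "b"
        System.out.println(format.getCurrentState());
    }
}
```

Here main prints a state with offset 3 and one skipped blank line. Because
the snapshot is an immutable record, later reads cannot mutate a state that
has already been handed to the checkpoint.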

On state restore, the operator calls loadSplit, which calls *reopen* on the
format rather than open, passing in the checkpointed state so you can
continue from where you left off. You can see that here:
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L407-L411>
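
A minimal, hypothetical model of that restore path follows (ReopenSketch and
ListFormat are illustrative names, not Flink classes, and the state is just
an int offset). The state is round-tripped through Java serialization to
stand in for a checkpoint, and a fresh reader calls reopen(...) with it
instead of open(...), so it resumes in the middle of the split.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: open() starts at offset 0, while
// reopen() starts from a previously checkpointed offset.
public class ReopenSketch {

    record Offset(int value) implements Serializable {}

    static class ListFormat {
        private List<String> split;
        private int offset;

        void open(List<String> split) {
            reopen(split, new Offset(0));
        }

        void reopen(List<String> split, Offset state) {
            this.split = split;
            this.offset = state.value();
        }

        boolean reachedEnd() { return offset >= split.size(); }
        String nextRecord() { return split.get(offset++); }
        Offset getCurrentState() { return new Offset(offset); }
    }

    // Serializes and deserializes the state, as persisting a checkpoint would.
    static Offset roundTrip(Offset state) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(state);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Offset) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("state round trip failed", e);
        }
    }

    // Reads some records, "checkpoints", then restores into a fresh reader.
    static List<String> readAfterRestore(List<String> split, int readBeforeFailure) {
        ListFormat before = new ListFormat();
        before.open(split);
        for (int i = 0; i < readBeforeFailure; i++) {
            before.nextRecord();
        }
        Offset checkpointed = roundTrip(before.getCurrentState());

        ListFormat after = new ListFormat(); // e.g. after a task restart
        after.reopen(split, checkpointed);
        List<String> rest = new ArrayList<>();
        while (!after.reachedEnd()) {
            rest.add(after.nextRecord());
        }
        return rest;
    }

    public static void main(String[] args) {
        System.out.println(readAfterRestore(List.of("r1", "r2", "r3", "r4"), 2));
    }
}
```

With two records read before the simulated failure, readAfterRestore returns
[r3, r4]: the restored reader continues from the checkpointed offset instead
of starting the split over.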

Cheers

Darin

On Tue, Dec 5, 2023 at 8:08 PM Darin Amos <darin.a...@instacart.com> wrote:

> The way I understand this loop is that the ContinuousFileReaderOperator
> only processes records in the background while the operator is idle, i.e.
> while it's not receiving any records.
>
> At the very bottom of that loop here
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L358C5-L358C6>
> it exits if the executor is no longer idle, i.e. there are incoming records.
>
> If you look here
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L407>,
> the operator supports checkpointable input splits, meaning it will save its
> place within a file split. This would only be possible if the reader can be
> interrupted in the middle of a split. I have written custom splits that do
> exactly this.
>
> Darin
>
> On Tue, Dec 5, 2023 at 11:31 AM Prabhu Joseph <prabhujose.ga...@gmail.com>
> wrote:
>
>> This is the loop (code reference:
>> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L346>),
>> where it fetches all records from the split, and only then does the
>> MailboxProcessor get control to check other mail. This loop was
>> introduced here:
>> <https://github.com/apache/flink/commit/1a69cb9fce629b0c458f5ea514d9ac8de008687f>
>>
>> On Tue, Dec 5, 2023 at 9:00 PM Darin Amos <darin.a...@instacart.com.invalid>
>> wrote:
>>
>> > I thought for sure this was already the existing behavior with this
>> > operator. Does it not check the mailbox executor after every record read?
>> >
>> > On Tue, Dec 5, 2023 at 6:48 AM Prabhu Joseph (Jira) <j...@apache.org>
>> > wrote:
>> >
>> > > Prabhu Joseph created FLINK-33753:
>> > > -------------------------------------
>> > >
>> > >              Summary: ContinuousFileReaderOperator consume records as mini batch
>> > >                  Key: FLINK-33753
>> > >                  URL: https://issues.apache.org/jira/browse/FLINK-33753
>> > >              Project: Flink
>> > >           Issue Type: Improvement
>> > >     Affects Versions: 1.18.0
>> > >             Reporter: Prabhu Joseph
>> > >
>> > >
>> > > The ContinuousFileReaderOperator reads and collects the records from a
>> > > split in a loop. If the split size is large, the loop takes more time,
>> > > and the mailbox executor doesn't get a chance to process the checkpoint
>> > > barrier. This leads to checkpoints timing out.
>> > > ContinuousFileReaderOperator could be improved to consume the records in
>> > > mini batches, similar to Hudi's StreamReadOperator
>> > > (https://issues.apache.org/jira/browse/HUDI-2485).
>> > >
>> >
>>
>
