Hi Fabian,
Thank you for your input and I'm sorry for delay on my part.

Before I will create a ticket I would like to ask about one thing more.
There is
test FileSourceTextLinesITCase::testBoundedTextFileSourceWithJobManagerFailover
This test uses DataStreamUtils.collectWithClient(...) which returns an
iterator that we later to get the processing results.

I did a quick PoC where I created my own FileSource that uses
alreadyProcessedFiles Set in Bounded mode, it is based on FileSource
implementation.
I noticed some issues with this test, when i use it for my Bounded Split
Enumerator that keeps track of already processed files. For example

Case 1:
a) set execution mode to Streaming
b) set checkpoint Interval to 10 milis
Result : test fails because result has fever records that it is expected,
actually it reports zero records in the result.

Case 2:
a) set execution mode to Streaming
b) disable checkpoint
Result: test passes

Case 3:
a) set execution mode to Bounded
b) disable checkpoint
Result Test Passes

Case 3:
a) set execution mode to Bounded
b) enable checkpoint
Result Test Passes (since checkpoints are ignored in BATCH mode)

I looked at testBoundedTextFileSource and testContinuousTextFileSource
methods and I understand idea how the Cluster failover is trigger and
whatnot.
Although I do see that gathering the final results for Continuous mode is
slightly different.
Could you shed some light on this and how the collectWithClient works
especially in case if Failover.

Thanks,
Krzysztof Chmielewski

czw., 6 sty 2022 o 09:29 Fabian Paul <fp...@apache.org> napisał(a):

> Hi,
>
> I think your analysis is correct. One thing to note here is that I
> guess when implementing the StaticFileSplitEnumerator we only thought
> about the batch case where no checkpoints exist [1] on the other hand
> it is possible as you have noted to run a bounded source in streaming
> mode.
>
> Although in the current implementation we already checkpoint the
> remaining splits of the StaticFileSplitEnumerator so it should be easy
> to also pass the alreadyDiscoveredPaths to the
> StaticFileSplitEnumerator.
>
> @Krzysztof Chmielewski can you create a ticket for that?
>
> Best,
> Fabian
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming
>
> On Thu, Jan 6, 2022 at 9:13 AM Krzysztof Chmielewski
> <krzysiek.chmielew...@gmail.com> wrote:
> >
> > Hi,
> > Yes I know that ContinuousFileSplitEnumerator has continuously scan the
> monitored folder for the new files and StaticFileSplitEnumerator does not,
> this is clear.
> >
> > However I was asking about a different scenario, the scenario when we
> are restoring from a checkpoint.
> > FileSource can process many files, not only one. The underlying API uses
> array of paths not just single Path.
> >
> > If I understand correctly, when we are recovering from a checkpoint, for
> example due to Job Manager issue, FileEnumerator will create an Array of
> Splits and pass it to StaticFileSplitEnumerator.
> >
> > Same goes for ContinuousFileSplitEnumerator. However  when
> ContinuousFileSplitEnumerator is started, it iterates through Path[] array
> and checks which files were already processed and skip them using
> pathsAlreadyProcessed set hence not creating Splits for those files.
> >
> > However it seems that StaticFileSplitEnumerator will reprocess files
> that were already used for Split creation. In case of Checkpoint
> restoration it does not check if that file was already processed.
> >
> > Regards,
> > Krzysztof Chmielewski
> >
> >
> >
> >
> > czw., 6 sty 2022, 03:21 użytkownik Caizhi Weng <tsreape...@gmail.com>
> napisał:
> >>
> >> Hi!
> >>
> >> Do you mean the pathsAlreadyProcessed set in
> ContinuousFileSplitEnumerator?
> >>
> >> This is because ContinuousFileSplitEnumerator has to continuously add
> new files to splitAssigner, while StaticFileSplitEnumerator does not. The
> pathsAlreadyProcessed set records the paths already discovered by
> ContinuousFileSplitEnumerator so that it will not add the same file to
> splitAssigner twice. For StaticFileSplitEnumerator it does not need to
> discover new files and all files have already been recorded in its
> splitAssigner so it does not need the pathsAlreadyProcessed set.
> >>
> >> For more detailed logic check the caller of the constructors of both
> enumerators.
> >>
> >> Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> 于2022年1月6日周四
> 07:04写道:
> >>>
> >>> Hi,
> >>> Why StaticFileSplitEnumerator from FileSource does not track the
> already processed files similar to how ContinuousFileSplitEnumerator does?
> >>>
> >>> I'm thinking about scenario where we have a Bounded FileSource that
> reads a lot of files using FileSource and stream it's content to Kafka.If
> there will be a Job/cluster restart then we will process same files again.
> >>>
> >>> Regards,
> >>> Krzysztof Chmielewski
>

Reply via email to