Hi Krzysztof,

Thanks for your investigation. Can you share the code with us?
collectWithClient will insert a custom sink into the data stream that
buffers all incoming records and makes them queryable. It is already
deprecated; one should use executeAndCollect, which fulfills the same
purpose. A short example is below.
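For reference, a minimal sketch of the executeAndCollect variant (inside a
main method, imports omitted; the file path and line format are
placeholders, not taken from your code):

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    FileSource<String> source = FileSource
            .forRecordStreamFormat(new TextLineFormat(), new Path("/tmp/input"))
            .build();

    DataStream<String> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

    // executeAndCollect() submits the job and returns a CloseableIterator over
    // the results; closing the iterator also cancels the job if it is still
    // running.
    try (CloseableIterator<String> it = stream.executeAndCollect()) {
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }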
There is a difference between the execution modes, streaming and batch,
and the boundedness of a source. It is possible to execute a bounded
source in streaming mode and therefore have checkpoints. For your
experiments, did you only change the boundedness or also the runtime
mode? [1] A minimal sketch of that combination follows.
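Concretely, this is what I mean (again inside a main method, imports
omitted; the input path is a placeholder): the source stays bounded
because monitorContinuously(...) is never called on the builder, but the
job still runs in streaming mode with a checkpoint interval like in your
Case 1.

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    // Runtime mode is set independently of the boundedness of the source.
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    env.enableCheckpointing(10); // 10 ms interval, as in your Case 1

    // Bounded source: no monitorContinuously(...) on the builder.
    FileSource<String> source = FileSource
            .forRecordStreamFormat(new TextLineFormat(), new Path("/tmp/input"))
            .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-file-source")
            .print();

    env.execute("bounded-source-in-streaming-mode");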
Best,
Fabian

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming

On Tue, Jan 11, 2022 at 12:04 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>
> Hi Fabian,
> Thank you for your input, and I'm sorry for the delay on my part.
>
> Before I create a ticket, I would like to ask about one more thing.
> There is a test,
> FileSourceTextLinesITCase::testBoundedTextFileSourceWithJobManagerFailover.
> This test uses DataStreamUtils.collectWithClient(...), which returns an
> iterator that we later use to get the processing results.
>
> I did a quick PoC where I created my own FileSource that uses an
> alreadyProcessedFiles Set in Bounded mode; it is based on the FileSource
> implementation.
> I noticed some issues with this test when I use it for my Bounded Split
> Enumerator that keeps track of already processed files. For example:
>
> Case 1:
> a) set execution mode to Streaming
> b) set checkpoint interval to 10 millis
> Result: test fails because the result has fewer records than expected;
> actually, it reports zero records in the result.
>
> Case 2:
> a) set execution mode to Streaming
> b) disable checkpointing
> Result: test passes
>
> Case 3:
> a) set execution mode to Bounded
> b) disable checkpointing
> Result: test passes
>
> Case 4:
> a) set execution mode to Bounded
> b) enable checkpointing
> Result: test passes (since checkpoints are ignored in BATCH mode)
>
> I looked at the testBoundedTextFileSource and testContinuousTextFileSource
> methods and I understand the idea of how the cluster failover is triggered.
> However, I do see that gathering the final results for Continuous mode is
> slightly different.
> Could you shed some light on this and on how collectWithClient works,
> especially in case of failover?
>
> Thanks,
> Krzysztof Chmielewski
>
> On Thu, Jan 6, 2022 at 9:29 AM Fabian Paul <fp...@apache.org> wrote:
>>
>> Hi,
>>
>> I think your analysis is correct. One thing to note here is that I
>> guess when implementing the StaticFileSplitEnumerator we only thought
>> about the batch case where no checkpoints exist [1]; on the other hand,
>> as you have noted, it is possible to run a bounded source in streaming
>> mode.
>>
>> However, in the current implementation we already checkpoint the
>> remaining splits of the StaticFileSplitEnumerator, so it should be easy
>> to also pass the alreadyDiscoveredPaths to the
>> StaticFileSplitEnumerator.
>>
>> @Krzysztof Chmielewski can you create a ticket for that?
>>
>> Best,
>> Fabian
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming
>>
>> On Thu, Jan 6, 2022 at 9:13 AM Krzysztof Chmielewski
>> <krzysiek.chmielew...@gmail.com> wrote:
>> >
>> > Hi,
>> > Yes, I know that ContinuousFileSplitEnumerator has to continuously
>> > scan the monitored folder for new files and StaticFileSplitEnumerator
>> > does not; this is clear.
>> >
>> > However, I was asking about a different scenario, the scenario where
>> > we are restoring from a checkpoint.
>> > FileSource can process many files, not only one. The underlying API
>> > uses an array of paths, not just a single Path.
>> >
>> > If I understand correctly, when we are recovering from a checkpoint,
>> > for example due to a Job Manager issue, the FileEnumerator will create
>> > an array of splits and pass it to StaticFileSplitEnumerator.
>> >
>> > The same goes for ContinuousFileSplitEnumerator. However, when
>> > ContinuousFileSplitEnumerator is started, it iterates through the
>> > Path[] array, checks which files were already processed, and skips
>> > them using the pathsAlreadyProcessed set, hence not creating splits
>> > for those files.
>> >
>> > However, it seems that StaticFileSplitEnumerator will reprocess files
>> > that were already used for split creation. In case of checkpoint
>> > restoration, it does not check whether a file was already processed.
>> >
>> > Regards,
>> > Krzysztof Chmielewski
>> >
>> > On Thu, Jan 6, 2022 at 3:21 AM Caizhi Weng <tsreape...@gmail.com>
>> > wrote:
>> >>
>> >> Hi!
>> >>
>> >> Do you mean the pathsAlreadyProcessed set in
>> >> ContinuousFileSplitEnumerator?
>> >>
>> >> This is because ContinuousFileSplitEnumerator has to continuously add
>> >> new files to the splitAssigner, while StaticFileSplitEnumerator does
>> >> not. The pathsAlreadyProcessed set records the paths already
>> >> discovered by ContinuousFileSplitEnumerator so that it will not add
>> >> the same file to the splitAssigner twice. StaticFileSplitEnumerator
>> >> does not need to discover new files, and all files have already been
>> >> recorded in its splitAssigner, so it does not need the
>> >> pathsAlreadyProcessed set.
>> >>
>> >> For the detailed logic, check the callers of the constructors of both
>> >> enumerators.
>> >>
>> >> On Thu, Jan 6, 2022 at 7:04 AM Krzysztof Chmielewski
>> >> <krzysiek.chmielew...@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>> Why does StaticFileSplitEnumerator from FileSource not track the
>> >>> already processed files, similar to how ContinuousFileSplitEnumerator
>> >>> does?
>> >>>
>> >>> I'm thinking about a scenario where we have a bounded FileSource that
>> >>> reads a lot of files and streams their content to Kafka. If there is
>> >>> a job/cluster restart, we will process the same files again.
>> >>>
>> >>> Regards,
>> >>> Krzysztof Chmielewski
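PS: To make the change discussed above a bit more concrete, here is a
rough sketch (simplified; the class name, the extra constructor parameter,
and the filtering method are my assumptions, not the actual Flink code) of
how StaticFileSplitEnumerator could drop splits for already-processed
paths when restoring from a checkpoint, mirroring what
ContinuousFileSplitEnumerator does on discovery:

    class StaticFileSplitEnumeratorSketch {

        private final FileSplitAssigner splitAssigner;
        // Paths whose files were fully processed before the checkpoint.
        private final Set<Path> pathsAlreadyProcessed;

        StaticFileSplitEnumeratorSketch(
                FileSplitAssigner splitAssigner,
                Collection<Path> alreadyProcessedPaths) {
            this.splitAssigner = splitAssigner;
            this.pathsAlreadyProcessed = new HashSet<>(alreadyProcessedPaths);
        }

        void addSplitsOnRestore(Collection<FileSourceSplit> restoredSplits) {
            // Skip splits whose file was already processed so that the file
            // is not read again after a failover.
            List<FileSourceSplit> remaining = new ArrayList<>();
            for (FileSourceSplit split : restoredSplits) {
                if (!pathsAlreadyProcessed.contains(split.path())) {
                    remaining.add(split);
                }
            }
            splitAssigner.addSplits(remaining);
        }
    }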