Hi Krzysztof,

Thanks for your investigation. Can you maybe share the code with us?
collectWithClient will insert a custom sink into the datastream that
buffers all incoming records and make them queryable. It is already
deprecated and one should use executeAndCollect that fulfills the same
purpose.

There is a difference between the execution modes streaming and batch,
and boundedness of a source. It is possible to execute a bounded
source in streaming mode and therefore have checkpoints.

For your experiments did you only change the boundedness or also the
runtime mode? [1]

Best,
Fabian

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming

On Tue, Jan 11, 2022 at 12:04 PM Krzysztof Chmielewski
<krzysiek.chmielew...@gmail.com> wrote:
>
> Hi Fabian,
> Thank you for your input and I'm sorry for delay on my part.
>
> Before I will create a ticket I would like to ask about one thing more.
> There is test 
> FileSourceTextLinesITCase::testBoundedTextFileSourceWithJobManagerFailover
> This test uses DataStreamUtils.collectWithClient(...) which returns an 
> iterator that we later to get the processing results.
>
> I did a quick PoC where I created my own FileSource that uses 
> alreadyProcessedFiles Set in Bounded mode, it is based on FileSource 
> implementation.
> I noticed some issues with this test, when i use it for my Bounded Split 
> Enumerator that keeps track of already processed files. For example
>
> Case 1:
> a) set execution mode to Streaming
> b) set checkpoint Interval to 10 milis
> Result : test fails because result has fever records that it is expected, 
> actually it reports zero records in the result.
>
> Case 2:
> a) set execution mode to Streaming
> b) disable checkpoint
> Result: test passes
>
> Case 3:
> a) set execution mode to Bounded
> b) disable checkpoint
> Result Test Passes
>
> Case 3:
> a) set execution mode to Bounded
> b) enable checkpoint
> Result Test Passes (since checkpoints are ignored in BATCH mode)
>
> I looked at testBoundedTextFileSource and testContinuousTextFileSource 
> methods and I understand idea how the Cluster failover is trigger and whatnot.
> Although I do see that gathering the final results for Continuous mode is 
> slightly different.
> Could you shed some light on this and how the collectWithClient works 
> especially in case if Failover.
>
> Thanks,
> Krzysztof Chmielewski
>
> czw., 6 sty 2022 o 09:29 Fabian Paul <fp...@apache.org> napisał(a):
>>
>> Hi,
>>
>> I think your analysis is correct. One thing to note here is that I
>> guess when implementing the StaticFileSplitEnumerator we only thought
>> about the batch case where no checkpoints exist [1] on the other hand
>> it is possible as you have noted to run a bounded source in streaming
>> mode.
>>
>> Although in the current implementation we already checkpoint the
>> remaining splits of the StaticFileSplitEnumerator so it should be easy
>> to also pass the alreadyDiscoveredPaths to the
>> StaticFileSplitEnumerator.
>>
>> @Krzysztof Chmielewski can you create a ticket for that?
>>
>> Best,
>> Fabian
>>
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming
>>
>> On Thu, Jan 6, 2022 at 9:13 AM Krzysztof Chmielewski
>> <krzysiek.chmielew...@gmail.com> wrote:
>> >
>> > Hi,
>> > Yes I know that ContinuousFileSplitEnumerator has continuously scan the 
>> > monitored folder for the new files and StaticFileSplitEnumerator does not, 
>> > this is clear.
>> >
>> > However I was asking about a different scenario, the scenario when we are 
>> > restoring from a checkpoint.
>> > FileSource can process many files, not only one. The underlying API uses 
>> > array of paths not just single Path.
>> >
>> > If I understand correctly, when we are recovering from a checkpoint, for 
>> > example due to Job Manager issue, FileEnumerator will create an Array of 
>> > Splits and pass it to StaticFileSplitEnumerator.
>> >
>> > Same goes for ContinuousFileSplitEnumerator. However  when 
>> > ContinuousFileSplitEnumerator is started, it iterates through Path[] array 
>> > and checks which files were already processed and skip them using 
>> > pathsAlreadyProcessed set hence not creating Splits for those files.
>> >
>> > However it seems that StaticFileSplitEnumerator will reprocess files that 
>> > were already used for Split creation. In case of Checkpoint restoration it 
>> > does not check if that file was already processed.
>> >
>> > Regards,
>> > Krzysztof Chmielewski
>> >
>> >
>> >
>> >
>> > czw., 6 sty 2022, 03:21 użytkownik Caizhi Weng <tsreape...@gmail.com> 
>> > napisał:
>> >>
>> >> Hi!
>> >>
>> >> Do you mean the pathsAlreadyProcessed set in 
>> >> ContinuousFileSplitEnumerator?
>> >>
>> >> This is because ContinuousFileSplitEnumerator has to continuously add new 
>> >> files to splitAssigner, while StaticFileSplitEnumerator does not. The 
>> >> pathsAlreadyProcessed set records the paths already discovered by 
>> >> ContinuousFileSplitEnumerator so that it will not add the same file to 
>> >> splitAssigner twice. For StaticFileSplitEnumerator it does not need to 
>> >> discover new files and all files have already been recorded in its 
>> >> splitAssigner so it does not need the pathsAlreadyProcessed set.
>> >>
>> >> For more detailed logic check the caller of the constructors of both 
>> >> enumerators.
>> >>
>> >> Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> 于2022年1月6日周四 
>> >> 07:04写道:
>> >>>
>> >>> Hi,
>> >>> Why StaticFileSplitEnumerator from FileSource does not track the already 
>> >>> processed files similar to how ContinuousFileSplitEnumerator does?
>> >>>
>> >>> I'm thinking about scenario where we have a Bounded FileSource that 
>> >>> reads a lot of files using FileSource and stream it's content to 
>> >>> Kafka.If there will be a Job/cluster restart then we will process same 
>> >>> files again.
>> >>>
>> >>> Regards,
>> >>> Krzysztof Chmielewski

Reply via email to