Hi again,

I have simplified the example that reproduces the data loss. The scenario is as follows:
- TextIO writes the files.
- getPerDestinationOutputFilenames emits the file names.
- The file names are processed by an aggregator (Combine, Distinct, GroupByKey...) with a window **without allowed lateness**.
- The file names are discarded as late.

You can see the data loss in the picture at https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss

Please follow the README to run the pipeline, and look for the log traces saying that data are dropped as late. Remember that you can run the pipeline with other allowed-lateness values for the window (see README.md).

Kby.

On Tue, May 12, 2020 at 17:16, Jose Manuel (<kiuby88....@gmail.com>) wrote:

> Hi,
>
> I would like to clarify that while TextIO is writing, all the data do end
> up in the files (shards). The loss happens when the file names emitted by
> getPerDestinationOutputFilenames are processed by a window.
>
> I have created a pipeline to reproduce the scenario in which some
> file names are lost after getPerDestinationOutputFilenames. Please note
> that I tried to simplify the code as much as possible, but the scenario is
> not easy to reproduce.
>
> Please check this project: https://github.com/kiuby88/windowing-textio
> See the README to build and run it
> (https://github.com/kiuby88/windowing-textio#build-and-run).
> The project contains only one class with the pipeline, PipelineWithTextIo,
> a log4j2.xml file in the resources, and the pom.
>
> The pipeline in PipelineWithTextIo generates unbounded data using a
> sequence. It adds a small delay (10 s) per data entry, applies a Distinct
> (just to apply the window), and then writes the data using TextIO.
> The window for the Distinct is fixed (5 seconds) and has no allowed
> lateness.
> The generated files can be found in
> windowing-textio/pipe_with_lateness_0s/files.
>
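To make the 5-second fixed window and the "window + timing + shards" file names more concrete, here is a minimal plain-Java sketch with no Beam dependency. `FixedWindowModel`, `assign` and `fileName` are hypothetical names; the sketch assumes epoch-aligned windows with zero offset, and the naming format is inferred from the file name in the LateDataFilter trace of the May 12 message, not taken from the repository's FileNamePolicy. An element at 14:05:12.345Z lands in `[14:05:10.000Z..14:05:15.000Z)`, whose last instant (`maxTimestamp`, end minus 1 ms) is 14:05:14.999Z, the same timestamp the dropped file name carries in that trace.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical, plain-JDK model (not Beam's FixedWindows and not the repository's
// FileNamePolicy) of epoch-aligned fixed windows with zero offset, plus a
// "window + timing + shards" file name in the style seen in the logs.
final class FixedWindowModel {

  // Millisecond-precision UTC timestamps, e.g. 2020-05-12T14:05:10.000Z.
  private static final DateTimeFormatter MILLIS_UTC =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

  // A half-open interval [start, end).
  static final class Window {
    final Instant start;
    final Instant end;

    Window(Instant start, Instant end) {
      this.start = start;
      this.end = end;
    }

    // Last instant that still belongs to the window: end minus one millisecond.
    Instant maxTimestamp() {
      return end.minusMillis(1);
    }

    @Override
    public String toString() {
      return "[" + MILLIS_UTC.format(start) + ".." + MILLIS_UTC.format(end) + ")";
    }
  }

  // Returns the fixed window of the given size that contains the timestamp.
  static Window assign(Instant timestamp, Duration size) {
    long sizeMillis = size.toMillis();
    long ts = timestamp.toEpochMilli();
    // floorMod keeps the arithmetic correct for pre-epoch timestamps too.
    long startMillis = ts - Math.floorMod(ts, sizeMillis);
    return new Window(Instant.ofEpochMilli(startMillis), Instant.ofEpochMilli(startMillis + sizeMillis));
  }

  // Hypothetical naming scheme: <window>-<pane timing>-<shard>-of-<numShards>.txt
  static String fileName(Window window, String paneTiming, int shard, int numShards) {
    return window + "-" + paneTiming + "-" + shard + "-of-" + numShards + ".txt";
  }

  public static void main(String[] args) {
    Window w = assign(Instant.parse("2020-05-12T14:05:12.345Z"), Duration.ofSeconds(5));
    System.out.println(w.maxTimestamp()); // 2020-05-12T14:05:14.999Z
    System.out.println(fileName(w, "ON_TIME", 0, 1));
    // [2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt
  }
}
```

In Beam itself this work is done by `FixedWindows` and the window's `maxTimestamp()`; the sketch only mirrors the arithmetic for illustration.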
> To write the files, the FileNamePolicy uses window + timing + shards (see
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135).
> The file names are emitted using getPerDestinationOutputFilenames() (see
> the code here:
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78).
>
> Then, the file names in the PCollection are extracted and logged. Please
> note that the file names do not have pane information at that point.
>
> To apply a window, a Distinct is used again. Here several files are
> discarded as late and are not processed by this second Distinct. Please see
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>
> Debug logging is enabled for WindowTracing, so in the terminal you can find
> several messages like the following:
>
> DEBUG org.apache.beam.sdk.util.WindowTracing - LateDataFilter: Dropping
> element at 2020-05-12T14:05:14.999Z for
> key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
> window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far
> behind inputWatermark:2020-05-12T14:05:19.799Z;
> outputWatermark:2020-05-12T14:05:19.799Z
>
> What happens here? I think that messages are generated every second and
> a 5-second window groups them. Then a delay is added, and finally the data
> are written to a file.
> The pipeline keeps reading more data, advancing the watermark.
> Then, the file names are emitted without pane information (see "Emitted
> File" in the logs). The window in the second Distinct compares the file
> names' timestamps with the pipeline watermark, and then it discards the
> file names as late.
>
>
> Bonus
> -----
> You can add allowed lateness to the pipeline. See
> https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
>
> If one minute of allowed lateness is added to the window, the file names
> are processed as late data instead of being dropped.
> As a result, the LateDataFilter traces disappear.
>
> Moreover, to better illustrate that the file names reach the second
> Distinct as late data, I added a second TextIO that writes the file names
> into other files.
> It uses the same FileNamePolicy as before (window + timing + shards). You
> can find the files containing the original file names in
> windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the
> interesting part, because you will find several files with LATE in their
> names.
>
> Please let me know if you need more information or if the example is not
> enough to check the expected scenarios.
>
> Kby.
>
> On Sun, May 10, 2020 at 17:04, Reuven Lax (<re...@google.com>) wrote:
>
>> Pane info is supposed to be preserved across transforms. If the Flink
>> runner does not preserve it, then I believe that is a bug.
>>
>> On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <jozo.vil...@gmail.com>
>> wrote:
>>
>>> I am using FileIO and I do observe pane info being dropped on the
>>> Flink runner too. It was mentioned in this thread:
>>> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>>>
>>> It is a result of a different Reshuffle expansion, used for optimisation
>>> reasons. However, I did not observe data loss in my case. Windowing and
>>> watermark info should be preserved. Pane info is not, which raises the
>>> question of how reliable pane info should be in terms of SDK and runner.
>>>
>>> If you do observe data loss, it would be great to share a test case
>>> that replicates the problem.
>>>
>>> On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Ah, I think I see the problem.
>>>>
>>>> It appears that for some reason, the Flink runner loses windowing
>>>> information when a Reshuffle is applied. I'm not entirely sure why,
>>>> because windowing information should be maintained across a Reshuffle.
>>>>
>>>> Reuven
>>>>
>>>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have added some logs to the pipeline as follows (you can find the
>>>>> log function in the Appendix):
>>>>>
>>>>> //STREAM + processing time.
>>>>> pipeline.apply(KafkaIO.read())
>>>>>     .apply(...) //mappers, a window and a combine
>>>>>     .apply(logBeforeWrite())
>>>>>     .apply("WriteFiles",
>>>>>         TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>>     .getPerDestinationOutputFilenames()
>>>>>     .apply(logAfterWrite())
>>>>>     .apply("CombineFileNames", Combine.perKey(...))
>>>>>
>>>>> I have run the pipeline with the DirectRunner (locally) and with the
>>>>> SparkRunner and FlinkRunner (both on a cluster).
>>>>> Below you can see the timing and pane information before and after the
>>>>> write (the detailed traces, with window and timestamp information, are
>>>>> in the Appendix).
>>>>>
>>>>> DirectRunner:
>>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>> [After Write] timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>>>
>>>>> FlinkRunner:
>>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>> [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>>
>>>>> SparkRunner:
>>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>> [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>>
>>>>> It seems the DirectRunner propagates the windowing information as
>>>>> expected.
>>>>> I am not sure whether TextIO really propagates it or just emits a pane
>>>>> of its own, because the timing before TextIO is ON_TIME and after TextIO
>>>>> it is EARLY.
>>>>> In any case, with the FlinkRunner and SparkRunner the timing and the
>>>>> pane are not set.
>>>>>
>>>>> I thought the problem was in GatherBundlesPerWindowFn, but now, after
>>>>> seeing that the DirectRunner fills in the windowing data... I am not
>>>>> sure.
>>>>>
>>>>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>>>>
>>>>>
>>>>> Appendix
>>>>> -----------
>>>>> Here you can see the log function and the traces for the different
>>>>> runners in detail.
>>>>>
>>>>> private ParDo.SingleOutput<String, String> logBeforeWrite() {
>>>>>   return ParDo.of(new DoFn<String, String>() {
>>>>>     @ProcessElement
>>>>>     public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>>>>>       String value = context.element();
>>>>>       // `log` is a logger field of the enclosing class (not shown).
>>>>>       log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index ={}, isFirst ={}, isLast={}, pane={}",
>>>>>           boundedWindow,
>>>>>           context.timestamp(),
>>>>>           context.pane().getTiming(),
>>>>>           context.pane().getIndex(),
>>>>>           context.pane().isFirst(),
>>>>>           context.pane().isLast(),
>>>>>           context.pane());
>>>>>       // Pass the element through unchanged.
>>>>>       context.output(value);
>>>>>     }
>>>>>   });
>>>>> }
>>>>>
>>>>> The logAfterWrite function logs the same information.
>>>>>
>>>>> Traces in detail:
>>>>>
>>>>> DirectRunner (local):
>>>>> [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index =0, isFirst =true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>> [After Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index =0, isFirst =true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>>>
>>>>> FlinkRunner (cluster):
>>>>> [Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index =0, isFirst =true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>> [After Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index =0, isFirst =true, isLast=true pane=PaneInfo.NO_FIRING
>>>>>
>>>>> SparkRunner (cluster):
>>>>> [Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index =0, isFirst =true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>> [After Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index =0, isFirst =true, isLast=true pane=PaneInfo.NO_FIRING
>>>>>
>>>>>
>>>>> On Fri, May 8, 2020 at 19:01, Reuven Lax (<re...@google.com>) wrote:
>>>>>
>>>>>> The window information should still be there. Beam propagates
>>>>>> windows through PCollections, and I don't think WriteFiles does
>>>>>> anything explicit to stop that.
>>>>>>
>>>>>> Can you try this with the direct runner to see what happens there?
>>>>>> What is your windowing on this PCollection?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I got the same behavior using the Spark runner (with Spark 2.4.3):
>>>>>>> the window information was missing.
>>>>>>>
>>>>>>> Just to clarify, the combiner after TextIO had different results on
>>>>>>> each runner. On the Flink runner the file names were dropped, and on
>>>>>>> Spark the combination happened twice, duplicating data. I think this
>>>>>>> is because different runners handle data without windowing
>>>>>>> information in different ways.
>>>>>>>
>>>>>>> On Fri, May 8, 2020 at 0:45, Luke Cwik (<lc...@google.com>) wrote:
>>>>>>>
>>>>>>>> +dev <d...@beam.apache.org>
>>>>>>>>
>>>>>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi guys,
>>>>>>>>>
>>>>>>>>> I think I have found something interesting about windowing.
>>>>>>>>>
>>>>>>>>> I have a pipeline that reads data from Kafka and writes it to HDFS
>>>>>>>>> by means of TextIO.
>>>>>>>>> Once written, the generated files are combined to apply some custom
>>>>>>>>> operations.
>>>>>>>>> However, the combine does not receive any data. Below you can find
>>>>>>>>> the highlights of my pipeline.
>>>>>>>>>
>>>>>>>>> //STREAM + processing time.
>>>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>>>     .apply(...) //mappers, a window and a combine
>>>>>>>>>     .apply("WriteFiles",
>>>>>>>>>         TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>>>>>>     .getPerDestinationOutputFilenames()
>>>>>>>>>     .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>>>
>>>>>>>>> Running the pipeline with Flink, I found a log trace saying that
>>>>>>>>> data are discarded as late in the combine CombineFileNames.
>>>>>>>>> Then, I added AllowedLateness to the pipeline's window strategy as
>>>>>>>>> a workaround.
>>>>>>>>> It works for now, but it raises several questions for me.
>>>>>>>>>
>>>>>>>>> I think the problem is that getPerDestinationOutputFilenames emits
>>>>>>>>> the file names but does not maintain the windowing information.
>>>>>>>>> Then, CombineFileNames compares the file names with the watermark of
>>>>>>>>> the pipeline and discards them as late.
>>>>>>>>>
>>>>>>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am
>>>>>>>>> doing something wrong and using getPerDestinationOutputFilenames +
>>>>>>>>> combine does not make sense.
>>>>>>>>> What do you think?
>>>>>>>>>
>>>>>>>>> Please note that I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>>>>>
>>>>>>>>> Many thanks,
>>>>>>>>> Jose
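The rule behind the LateDataFilter traces discussed in this thread can be modelled in a few lines of plain Java. This is a simplified sketch, assuming it mirrors the expiry check in Beam's late-data dropping runner: an element is dropped once its window's maxTimestamp plus the allowed lateness is before the input watermark. `LateDataModel`, `classify` and `Outcome` are hypothetical names, and the model ignores details such as clamping to the end of the global window and how panes are actually fired. Fed with the values from the May 12 trace (window max 14:05:14.999Z, input watermark 14:05:19.799Z), it drops the element with zero allowed lateness and keeps it as late data with one minute, which matches the two runs described in the thread.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified, hypothetical model of the late-data check; not Beam's code.
// Assumption: an element is dropped once (window maxTimestamp + allowed lateness)
// is before the input watermark, and it is late (but kept) when only the
// window's maxTimestamp is behind the watermark.
final class LateDataModel {

  enum Outcome { KEPT, KEPT_AS_LATE, DROPPED }

  static Outcome classify(Instant windowMaxTimestamp, Duration allowedLateness, Instant inputWatermark) {
    // Once the watermark passes this instant, the window is treated as expired.
    Instant garbageCollectionTime = windowMaxTimestamp.plus(allowedLateness);
    if (garbageCollectionTime.isBefore(inputWatermark)) {
      return Outcome.DROPPED;
    }
    if (windowMaxTimestamp.isBefore(inputWatermark)) {
      return Outcome.KEPT_AS_LATE;
    }
    return Outcome.KEPT;
  }

  public static void main(String[] args) {
    // Values from the 2020-05-12 LateDataFilter trace quoted in this thread.
    Instant windowMax = Instant.parse("2020-05-12T14:05:14.999Z");
    Instant inputWatermark = Instant.parse("2020-05-12T14:05:19.799Z");

    System.out.println(classify(windowMax, Duration.ZERO, inputWatermark));         // DROPPED
    System.out.println(classify(windowMax, Duration.ofMinutes(1), inputWatermark)); // KEPT_AS_LATE
  }
}
```

When applying the AllowedLateness workaround in Beam itself, note that a non-zero `Window.into(...).withAllowedLateness(...)` also requires choosing an accumulation mode (`discardingFiredPanes()` or `accumulatingFiredPanes()`), otherwise pipeline construction fails.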