Lateness should never be introduced inside a pipeline - generally late data can only come from a source. If data was not dropped as late earlier in the pipeline, it should not be dropped after the file write. I suspect that this is a bug in how the Flink runner handles the Reshuffle transform, but I'm not sure what the exact bug is.
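For reference, the "Dropping element ... since too far behind inputWatermark" traces quoted below boil down to a simple rule. Here is a minimal sketch of that rule in plain Java (a simplified model for reasoning about the thread, not Beam's actual LateDataFilter implementation):

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of the drop rule behind the WindowTracing logs quoted
// below: an element is dropped when its window's maximum timestamp,
// extended by the allowed lateness, is already behind the input watermark.
public class LateDropRule {
    static boolean isTooLate(Instant windowMaxTs, Duration allowedLateness, Instant inputWatermark) {
        return windowMaxTs.plus(allowedLateness).isBefore(inputWatermark);
    }

    public static void main(String[] args) {
        // Values taken from the trace further down in this thread.
        Instant windowMax = Instant.parse("2020-05-12T14:05:14.999Z");
        Instant watermark = Instant.parse("2020-05-12T14:05:19.799Z");
        System.out.println(isTooLate(windowMax, Duration.ZERO, watermark));         // true: dropped
        System.out.println(isTooLate(windowMax, Duration.ofMinutes(1), watermark)); // false: kept as a LATE pane
    }
}
```

With zero allowed lateness, any pane whose window end is behind the watermark is dropped, which is exactly what happens to the file names re-entering the second window in the example below.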
Reuven

On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> Hi Jose,
>
> thank you for putting in the effort to build an example which demonstrates your problem.
>
> You are using a streaming pipeline and it seems that the watermark downstream has already advanced further, so when your File pane arrives, it is already late. Since you define that lateness is not tolerated, it is dropped.
> I myself never had a requirement to specify zero allowed lateness for streaming. It feels dangerous. Do you have a specific use case? Also, in many cases, after windowed files are written, I usually collect them into a global window and specify a different triggering policy for collecting them. Both are reasons why I never came across this situation.
>
> I do not have an explanation for whether it is a bug or not. I would guess that the watermark can advance further, e.g. because elements can be processed in arbitrary order. Not saying this is the case.
> It needs someone with a better understanding of how watermark advancement is / should be handled within pipelines.
>
> P.S.: you can add `.withTimestampFn()` to your generate sequence, to get more stable timing, which is also easier to reason about:
>
> Dropping element at 1970-01-01T00:00:19.999Z for key ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z) since too far behind inputWatermark:1970-01-01T00:00:24.000Z; outputWatermark:1970-01-01T00:00:24.000Z
>
> instead of
>
> Dropping element at 2020-05-15T08:52:34.999Z for key ... window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since too far behind inputWatermark:2020-05-15T08:52:39.318Z; outputWatermark:2020-05-15T08:52:39.318Z
>
> On Thu, May 14, 2020 at 10:47 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>
>> Hi again,
>>
>> I have simplified the example to reproduce the data loss. The scenario is the following:
>>
>> - TextIO writes files.
>> - getPerDestinationOutputFilenames emits the file names
>> - File names are processed by an aggregator (combine, distinct, groupByKey...) with a window **without allowed lateness**
>> - File names are discarded as late
>>
>> Here you can see the data loss in the picture in https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
>>
>> Please, follow the README to run the pipeline and find the log traces that say data is dropped as late.
>> Remember, you can run the pipeline with other window lateness values (check README.md).
>>
>> Kby.
>>
>> On Tue, May 12, 2020 at 17:16, Jose Manuel (<kiuby88....@gmail.com>) wrote:
>>
>>> Hi,
>>>
>>> I would like to clarify that while TextIO is writing, all the data is in the files (shards). The loss happens when the file names emitted by getPerDestinationOutputFilenames are processed by a window.
>>>
>>> I have created a pipeline to reproduce the scenario in which some filenames are lost after getPerDestinationOutputFilenames. Please, note I tried to simplify the code as much as possible, but the scenario is not easy to reproduce.
>>>
>>> Please check this project: https://github.com/kiuby88/windowing-textio
>>> Check the readme to build and run (https://github.com/kiuby88/windowing-textio#build-and-run).
>>> The project contains only a class with the pipeline, PipelineWithTextIo, a log4j2.xml file in the resources, and the pom.
>>>
>>> The pipeline in PipelineWithTextIo generates unbounded data using a sequence. It adds a little delay (10s) per data entry, it uses a distinct (just to apply the window), and then it writes data using TextIO.
>>> The window for the distinct is fixed (5 seconds) and it does not use lateness.
>>> Generated files can be found in windowing-textio/pipe_with_lateness_0s/files.
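(The 5-second fixed windowing used by the distinct above can be modeled as follows; this is a plain-Java sketch of fixed-window assignment, a simplified stand-in for Beam's FixedWindows rather than its actual code.)

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java sketch of fixed-window assignment (a simplified model of
// Beam's FixedWindows, not the actual implementation): each timestamp
// falls into the window [start, start + size), where start is the
// timestamp rounded down to a multiple of the window size.
public class FixedWindowSketch {
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMillis = size.toMillis();
        long startMillis = ts.toEpochMilli() - Math.floorMod(ts.toEpochMilli(), sizeMillis);
        return Instant.ofEpochMilli(startMillis);
    }

    public static void main(String[] args) {
        // An element at 14:05:14.999Z with 5-second windows lands in
        // [14:05:10..14:05:15), matching the window names in the generated
        // files and in the WindowTracing logs.
        Instant ts = Instant.parse("2020-05-12T14:05:14.999Z");
        System.out.println(windowStart(ts, Duration.ofSeconds(5))); // 2020-05-12T14:05:10Z
    }
}
```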
>>> To write the files, the FileNamePolicy uses window + timing + shards (see https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135).
>>> Files are emitted using getPerDestinationOutputFilenames() (see the code here: https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78).
>>>
>>> Then, the file names in the PCollection are extracted and logged. Please note that the file names do not have pane information at that point.
>>>
>>> To apply a window, a distinct is used again. Here several file names are discarded as late and they are not processed by this second distinct. Please, see
>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>>>
>>> Debug is enabled for WindowTracing, so you can find in the terminal several messages like the following:
>>> DEBUG org.apache.beam.sdk.util.WindowTracing - LateDataFilter: Dropping element at 2020-05-12T14:05:14.999Z for key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt; window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far behind inputWatermark:2020-05-12T14:05:19.799Z; outputWatermark:2020-05-12T14:05:19.799Z
>>>
>>> What happens here? I think that messages are generated every second and a 5-second window groups them. Then a delay is added and finally the data is written to a file.
>>> The pipeline reads more data, increasing the watermark.
>>> Then, the file names are emitted without pane information (see "Emitted File" in the logs). The window in the second distinct compares the file names' timestamps with the pipeline watermark and then discards the file names as late.
>>>
>>> Bonus
>>> -----
>>> You can add a lateness to the pipeline.
>>> See https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
>>>
>>> If a minute of lateness is added to the window, the file names are processed as late. As a result, the LateDataFilter traces disappear.
>>>
>>> Moreover, to better illustrate that the file names arrive late at the second distinct, I added a second TextIO to write the file names to other files.
>>> The same FileNamePolicy as before was used (window + timing + shards). Then, you can find files that contain the original filenames in windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the interesting part, because you will find several files with LATE in their names.
>>>
>>> Please, let me know if you need more information or if the example is not enough to check the expected scenarios.
>>>
>>> Kby.
>>>
>>> On Sun, May 10, 2020 at 17:04, Reuven Lax (<re...@google.com>) wrote:
>>>
>>>> Pane info is supposed to be preserved across transforms. If the Flink runner does not preserve it, then I believe that is a bug.
>>>>
>>>> On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>
>>>>> I am using FileIO and I do observe the drop of pane info on the Flink runner too. It was mentioned in this thread: https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>>>>>
>>>>> It is a result of a different Reshuffle expansion, for optimisation reasons. However, I did not observe data loss in my case. Windowing and watermark info should be preserved. Pane info is not, which raises the question of how reliable pane info should be in terms of SDK and runner.
>>>>>
>>>>> If you do observe a data loss, it would be great to share a test case which replicates the problem.
>>>>>
>>>>> On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Ah, I think I see the problem.
>>>>>>
>>>>>> It appears that for some reason, the Flink runner loses windowing information when a Reshuffle is applied. I'm not entirely sure why, because windowing information should be maintained across a Reshuffle.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have added some logs to the pipeline as follows (you can find the log function in the Appendix):
>>>>>>>
>>>>>>> //STREAM + processing time.
>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>         .apply(...) //mappers, a window and a combine
>>>>>>>         .apply(logBeforeWrite())
>>>>>>>         .apply("WriteFiles", TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>>>>         .getPerDestinationOutputFilenames()
>>>>>>>         .apply(logAfterWrite())
>>>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>
>>>>>>> I have run the pipeline using DirectRunner (local), SparkRunner and FlinkRunner, the latter two on a cluster.
>>>>>>> Below you can see the timing and pane information before/after the write (you can see the traces in detail, with window and timestamp information, in the Appendix).
>>>>>>>
>>>>>>> DirectRunner:
>>>>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After Write] timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>>>>>
>>>>>>> FlinkRunner:
>>>>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>>>>
>>>>>>> SparkRunner:
>>>>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>>>>
>>>>>>> It seems DirectRunner propagates the windowing information as expected.
>>>>>>> I am not sure if TextIO really propagates the pane or just emits a new one, because the timing before TextIO is ON_TIME and after TextIO it is EARLY.
>>>>>>> In any case, using FlinkRunner and SparkRunner the timing and the pane are not set.
>>>>>>>
>>>>>>> I thought the problem was in GatherBundlesPerWindowFn, but now, after seeing that the DirectRunner filled in the windowing data... I am not sure.
>>>>>>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>>>>>>
>>>>>>> Appendix
>>>>>>> -----------
>>>>>>> Here you can see the log function and traces for different runners in detail.
>>>>>>>
>>>>>>> private SingleOutput<String, String> logBefore() {
>>>>>>>     return ParDo.of(new DoFn<String, String>() {
>>>>>>>         @ProcessElement
>>>>>>>         public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>>>>>>>             log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
>>>>>>>                     boundedWindow,
>>>>>>>                     context.timestamp(),
>>>>>>>                     context.pane().getTiming(),
>>>>>>>                     context.pane().getIndex(),
>>>>>>>                     context.pane().isFirst(),
>>>>>>>                     context.pane().isLast(),
>>>>>>>                     context.pane()
>>>>>>>             );
>>>>>>>             context.output(context.element());
>>>>>>>         }
>>>>>>>     });
>>>>>>> }
>>>>>>>
>>>>>>> The logAfter function logs the same information.
>>>>>>>
>>>>>>> Traces in detail.
>>>>>>>
>>>>>>> DirectRunner (local):
>>>>>>> [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index=0, isFirst=true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>>>>>
>>>>>>> FlinkRunner (cluster):
>>>>>>> [Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true pane=PaneInfo.NO_FIRING
>>>>>>>
>>>>>>> SparkRunner (cluster):
>>>>>>> [Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true pane=PaneInfo.NO_FIRING
>>>>>>>
>>>>>>> On Fri, May 8, 2020 at 19:01, Reuven Lax (<re...@google.com>) wrote:
>>>>>>>
>>>>>>>> The window information should still be there. Beam propagates windows through PCollections, and I don't think WriteFiles does anything explicit to stop that.
>>>>>>>>
>>>>>>>> Can you try this with the direct runner to see what happens there? What is your windowing on this PCollection?
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I got the same behavior using the Spark runner (with Spark 2.4.3): window information was missing.
>>>>>>>>>
>>>>>>>>> Just to clarify, the combiner after TextIO had different results. In the Flink runner the file names were dropped, and in Spark the combination process happened twice, duplicating data. I think it is because different runners handle data without windowing information in different ways.
>>>>>>>>>
>>>>>>>>> On Fri, May 8, 2020 at 0:45, Luke Cwik (<lc...@google.com>) wrote:
>>>>>>>>>
>>>>>>>>>> +dev <dev@beam.apache.org>
>>>>>>>>>>
>>>>>>>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi guys,
>>>>>>>>>>>
>>>>>>>>>>> I think I have found something interesting about windowing.
>>>>>>>>>>>
>>>>>>>>>>> I have a pipeline that gets data from Kafka and writes to HDFS by means of TextIO.
>>>>>>>>>>> Once written, the generated files are combined to apply some custom operations. However, the combine does not receive data. Below you can find the highlights of my pipeline.
>>>>>>>>>>>
>>>>>>>>>>> //STREAM + processing time.
>>>>>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>>>>>         .apply(...) //mappers, a window and a combine
>>>>>>>>>>>         .apply("WriteFiles", TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>>>>>>>>         .getPerDestinationOutputFilenames()
>>>>>>>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>>>>>
>>>>>>>>>>> Running the pipeline with Flink, I found a log trace that says data is discarded as late in the CombineFileNames combine. Then, I added allowed lateness to the pipeline's window strategy as a workaround. It works for now, but this raises several questions for me.
>>>>>>>>>>>
>>>>>>>>>>> I think the problem is that getPerDestinationOutputFilenames generates the files, but it does not maintain the windowing information. Then, CombineFileNames compares the file names with the watermark of the pipeline and discards them as late.
>>>>>>>>>>>
>>>>>>>>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am doing something wrong and using getPerDestinationOutputFilenames + combine does not make sense.
>>>>>>>>>>> What do you think?
>>>>>>>>>>>
>>>>>>>>>>> Please note I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>>>>>>>
>>>>>>>>>>> Many thanks,
>>>>>>>>>>> Jose
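As an aside, the `.withTimestampFn()` suggestion earlier in the thread amounts to assigning deterministic, epoch-based timestamps to the generated sequence. A minimal sketch of such a timestamp function (illustrative plain Java, not Beam's GenerateSequence API):

```java
import java.time.Instant;
import java.util.function.LongFunction;

// Illustrative sketch of a deterministic timestamp function for a generated
// sequence (the idea behind the `.withTimestampFn()` suggestion earlier in
// this thread; plain Java, not Beam's API): element i gets the fixed,
// epoch-based timestamp i seconds after 1970-01-01T00:00:00Z.
public class StableTimestamps {
    static final LongFunction<Instant> TIMESTAMP_FN = Instant::ofEpochSecond;

    public static void main(String[] args) {
        // Element 19 always lands at 1970-01-01T00:00:19Z, i.e. in the
        // 5-second window [00:00:15..00:00:20), which makes the drop traces
        // stable and easy to reason about across runs.
        System.out.println(TIMESTAMP_FN.apply(19)); // 1970-01-01T00:00:19Z
    }
}
```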