This is still confusing to me - why would the messages be dropped as late in this case?
On Mon, May 18, 2020 at 6:14 AM Maximilian Michels <m...@apache.org> wrote:

> All runners which use the Beam reference implementation drop the
> PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames(). That's
> why we can observe this behavior not only in Flink but also Spark.
>
> The WriteFilesResult is returned here:
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363
>
> GatherBundlesPerWindow will discard the pane information because all
> buffered elements are emitted in the FinishBundle method, which always
> has a NO_FIRING (unknown) pane info:
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895
>
> So this seems to be expected behavior. We would need to preserve the
> panes in the Multimap buffer.
>
> -Max
>
> On 15.05.20 18:34, Reuven Lax wrote:
> > Lateness should never be introduced inside a pipeline - generally late
> > data can only come from a source. If data was not dropped as late
> > earlier in the pipeline, it should not be dropped after the file write.
> > I suspect that this is a bug in how the Flink runner handles the
> > Reshuffle transform, but I'm not sure what the exact bug is.
> >
> > Reuven
> >
> > On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >
> > Hi Jose,
> >
> > thank you for putting in the effort to produce an example which
> > demonstrates your problem.
> >
> > You are using a streaming pipeline, and it seems that the watermark
> > downstream has already advanced further, so when your file pane arrives,
> > it is already late. Since you define that lateness is not tolerated,
> > it is dropped.
> > I myself never had a requirement to specify zero allowed lateness for
> > streaming. It feels dangerous. Do you have a specific use case?
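Max's point about FinishBundle can be illustrated with a small plain-Java sketch. This is NOT Beam code; the class, the `Output` type, and the `NO_FIRING` constant are stand-ins chosen for illustration. It models why buffering elements in `processElement` and flushing them in `finishBundle` can only emit the "unknown" pane: the pane of the buffered element is never stored, so it is gone by the time the flush runs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (not Beam code) of the buffering pattern described above:
// processElement buffers file names per window; finishBundle flushes the
// buffer. Since the element's pane is never kept, the flush can only attach
// the "unknown" pane, modeled here by the NO_FIRING constant.
public class GatherBundlesSketch {
    static final String NO_FIRING = "NO_FIRING"; // stand-in for PaneInfo.NO_FIRING

    static class Output {
        final String filename;
        final String pane;
        Output(String filename, String pane) { this.filename = filename; this.pane = pane; }
    }

    private final Map<String, List<String>> buffer = new HashMap<>();

    // Buffer the element; its pane is received here but never stored.
    void processElement(String window, String filename, String pane) {
        buffer.computeIfAbsent(window, w -> new ArrayList<>()).add(filename);
    }

    // Flush everything buffered; no per-element pane survived the buffering.
    List<Output> finishBundle() {
        List<Output> out = new ArrayList<>();
        buffer.forEach((window, names) ->
                names.forEach(n -> out.add(new Output(n, NO_FIRING))));
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        GatherBundlesSketch fn = new GatherBundlesSketch();
        fn.processElement("w1", "shard-0.txt", "ON_TIME"); // ON_TIME pane is lost
        for (Output o : fn.finishBundle()) {
            System.out.println(o.filename + " pane=" + o.pane); // prints pane=NO_FIRING
        }
    }
}
```

Preserving the panes, as Max suggests, would mean buffering the pane alongside each element instead of discarding it at buffering time.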
> > Also, in many cases, after windowed files are written, I usually
> > collect them into the global window and specify a different triggering
> > policy for collecting them. Both cases are why I never came across
> > this situation.
> >
> > I do not have an explanation for whether it is a bug or not. I would
> > guess that the watermark can advance further, e.g. because elements can
> > be processed in arbitrary order. Not saying this is the case.
> > It needs someone with a better understanding of how watermark advance
> > is / should be handled within pipelines.
> >
> > P.S.: you can add `.withTimestampFn()` to your generate sequence to
> > get more stable timing, which is also easier to reason about:
> >
> > Dropping element at 1970-01-01T00:00:19.999Z for key
> > ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z)
> > since too far behind inputWatermark:1970-01-01T00:00:24.000Z;
> > outputWatermark:1970-01-01T00:00:24.000Z
> >
> > instead of
> >
> > Dropping element at 2020-05-15T08:52:34.999Z for key ...
> > window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since
> > too far behind inputWatermark:2020-05-15T08:52:39.318Z;
> > outputWatermark:2020-05-15T08:52:39.318Z
> >
> > On Thu, May 14, 2020 at 10:47 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> >
> > Hi again,
> >
> > I have simplified the example to reproduce the data loss. The
> > scenario is the following:
> >
> > - TextIO writes files.
> > - getPerDestinationOutputFilenames emits file names.
> > - File names are processed by an aggregator (combine, distinct,
> >   groupByKey...) with a window **without allowed lateness**.
> > - File names are discarded as late.
> >
> > Here you can see the data loss in the picture in
> > https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
> >
> > Please, follow the README to run the pipeline and find log traces
> > that say data are dropped as late.
> > Remember, you can run the pipeline with other
> > window lateness values (check README.md).
> >
> > Kby.
> >
> > On Tue, May 12, 2020 at 17:16, Jose Manuel (<kiuby88....@gmail.com>) wrote:
> >
> > Hi,
> >
> > I would like to clarify that while TextIO is writing, all the data
> > are in the files (shards). The loss happens when the file names
> > emitted by getPerDestinationOutputFilenames are processed by a window.
> >
> > I have created a pipeline to reproduce the scenario in which
> > some file names are lost after getPerDestinationOutputFilenames.
> > Please note I tried to simplify the code as much as possible,
> > but the scenario is not easy to reproduce.
> >
> > Please check this project:
> > https://github.com/kiuby88/windowing-textio
> > Check the readme to build and run
> > (https://github.com/kiuby88/windowing-textio#build-and-run).
> > The project contains only a class with the pipeline,
> > PipelineWithTextIo, a log4j2.xml file in the resources, and the pom.
> >
> > The pipeline in PipelineWithTextIo generates unbounded data
> > using a sequence. It adds a little delay (10s) per data
> > entry, it uses a distinct (just to apply the window), and
> > then it writes data using TextIO.
> > The window for the distinct is fixed (5 seconds) and it
> > does not use lateness.
> > Generated files can be found in
> > windowing-textio/pipe_with_lateness_0s/files. To write files
> > the FileNamePolicy uses window + timing + shards (see
> > https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135).
> > Files are emitted using getPerDestinationOutputFilenames()
> > (see the code here:
> > https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78)
> >
> > Then, the file names in the PCollection are extracted and
> > logged. Please note the file names do not have pane
> > information at that point.
> > To apply a window, a distinct is used again. Here several
> > files are discarded as late and they are not processed by
> > this second distinct. Please, see
> > https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
> >
> > Debug is enabled for WindowTracing, so you can find in the
> > terminal several messages like the following:
> >
> > DEBUG org.apache.beam.sdk.util.WindowTracing -
> > LateDataFilter: Dropping element at 2020-05-12T14:05:14.999Z for
> > key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
> > window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)
> > since too far behind inputWatermark:2020-05-12T14:05:19.799Z;
> > outputWatermark:2020-05-12T14:05:19.799Z
> >
> > What happens here? I think that messages are generated per
> > second and a window of 5 seconds groups them. Then a delay is
> > added and finally data are written to a file.
> > The pipeline reads more data, increasing the watermark.
> > Then, file names are emitted without pane information (see
> > "Emitted File" in the logs). The window in the second distinct
> > compares the file names' timestamps with the pipeline watermark
> > and then discards the file names as late.
> >
> > Bonus
> > -----
> > You can add lateness to the pipeline. See
> > https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
> >
> > If a minute of lateness is added to the window, the file names
> > are processed as late. As a result the traces of
> > LateDataFilter disappear.
> >
> > Moreover, in order to better illustrate that file names arrive
> > late at the second distinct, I added a second
> > TextIO to write the file names to other files.
> > The same FileNamePolicy as before was used (window + timing +
> > shards). Then, you can find files that contain the original
> > file names in
> > windowing-textio/pipe_with_lateness_60s/files-after-distinct.
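The dropping condition behind these LateDataFilter traces can be sketched in a few lines of plain Java. This is an illustration, not Beam's actual implementation: the check is essentially that an element is dropped when its window's maximum timestamp, shifted by the allowed lateness, is already behind the input watermark. The numbers below mirror the trace quoted in the thread.

```java
// Plain-Java sketch of the LateDataFilter condition: an element in a window
// [start, end) is dropped when window.maxTimestamp() + allowedLateness is
// behind the input watermark (maxTimestamp = end - 1ms). Times are epoch millis.
public class LateDataFilterSketch {

    static boolean isDroppedAsLate(long windowEndMillis,
                                   long allowedLatenessMillis,
                                   long inputWatermarkMillis) {
        long maxTimestamp = windowEndMillis - 1; // last timestamp inside the window
        return maxTimestamp + allowedLatenessMillis < inputWatermarkMillis;
    }

    public static void main(String[] args) {
        long windowEnd = 20_000;  // window [..15.000s .. ..20.000s), as in the trace
        long watermark = 24_000;  // inputWatermark already at ..24.000s

        // Zero allowed lateness: the file-name element is behind the watermark.
        System.out.println(isDroppedAsLate(windowEnd, 0, watermark));       // true
        // With 60s allowed lateness the pane fires as LATE but is not dropped.
        System.out.println(isDroppedAsLate(windowEnd, 60_000, watermark));  // false
    }
}
```

This also shows why the workaround of adding a minute of allowed lateness makes the LateDataFilter traces disappear: the same elements fall back inside the tolerated window.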
> > This is the interesting part, because you will find several files
> > with LATE in their names.
> >
> > Please, let me know if you need more information or if the
> > example is not enough to check the expected scenarios.
> >
> > Kby.
> >
> > On Sun, May 10, 2020 at 17:04, Reuven Lax (<re...@google.com>) wrote:
> >
> > Pane info is supposed to be preserved across transforms.
> > If the Flink runner does not, then I believe that is a bug.
> >
> > On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >
> > I am using FileIO and I do observe the drop of pane
> > info on the Flink runner too. It was
> > mentioned in this thread:
> > https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
> >
> > It is a result of a different reshuffle expansion for
> > optimisation reasons. However, I did not observe
> > data loss in my case. Windowing and watermark info
> > should be preserved. Pane info is not, which raises the
> > question of how reliable pane info should be in terms
> > of SDK and runner.
> >
> > If you do observe data loss, it would be great to
> > share a test case which replicates the problem.
> >
> > On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com> wrote:
> >
> > Ah, I think I see the problem.
> >
> > It appears that for some reason, the Flink
> > runner loses windowing information when a
> > Reshuffle is applied. I'm not entirely sure why,
> > because windowing information should be
> > maintained across a Reshuffle.
> >
> > Reuven
> >
> > On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> >
> > Hi,
> >
> > I have added some logs to the pipeline as
> > follows (you can find the log function in
> > the Appendix):
> >
> > //STREAM + processing time.
> > pipeline.apply(KafkaIO.read())
> >     .apply(...) // mappers, a window and a combine
> >     .apply(logBeforeWrite())
> >     .apply("WriteFiles",
> >         TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
> >     .getPerDestinationOutputFilenames()
> >     .apply(logAfterWrite())
> >     .apply("CombineFileNames", Combine.perKey(...))
> >
> > I have run the pipeline using the DirectRunner (local), the
> > SparkRunner and the FlinkRunner, the latter two on a cluster.
> > Below you can see the timing and pane information before/after
> > the write (you can see the traces in detail, with window and
> > timestamp information, in the Appendix).
> >
> > DirectRunner:
> > [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > [After Write] timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
> >
> > FlinkRunner:
> > [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
> >
> > SparkRunner:
> > [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
> >
> > It seems the DirectRunner propagates the windowing information
> > as expected.
> > I am not sure if TextIO really propagates it or just emits a
> > window pane, because the timing before TextIO is ON_TIME and
> > after TextIO is EARLY.
> > In any case, using the FlinkRunner and the SparkRunner the
> > timing and the pane are not set.
> >
> > I thought the problem was in GatherBundlesPerWindowFn, but now,
> > after seeing that the DirectRunner filled in the windowing
> > data... I am not sure.
> > https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
> >
> > Appendix
> > --------
> > Here you can see the log function and the traces for the
> > different runners in detail.
> >
> > private SingleOutput<String, String> logBefore() {
> >     return ParDo.of(new DoFn<String, String>() {
> >         @ProcessElement
> >         public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
> >             String value = context.element();
> >             log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
> >                     boundedWindow,
> >                     context.timestamp(),
> >                     context.pane().getTiming(),
> >                     context.pane().getIndex(),
> >                     context.pane().isFirst(),
> >                     context.pane().isLast(),
> >                     context.pane()
> >             );
> >             context.output(context.element());
> >         }
> >     });
> > }
> >
> > The logAfter function shows the same information.
> >
> > Traces in detail.
> > DirectRunner (local):
> > [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > [After Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index=0, isFirst=true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
> >
> > FlinkRunner (cluster):
> > [Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > [After Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true pane=PaneInfo.NO_FIRING
> >
> > SparkRunner (cluster):
> > [Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > [After Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true pane=PaneInfo.NO_FIRING
> >
> > On Fri, May 8, 2020 at 19:01, Reuven Lax (<re...@google.com>) wrote:
> >
> > The window information should still be there. Beam propagates
> > windows through PCollection, and I don't think WriteFiles does
> > anything explicit to stop that.
> > Can you try this with the direct runner to see what happens there?
> > What is your windowing on this PCollection?
> >
> > Reuven
> >
> > On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> >
> > I got the same behavior using the Spark runner (with Spark
> > 2.4.3): the window information was missing.
> >
> > Just to clarify, the combiner after TextIO had different
> > results. In the Flink runner the file names were dropped, and
> > in Spark the combination process happened twice, duplicating
> > data. I think this is because different runners manage data
> > without windowing information in different ways.
> >
> > On Fri, May 8, 2020 at 0:45, Luke Cwik (<lc...@google.com>) wrote:
> >
> > +dev <dev@beam.apache.org>
> >
> > On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> >
> > Hi guys,
> >
> > I think I have found something interesting about windowing.
> >
> > I have a pipeline that gets data from Kafka and writes to HDFS
> > by means of TextIO. Once written, the generated files are
> > combined to apply some custom operations. However, the combine
> > does not receive data. Below you can find the highlights of my
> > pipeline:
> >
> > //STREAM + processing time.
> > pipeline.apply(KafkaIO.read())
> >     .apply(...) // mappers, a window and a combine
> >     .apply("WriteFiles",
> >         TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
> >     .getPerDestinationOutputFilenames()
> >     .apply("CombineFileNames", Combine.perKey(...))
> >
> > Running the pipeline with Flink I have found a log trace that
> > says data are discarded as late in the combine CombineFileNames.
> > Then, I have added AllowedLateness to the pipeline window
> > strategy, as a workaround.
> > It works for now, but this raises several questions for me.
> >
> > I think the problem is that getPerDestinationOutputFilenames
> > generates files, but it does not maintain the windowing
> > information. Then, CombineFileNames compares the file names with
> > the watermark of the pipeline and discards them as late.
> >
> > Is there any issue with getPerDestinationOutputFilenames?
> > Maybe I am doing something wrong and using
> > getPerDestinationOutputFilenames + combine does not make sense.
> > What do you think?
> >
> > Please, note I am using Beam 2.17.0 with Flink 1.9.1.
> >
> > Many thanks,
> > Jose
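The AllowedLateness workaround mentioned above would look roughly like the fragment below. This is a hypothetical sketch, not code from the project: the element type, the 5-second window size, and the late-firing trigger are assumptions; only `withAllowedLateness` is the part the thread actually describes.

```java
// Hypothetical re-windowing of the file-name PCollection before the combine,
// so panes arriving behind the watermark fire as LATE instead of being dropped.
.apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(5)))
    .withAllowedLateness(Duration.standardMinutes(1))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterPane.elementCountAtLeast(1)))
    .discardingFiredPanes())
.apply("CombineFileNames", Combine.perKey(...))
```

Note this only masks the symptom: the file names still carry no pane information, so the lateness bound has to be large enough to cover the write delay.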