All runners which use the Beam reference implementation drop the PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames(). That's why we can observe this behavior not only in Flink but also in Spark.
The WriteFilesResult is returned here:
https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363

GatherBundlesPerWindow will discard the pane information because all buffered elements are emitted in the FinishBundle method, which always has a NO_FIRING (unknown) pane info:
https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895

So this seems to be expected behavior. We would need to preserve the panes in the Multimap buffer.

-Max

On 15.05.20 18:34, Reuven Lax wrote:
> Lateness should never be introduced inside a pipeline - generally late data can only come from a source. If data was not dropped as late earlier in the pipeline, it should not be dropped after the file write. I suspect that this is a bug in how the Flink runner handles the Reshuffle transform, but I'm not sure what the exact bug is.
>
> Reuven
>
> On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
>
> Hi Jose,
>
> thank you for putting in the effort to get an example which demonstrates your problem.
>
> You are using a streaming pipeline, and it seems that the watermark downstream has already advanced further, so when your file pane arrives, it is already late. Since you define that lateness is not tolerated, it is dropped.
> I myself never had a requirement to specify zero allowed lateness for streaming. It feels dangerous. Do you have a specific use case?
> Also, in many cases, after windowed files are written, I usually collect them into a global window and specify a different triggering policy for collecting them. Both habits are why I never came across this situation.
>
> I do not have an explanation for whether it is a bug or not. I would guess that the watermark can advance further, e.g. because elements can be processed in arbitrary order.
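The buffering behavior described at the top of the thread can be sketched with plain Java. The class below is a hypothetical simplification, not Beam's actual GatherBundlesPerWindowFn: it buffers elements per window in processElement and only emits them in finishBundle, where no input element (and hence no pane) is in scope, so every output carries the unknown NO_FIRING pane.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a buffer-then-emit DoFn: the incoming pane is never
// stored alongside the element, so it cannot be restored at emission time.
public class PaneLossSketch {

  static final String NO_FIRING = "NO_FIRING";

  // window -> buffered filenames (stands in for the Multimap buffer)
  private final Map<String, List<String>> buffer = new HashMap<>();
  private final List<String[]> output = new ArrayList<>(); // {filename, pane}

  void processElement(String window, String filename, String pane) {
    // The pane argument (e.g. "ON_TIME") is deliberately dropped here,
    // mirroring the fact that only the element is buffered.
    buffer.computeIfAbsent(window, w -> new ArrayList<>()).add(filename);
  }

  void finishBundle() {
    // finishBundle has no current element, so there is no pane to copy:
    // every buffered element is emitted with the unknown pane.
    for (List<String> files : buffer.values()) {
      for (String f : files) {
        output.add(new String[] {f, NO_FIRING});
      }
    }
    buffer.clear();
  }

  List<String[]> getOutput() { return output; }

  public static void main(String[] args) {
    PaneLossSketch fn = new PaneLossSketch();
    fn.processElement("w1", "part-0.txt", "ON_TIME");
    fn.finishBundle();
    // The ON_TIME pane of the input did not survive the buffering.
    System.out.println(fn.getOutput().get(0)[1]); // NO_FIRING
  }
}
```

Preserving panes would mean buffering the pane (or the whole windowed value) together with each element, which is exactly the "preserve the panes in the Multimap buffer" change suggested above.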
Not saying this is the case. It needs someone with a better understanding of how watermark advance is / should be handled within pipelines.
>
> P.S.: you can add `.withTimestampFn()` to your generate sequence to get more stable timing, which is also easier to reason about:
>
> Dropping element at 1970-01-01T00:00:19.999Z for key ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z) since too far behind inputWatermark:1970-01-01T00:00:24.000Z; outputWatermark:1970-01-01T00:00:24.000Z
>
> instead of
>
> Dropping element at 2020-05-15T08:52:34.999Z for key ... window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since too far behind inputWatermark:2020-05-15T08:52:39.318Z; outputWatermark:2020-05-15T08:52:39.318Z
>
> On Thu, May 14, 2020 at 10:47 AM Jose Manuel <kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>> wrote:
>
> Hi again,
>
> I have simplified the example to reproduce the data loss. The scenario is the following:
>
> - TextIO writes files.
> - getPerDestinationOutputFilenames emits file names
> - File names are processed by an aggregator (combine, distinct, groupByKey...) with a window **without allowed lateness**
> - File names are discarded as late
>
> Here you can see the data loss in the picture in https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
>
> Please, follow the README to run the pipeline and find log traces that say data are dropped as late.
> Remember, you can run the pipeline with other lateness values for the window (check README.md)
>
> Kby.
>
> El mar., 12 may. 2020 a las 17:16, Jose Manuel (<kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>>) escribió:
>
> Hi,
>
> I would like to clarify that while TextIO is writing, all data are in the files (shards). The loss happens when the file names emitted by getPerDestinationOutputFilenames are processed by a window.
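The WindowTracing messages quoted above imply a simple drop condition. The sketch below is a simplified model, not Beam's actual LateDataFilter implementation: an element is dropped when the input watermark has passed the end of its window plus the allowed lateness, which is why zero allowed lateness drops the file names while a minute of lateness keeps them.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of the "since too far behind inputWatermark" check:
// a window [start..end) "expires" at end + allowedLateness, and anything
// arriving once the watermark has reached that point is dropped.
public class LateDropSketch {

  static boolean isDroppedAsLate(Instant windowEnd, Duration allowedLateness,
                                 Instant inputWatermark) {
    return !windowEnd.plus(allowedLateness).isAfter(inputWatermark);
  }

  public static void main(String[] args) {
    // Numbers from the trace above: window [..T00:00:20.000Z),
    // inputWatermark T00:00:24.000Z.
    Instant windowEnd = Instant.parse("1970-01-01T00:00:20.000Z");
    Instant watermark = Instant.parse("1970-01-01T00:00:24.000Z");

    // Zero allowed lateness: the file name is dropped.
    System.out.println(isDroppedAsLate(windowEnd, Duration.ZERO, watermark)); // true
    // One minute of allowed lateness: it is kept (fired as a late pane).
    System.out.println(isDroppedAsLate(windowEnd, Duration.ofMinutes(1), watermark)); // false
  }
}
```

With the numbers above, the watermark is 4 seconds past the window's end, so any allowed lateness of 4 seconds or more would already be enough to keep the file names in this particular run.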
> I have created a pipeline to reproduce the scenario in which some filenames are lost after getPerDestinationOutputFilenames. Please, note I tried to simplify the code as much as possible, but the scenario is not easy to reproduce.
>
> Please check this project: https://github.com/kiuby88/windowing-textio
> Check the readme to build and run (https://github.com/kiuby88/windowing-textio#build-and-run)
> The project contains only a class with the pipeline, PipelineWithTextIo, a log4j2.xml file in the resources, and the pom.
>
> The pipeline in PipelineWithTextIo generates unbounded data using a sequence. It adds a little delay (10s) per data entry, it uses a distinct (just to apply the window), and then it writes data using TextIO.
> The window for the distinct is fixed (5 seconds) and it does not use lateness.
> Generated files can be found in windowing-textio/pipe_with_lateness_0s/files. To write files, the FileNamePolicy uses window + timing + shards (see https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135)
> Files are emitted using getPerDestinationOutputFilenames() (see the code here: https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78)
>
> Then, the file names in the PCollection are extracted and logged. Please, note that file names do not have pane information at that point.
>
> To apply a window, a distinct is used again. Here several files are discarded as late and they are not processed by this second distinct.
Please, see https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>
> Debug is enabled for WindowTracing, so you can find in the terminal several messages like the following:
> DEBUG org.apache.beam.sdk.util.WindowTracing - LateDataFilter: Dropping element at 2020-05-12T14:05:14.999Z for key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt; window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far behind inputWatermark:2020-05-12T14:05:19.799Z; outputWatermark:2020-05-12T14:05:19.799Z
>
> What happens here? I think that messages are generated per second and a window of 5 seconds groups them. Then a delay is added and finally the data are written to a file.
> The pipeline reads more data, increasing the watermark. Then, file names are emitted without pane information (see "Emitted File" in the logs). The window in the second distinct compares the file names' timestamps with the pipeline watermark and then discards the file names as late.
>
> Bonus
> -----
> You can add lateness to the pipeline. See https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
>
> If a minute of lateness is added to the window, the file names are processed as late. As a result, the traces of LateDataFilter disappear.
>
> Moreover, in order to better illustrate that file names are emitted late for the second distinct, I added a second TextIO to write the file names to other files.
> The same FileNamePolicy as before was used (window + timing + shards). Then, you can find files that contain the original filenames in windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the interesting part, because you will find several files with LATE in their names.
>
> Please, let me know if you need more information or if the example is not enough to check the expected scenarios.
>
> Kby.
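The LATE markers in the "Bonus" file names can be understood with a rough model of pane timing. The sketch below is a deliberate simplification of Beam's PaneInfo semantics (the real implementation tracks pane and on-time indices per key and window): a firing while the watermark is still before the window's end is EARLY, the first firing at or after the end is ON_TIME, and subsequent firings are LATE.

```java
import java.time.Instant;

// Rough model of how a firing's timing label relates to the watermark at
// the moment the pane is emitted. Not Beam's real PaneInfo logic.
public class PaneTimingSketch {

  enum Timing { EARLY, ON_TIME, LATE }

  static Timing timingOf(Instant watermarkAtFiring, Instant windowEnd,
                         boolean onTimeFiringAlreadyHappened) {
    if (watermarkAtFiring.isBefore(windowEnd)) {
      return Timing.EARLY;
    }
    return onTimeFiringAlreadyHappened ? Timing.LATE : Timing.ON_TIME;
  }

  public static void main(String[] args) {
    // Window end and watermark taken from the trace above.
    Instant windowEnd = Instant.parse("2020-05-12T14:05:15.000Z");
    Instant wm = Instant.parse("2020-05-12T14:05:19.799Z");
    // The on-time firing for this window already happened upstream, so once
    // allowed lateness keeps the file name alive, it fires as a LATE pane --
    // matching the LATE marker seen in the produced file names.
    System.out.println(timingOf(wm, windowEnd, true)); // LATE
  }
}
```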
> El dom., 10 may. 2020 a las 17:04, Reuven Lax (<re...@google.com <mailto:re...@google.com>>) escribió:
>
> Pane info is supposed to be preserved across transforms. If the Flink runner does not preserve it, then I believe that is a bug.
>
> On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
>
> I am using FileIO and I do observe the drop of pane info on the Flink runner too. It was mentioned in this thread: https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>
> It is a result of a different reshuffle expansion for optimisation reasons. However, I did not observe a data loss in my case. Windowing and watermark info should be preserved. Pane info is not, which raises the question of how reliable pane info should be in terms of the SDK and runners.
>
> If you do observe a data loss, it would be great to share a test case which replicates the problem.
>
> On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com <mailto:re...@google.com>> wrote:
>
> Ah, I think I see the problem.
>
> It appears that for some reason, the Flink runner loses windowing information when a Reshuffle is applied. I'm not entirely sure why, because windowing information should be maintained across a Reshuffle.
>
> Reuven
>
> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>> wrote:
>
> Hi,
>
> I have added some logs to the pipeline as follows (you can find the log function in the Appendix):
>
> //STREAM + processing time.
> pipeline.apply(KafkaIO.read())
>     .apply(...)
>     //mappers, a window and a combine
>     .apply(logBeforeWrite())
>     .apply("WriteFiles", TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>     .getPerDestinationOutputFilenames()
>     .apply(logAfterWrite())
>     .apply("CombineFileNames", Combine.perKey(...))
>
> I have run the pipeline using the DirectRunner (local), the SparkRunner and the FlinkRunner, the latter two on a cluster.
> Below you can see the timing and pane information before/after (you can see the traces in detail, with window and timestamp information, in the Appendix).
>
> DirectRunner:
> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write] timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>
> FlinkRunner:
> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>
> SparkRunner:
> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>
> It seems the DirectRunner propagates the windowing information as expected.
> I am not sure if TextIO really propagates it or just emits a new window pane, because the timing before TextIO is ON_TIME and after TextIO it is EARLY.
> In any case, using the FlinkRunner and the SparkRunner the timing and the pane are not set.
>
> I thought the problem was in GatherBundlesPerWindowFn, but now, after seeing that the DirectRunner filled in the windowing data... I am not sure.
> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>
> Appendix
> -----------
> Here you can see the log function and traces for different runners in detail.
> private SingleOutput<String, String> logBefore() {
>   return ParDo.of(new DoFn<String, String>() {
>     @ProcessElement
>     public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>       log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
>           boundedWindow,
>           context.timestamp(),
>           context.pane().getTiming(),
>           context.pane().getIndex(),
>           context.pane().isFirst(),
>           context.pane().isLast(),
>           context.pane());
>       context.output(context.element());
>     }
>   });
> }
>
> The logAfter function shows the same information.
>
> Traces in detail.
>
> DirectRunner (local):
> [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index=0, isFirst=true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>
> FlinkRunner (cluster):
> [Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true pane=PaneInfo.NO_FIRING
>
> SparkRunner (cluster):
> [Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true pane=PaneInfo{isFirst=true,
isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true pane=PaneInfo.NO_FIRING
>
> El vie., 8 may. 2020 a las 19:01, Reuven Lax (<re...@google.com <mailto:re...@google.com>>) escribió:
>
> The window information should still be there. Beam propagates windows through PCollections, and I don't think WriteFiles does anything explicit to stop that.
>
> Can you try this with the direct runner to see what happens there?
> What is your windowing on this PCollection?
>
> Reuven
>
> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>> wrote:
>
> I got the same behavior using the Spark Runner (with Spark 2.4.3): the window information was missing.
>
> Just to clarify, the combiner after TextIO had different results. In the Flink runner the file names were dropped, and in Spark the combination process happened twice, duplicating data. I think this is because different runners handle data without windowing information in different ways.
>
> El vie., 8 may. 2020 a las 0:45, Luke Cwik (<lc...@google.com <mailto:lc...@google.com>>) escribió:
>
> +dev <mailto:d...@beam.apache.org>
>
> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>> wrote:
>
> Hi guys,
>
> I think I have found something interesting about windowing.
>
> I have a pipeline that gets data from Kafka and writes to HDFS by means of TextIO. Once written, the generated files are combined to apply some custom operations. However, the combine does not receive data. Below you can find the highlights of my pipeline.
>
> //STREAM + processing time.
> pipeline.apply(KafkaIO.read())
>     .apply(...)
>     //mappers, a window and a combine
>     .apply("WriteFiles", TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>     .getPerDestinationOutputFilenames()
>     .apply("CombineFileNames", Combine.perKey(...))
>
> Running the pipeline with Flink, I have found a log trace that says data are discarded as late in the combine CombineFileNames. Then, I added AllowedLateness to the pipeline's window strategy as a workaround. It works for now, but this opens several questions for me.
>
> I think the problem is that getPerDestinationOutputFilenames generates files, but it does not maintain the windowing information. Then, CombineFileNames compares the file names with the watermark of the pipeline and discards them as late.
>
> Is there any issue with getPerDestinationOutputFilenames? Maybe I am doing something wrong and using getPerDestinationOutputFilenames + combine does not make sense. What do you think?
>
> Please, note I am using Beam 2.17.0 with Flink 1.9.1.
>
> Many thanks,
> Jose
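For reference, the dropped-file trace earlier in the thread suggests that the example's FileNamePolicy builds names from window + timing + shard. The sketch below is a hypothetical reconstruction of just the name-building step (illustrative only; the linked windowing-textio repository has the actual policy code):

```java
// Hypothetical sketch of a windowed file-name builder in the style of a
// FileNamePolicy that encodes window + pane timing + shard, matching the
// shape of the name seen in the WindowTracing messages above.
public class WindowedFileNameSketch {

  static String fileName(String windowStart, String windowEnd, String timing,
                         int shard, int numShards) {
    return String.format("[%s..%s)-%s-%d-of-%d.txt",
        windowStart, windowEnd, timing, shard, numShards);
  }

  public static void main(String[] args) {
    // Reproduces the name format from the trace:
    // [2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt
    System.out.println(fileName("2020-05-12T14:05:10.000Z",
        "2020-05-12T14:05:15.000Z", "ON_TIME", 0, 1));
  }
}
```

Encoding the pane timing in the file name is what makes the Bonus experiment above observable: once allowed lateness keeps the file names alive, the second write produces names containing LATE.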