On Tue, May 19, 2020 at 2:02 AM Maximilian Michels <m...@apache.org> wrote:
> > This is still confusing to me - why would the messages be dropped as
> > late in this case?
>
> Since you previously mentioned that the bug is due to the pane info
> missing, I just pointed out that the WriteFiles logic is expected to
> drop the pane info.

I mentioned the other issue, where it appeared that Reshuffle in Flink
drops pane info (which seems wrong to me). However, the original issue
here is elements dropped due to lateness.

> @Jose Would it make sense to file a JIRA and summarize all the findings
> here?
>
> @Jozef What you describe in
> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html is
> expected because Flink does not do a GroupByKey on Reshuffle but just
> redistributes the elements.
>
> Thanks,
> Max
>
> On 18.05.20 21:59, Jose Manuel wrote:
> > Hi Reuven,
> >
> > I can try to explain what I guess:
> >
> > - There is a source which is reading data entries and updating the
> >   watermark.
> > - Then, data entries are grouped and stored in files.
> > - The window information of these data entries is used to emit
> >   filenames: the data entries' window and timestamp. PaneInfo is empty.
> > - When a second window is applied to the filenames, if allowed lateness
> >   is zero or lower than the time spent in the previous reading/writing,
> >   the filenames are discarded as late.
> >
> > I guess the key is in
> > https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L168
> >
> > My assumption is that the global watermark (or source watermark, I am
> > not sure about the name) is used to evaluate the filenames, which are
> > in an already emitted window.
> >
> > Thanks,
> > Jose
> >
> > On Mon, May 18, 2020 at 18:37, Reuven Lax (<re...@google.com>) wrote:
> >
> >     This is still confusing to me - why would the messages be dropped
> >     as late in this case?
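The drop condition that LateDataDroppingDoFnRunner applies can be modeled roughly as follows. This is a minimal plain-Java sketch, not Beam's actual code; the class and method names are illustrative:

```java
import java.time.Duration;
import java.time.Instant;

// Minimal model of the lateness check: an element is dropped when its
// window's garbage-collection time (window end + allowed lateness) is
// already behind the input watermark.
final class LatenessCheck {

    static boolean isDroppedAsLate(Instant windowMaxTimestamp,
                                   Instant inputWatermark,
                                   Duration allowedLateness) {
        // "Too far behind": the window expired before the watermark.
        return windowMaxTimestamp.plus(allowedLateness).isBefore(inputWatermark);
    }

    public static void main(String[] args) {
        // Timestamps taken from the WindowTracing log quoted in this thread.
        Instant windowEnd = Instant.parse("1970-01-01T00:00:19.999Z");
        Instant watermark = Instant.parse("1970-01-01T00:00:24.000Z");

        // Zero allowed lateness: the file name is dropped.
        System.out.println(isDroppedAsLate(windowEnd, watermark, Duration.ZERO));
        // With 60 seconds of allowed lateness it is kept.
        System.out.println(isDroppedAsLate(windowEnd, watermark, Duration.ofSeconds(60)));
    }
}
```

This matches the behavior reported below: the watermark has advanced past the end of the file name's window by the time the second distinct sees it, so with zero allowed lateness the name is dropped.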
> > On Mon, May 18, 2020 at 6:14 AM Maximilian Michels <m...@apache.org> wrote:
> >
> >     All runners which use the Beam reference implementation drop the
> >     PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames().
> >     That's why we can observe this behavior not only in Flink but also
> >     in Spark.
> >
> >     The WriteFilesResult is returned here:
> >     https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363
> >
> >     GatherBundlesPerWindow will discard the pane information because
> >     all buffered elements are emitted in the FinishBundle method, which
> >     always has a NO_FIRING (unknown) pane info:
> >     https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895
> >
> >     So this seems expected behavior. We would need to preserve the
> >     panes in the Multimap buffer.
> >
> >     -Max
> >
> >     On 15.05.20 18:34, Reuven Lax wrote:
> >     > Lateness should never be introduced inside a pipeline - generally
> >     > late data can only come from a source. If data was not dropped as
> >     > late earlier in the pipeline, it should not be dropped after the
> >     > file write. I suspect that this is a bug in how the Flink runner
> >     > handles the Reshuffle transform, but I'm not sure what the exact
> >     > bug is.
> >     >
> >     > Reuven
> >     >
> >     > On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >     >
> >     >     Hi Jose,
> >     >
> >     >     thank you for putting in the effort to build an example which
> >     >     demonstrates your problem.
> >     >
> >     >     You are using a streaming pipeline, and it seems that the
> >     >     watermark downstream has already advanced further, so when
> >     >     your file pane arrives, it is already late.
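Max's point about GatherBundlesPerWindow can be illustrated with a toy plain-Java model (the names are illustrative, not Beam's actual classes): values are buffered while elements are processed, where the pane is still visible, but re-emitted when the bundle finishes, at which point the only pane that can be attached is the unknown one:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of why buffering in @ProcessElement and emitting in
// @FinishBundle loses pane info: the finish-bundle context has no
// element, so everything goes out with the "NO_FIRING" pane.
final class GatherBundlesModel {

    static final class Emitted {
        final String value;
        final String pane;
        Emitted(String value, String pane) { this.value = value; this.pane = pane; }
        @Override public String toString() { return value + " pane=" + pane; }
    }

    private final List<String> buffer = new ArrayList<>();

    // Analogous to @ProcessElement: the incoming pane is visible here...
    void process(String value, String pane) {
        buffer.add(value); // ...but only the value is buffered.
    }

    // Analogous to @FinishBundle: no per-element context, so every
    // buffered value is emitted with the unknown pane.
    List<Emitted> finishBundle() {
        List<Emitted> out = new ArrayList<>();
        for (String v : buffer) {
            out.add(new Emitted(v, "NO_FIRING"));
        }
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        GatherBundlesModel fn = new GatherBundlesModel();
        fn.process("file-0-of-1.txt", "ON_TIME");
        System.out.println(fn.finishBundle());
    }
}
```

Preserving panes, as Max suggests, would mean buffering the pane alongside each value (e.g. in the Multimap) instead of discarding it at buffering time.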
> >     >     Since you define that lateness is not tolerated, it is
> >     >     dropped.
> >     >
> >     >     I myself never had a requirement to specify zero allowed
> >     >     lateness for streaming. It feels dangerous. Do you have a
> >     >     specific use case?
> >     >     Also, in many cases, after windowed files are written, I
> >     >     usually collect them into a global window and specify a
> >     >     different triggering policy for collecting them. Both cases
> >     >     are why I never came across this situation.
> >     >
> >     >     I do not have an explanation of whether it is a bug or not.
> >     >     I would guess that the watermark can advance further, e.g.
> >     >     because elements can be processed in arbitrary order. Not
> >     >     saying this is the case. It needs someone with a better
> >     >     understanding of how watermark advance is / should be
> >     >     handled within pipelines.
> >     >
> >     >     P.S.: you can add `.withTimestampFn()` to your generate
> >     >     sequence to get more stable timing, which is also easier to
> >     >     reason about:
> >     >
> >     >         Dropping element at 1970-01-01T00:00:19.999Z for key ...
> >     >         window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z)
> >     >         since too far behind inputWatermark:1970-01-01T00:00:24.000Z;
> >     >         outputWatermark:1970-01-01T00:00:24.000Z
> >     >
> >     >     instead of
> >     >
> >     >         Dropping element at 2020-05-15T08:52:34.999Z for key ...
> >     >         window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z)
> >     >         since too far behind inputWatermark:2020-05-15T08:52:39.318Z;
> >     >         outputWatermark:2020-05-15T08:52:39.318Z
> >     >
> >     >     On Thu, May 14, 2020 at 10:47 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> >     >
> >     >         Hi again,
> >     >
> >     >         I have simplified the example to reproduce the data loss.
> >     >         The scenario is the following:
> >     >
> >     >         - TextIO writes files.
> >     >         - getPerDestinationOutputFilenames emits the file names.
> >     >         - File names are processed by an aggregator (combine,
> >     >           distinct, groupByKey...) with a window **without
> >     >           allowed lateness**.
> >     >         - File names are discarded as late.
> >     >
> >     >         Here you can see the data loss in the picture in
> >     >         https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
> >     >
> >     >         Please, follow the README to run the pipeline and find
> >     >         log traces that say data are dropped as late.
> >     >         Remember, you can run the pipeline with other window
> >     >         lateness values (check README.md).
> >     >
> >     >         Kby.
> >     >
> >     >         On Tue, May 12, 2020 at 17:16, Jose Manuel (<kiuby88....@gmail.com>) wrote:
> >     >
> >     >             Hi,
> >     >
> >     >             I would like to clarify that while TextIO is writing,
> >     >             all data are in the files (shards). The loss happens
> >     >             when file names emitted by
> >     >             getPerDestinationOutputFilenames are processed by a
> >     >             window.
> >     >
> >     >             I have created a pipeline to reproduce the scenario
> >     >             in which some filenames are lost after
> >     >             getPerDestinationOutputFilenames. Please, note I
> >     >             tried to simplify the code as much as possible, but
> >     >             the scenario is not easy to reproduce.
> >     >
> >     >             Please check this project:
> >     >             https://github.com/kiuby88/windowing-textio
> >     >             Check the readme to build and run it
> >     >             (https://github.com/kiuby88/windowing-textio#build-and-run).
> >     >             The project contains only a class with the pipeline,
> >     >             PipelineWithTextIo, a log4j2.xml file in the
> >     >             resources and the pom.
> >     >
> >     >             The pipeline in PipelineWithTextIo generates
> >     >             unbounded data using a sequence. It adds a little
> >     >             delay (10s) per data entry, it uses a distinct (just
> >     >             to apply the window), and then it writes data using
> >     >             TextIO.
> >     >             The window for the distinct is fixed (5 seconds) and
> >     >             it does not use lateness.
> >     >             Generated files can be found in
> >     >             windowing-textio/pipe_with_lateness_0s/files. To
> >     >             write files the FileNamePolicy uses window + timing +
> >     >             shards (see
> >     >             https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135).
> >     >             Files are emitted using
> >     >             getPerDestinationOutputFilenames() (see the code
> >     >             here:
> >     >             https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78).
> >     >
> >     >             Then, the file names in the PCollection are extracted
> >     >             and logged. Please, note file names do not have pane
> >     >             information at that point.
> >     >
> >     >             To apply a window, a distinct is used again. Here
> >     >             several files are discarded as late and they are not
> >     >             processed by this second distinct. Please, see
> >     >             https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
> >     >
> >     >             Debug is enabled for WindowTracing, so you can find
> >     >             in the terminal several messages such as the
> >     >             following:
> >     >
> >     >                 DEBUG org.apache.beam.sdk.util.WindowTracing -
> >     >                 LateDataFilter: Dropping element at 2020-05-12T14:05:14.999Z for
> >     >                 key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
> >     >                 window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)
> >     >                 since too far behind inputWatermark:2020-05-12T14:05:19.799Z;
> >     >                 outputWatermark:2020-05-12T14:05:19.799Z
> >     >
> >     >             What happens here? I think that messages are
> >     >             generated per second and a window of 5 seconds groups
> >     >             them. Then a delay is added and finally data are
> >     >             written in a file.
> >     >             The pipeline reads more data, increasing the
> >     >             watermark.
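The "window + timing + shards" naming scheme can be sketched as plain string formatting. This is an illustrative reconstruction only; the real policy lives in PipelineWithTextIo.java and may differ in detail:

```java
// Illustrative reconstruction (not the project's actual code) of a
// "window + timing + shards" file name, matching the names seen in the
// WindowTracing logs, e.g.
// [2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt
final class WindowedFileName {

    static String of(String windowStart, String windowEnd,
                     String paneTiming, int shard, int numShards) {
        return String.format("[%s..%s)-%s-%d-of-%d.txt",
                windowStart, windowEnd, paneTiming, shard, numShards);
    }

    public static void main(String[] args) {
        System.out.println(of("2020-05-12T14:05:10.000Z",
                              "2020-05-12T14:05:15.000Z",
                              "ON_TIME", 0, 1));
    }
}
```

Encoding the pane timing in the name is what later makes the LATE-named files visible in files-after-distinct once allowed lateness is configured.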
> >     >             Then, file names are emitted without pane
> >     >             information (see "Emitted File" in the logs). The
> >     >             window in the second distinct compares the file
> >     >             names' timestamps with the pipeline watermark and
> >     >             then discards the file names as late.
> >     >
> >     >             Bonus
> >     >             -----
> >     >             You can add a lateness to the pipeline. See
> >     >             https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
> >     >
> >     >             If a minute of lateness is added to the window, the
> >     >             file names are processed as late. As a result the
> >     >             traces of LateDataFilter disappear.
> >     >
> >     >             Moreover, in order to better illustrate that file
> >     >             names are emitted as late for the second distinct, I
> >     >             added a second TextIO to write the file names in
> >     >             other files.
> >     >             The same FileNamePolicy as before was used (window +
> >     >             timing + shards). Then, you can find files that
> >     >             contain the original filenames in
> >     >             windowing-textio/pipe_with_lateness_60s/files-after-distinct.
> >     >             This is the interesting part, because you will find
> >     >             several files with LATE in their names.
> >     >
> >     >             Please, let me know if you need more information or
> >     >             if the example is not enough to check the expected
> >     >             scenarios.
> >     >
> >     >             Kby.
> >     >
> >     >             On Sun, May 10, 2020 at 17:04, Reuven Lax (<re...@google.com>) wrote:
> >     >
> >     >                 Pane info is supposed to be preserved across
> >     >                 transforms. If the Flink runner does not, then I
> >     >                 believe that is a bug.
> >     >                 On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >     >
> >     >                 I am using FileIO and I do observe the drop of
> >     >                 pane info on the Flink runner too. It was
> >     >                 mentioned in this thread:
> >     >                 https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
> >     >
> >     >                 It is a result of a different reshuffle expansion
> >     >                 for optimisation reasons. However, I did not
> >     >                 observe data loss in my case. Windowing and
> >     >                 watermark info should be preserved. Pane info is
> >     >                 not, which raises the question of how reliable
> >     >                 pane info should be in terms of SDK and runner.
> >     >
> >     >                 If you do observe a data loss, it would be great
> >     >                 to share a test case which replicates the
> >     >                 problem.
> >     >
> >     >                 On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com> wrote:
> >     >
> >     >                 Ah, I think I see the problem.
> >     >
> >     >                 It appears that for some reason, the Flink runner
> >     >                 loses windowing information when a Reshuffle is
> >     >                 applied. I'm not entirely sure why, because
> >     >                 windowing information should be maintained across
> >     >                 a Reshuffle.
> >     >
> >     >                 Reuven
> >     >
> >     >                 On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> >     >
> >     >                 Hi,
> >     >
> >     >                 I have added some logs to the pipeline as follows
> >     >                 (you can find the log function in the Appendix):
> >     >
> >     >                     // STREAM + processing time.
> >     >                     pipeline.apply(KafkaIO.read())
> >     >                         .apply(...)
> >     >                         // mappers, a window and a combine
> >     >                         .apply(logBeforeWrite())
> >     >                         .apply("WriteFiles", TextIO.<String>writeCustomType()
> >     >                             .to(policy).withShards(4).withWindowedWrites())
> >     >                         .getPerDestinationOutputFilenames()
> >     >                         .apply(logAfterWrite())
> >     >                         .apply("CombineFileNames", Combine.perKey(...))
> >     >
> >     >                 I have run the pipeline using DirectRunner
> >     >                 (local), SparkRunner and FlinkRunner, the latter
> >     >                 two on a cluster.
> >     >                 Below you can see the timing and pane information
> >     >                 before/after (you can see the traces in detail
> >     >                 with window and timestamp information in the
> >     >                 Appendix).
> >     >
> >     >                 DirectRunner:
> >     >                 [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> >     >                 [After Write]  timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
> >     >
> >     >                 FlinkRunner:
> >     >                 [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> >     >                 [After Write]  timing=UNKNOWN, pane=PaneInfo.NO_FIRING
> >     >
> >     >                 SparkRunner:
> >     >                 [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> >     >                 [After Write]  timing=UNKNOWN, pane=PaneInfo.NO_FIRING
> >     >
> >     >                 It seems DirectRunner propagates the windowing
> >     >                 information as expected.
> >     >                 I am not sure if TextIO really propagates it or
> >     >                 just emits a new window pane, because the timing
> >     >                 before TextIO is ON_TIME and after TextIO is
> >     >                 EARLY.
> >     >                 In any case, using FlinkRunner and SparkRunner
> >     >                 the timing and the pane are not set.
> >     >
> >     >                 I thought the problem was in
> >     >                 GatherBundlesPerWindowFn, but now, after seeing
> >     >                 that the DirectRunner filled the windowing
> >     >                 data... I am not sure.
> >     >                 https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
> >     >
> >     >                 Appendix
> >     >                 --------
> >     >                 Here you can see the log function and the traces
> >     >                 for the different runners in detail.
> >     >
> >     >                     private SingleOutput<String, String> logBefore() {
> >     >                         return ParDo.of(new DoFn<String, String>() {
> >     >                             @ProcessElement
> >     >                             public void processElement(ProcessContext context,
> >     >                                                        BoundedWindow boundedWindow) {
> >     >                                 String value = context.element();
> >     >                                 log.info("[Before Write] Element=data window={}, "
> >     >                                         + "timestamp={}, timing={}, index={}, "
> >     >                                         + "isFirst={}, isLast={}, pane={}",
> >     >                                     boundedWindow,
> >     >                                     context.timestamp(),
> >     >                                     context.pane().getTiming(),
> >     >                                     context.pane().getIndex(),
> >     >                                     context.pane().isFirst(),
> >     >                                     context.pane().isLast(),
> >     >                                     context.pane());
> >     >                                 context.output(context.element());
> >     >                             }
> >     >                         });
> >     >                     }
> >     >
> >     >                 The logAfter function shows the same information.
> >     >
> >     >                 Traces in detail:
> >     >                 DirectRunner (local):
> >     >                 [Before Write] Element=data
> >     >                     window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
> >     >                     timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME,
> >     >                     index=0, isFirst=true, isLast=true,
> >     >                     pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> >     >                 [After Write] Element=file
> >     >                     window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
> >     >                     timestamp=2020-05-09T13:39:59.999Z, timing=EARLY,
> >     >                     index=0, isFirst=true, isLast=false,
> >     >                     pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
> >     >
> >     >                 FlinkRunner (cluster):
> >     >                 [Before Write] Element=data
> >     >                     window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
> >     >                     timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME,
> >     >                     index=0, isFirst=true, isLast=true,
> >     >                     pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> >     >                 [After Write] Element=file
> >     >                     window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
> >     >                     timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN,
> >     >                     index=0, isFirst=true, isLast=true,
> >     >                     pane=PaneInfo.NO_FIRING
> >     >
> >     >                 SparkRunner (cluster):
> >     >                 [Before Write] Element=data
> >     >                     window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
> >     >                     timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME,
> >     >                     index=0, isFirst=true, isLast=true,
> >     >                     pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> >     >                 [After Write] Element=file
> >     >                     window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
> >     >                     timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN,
> >     >                     index=0, isFirst=true, isLast=true,
> >     >                     pane=PaneInfo.NO_FIRING
> >     >
> >     >                 On Fri, May 8, 2020 at 19:01, Reuven Lax (<re...@google.com>) wrote:
> >     >
> >     >                 The window information should still be there.
> >     >                 Beam propagates windows through PCollection, and
> >     >                 I don't think WriteFiles does anything explicit
> >     >                 to stop that.
> >     >
> >     >                 Can you try this with the direct runner to see
> >     >                 what happens there? What is your windowing on
> >     >                 this PCollection?
> >     >
> >     >                 Reuven
> >     >
> >     >                 On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> >     >
> >     >                 I got the same behavior using the Spark runner
> >     >                 (with Spark 2.4.3): window information was
> >     >                 missing.
> >     >
> >     >                 Just to clarify, the combiner after TextIO had
> >     >                 different results. In the Flink runner the file
> >     >                 names were dropped, and in Spark the combination
> >     >                 process happened twice, duplicating data. I think
> >     >                 it is because different runners manage data
> >     >                 without windowing information in different ways.
> >     >
> >     >                 On Fri, May 8, 2020 at 0:45, Luke Cwik (<lc...@google.com>) wrote:
> >     >
> >     >                 +dev <d...@beam.apache.org>
> >     >
> >     >                 On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> >     >
> >     >                 Hi guys,
> >     >
> >     >                 I think I have found something interesting about
> >     >                 windowing.
> >     >
> >     >                 I have a pipeline that gets data from Kafka and
> >     >                 writes to HDFS by means of TextIO.
> >     >                 Once written, the generated files are combined
> >     >                 to apply some custom operations. However, the
> >     >                 combine does not receive data. Below you can find
> >     >                 the highlights of my pipeline.
> >     >
> >     >                     // STREAM + processing time.
> >     >                     pipeline.apply(KafkaIO.read())
> >     >                         .apply(...) // mappers, a window and a combine
> >     >                         .apply("WriteFiles", TextIO.<String>writeCustomType()
> >     >                             .to(policy).withShards(4).withWindowedWrites())
> >     >                         .getPerDestinationOutputFilenames()
> >     >                         .apply("CombineFileNames", Combine.perKey(...))
> >     >
> >     >                 Running the pipeline with Flink I have found a
> >     >                 log trace that says data are discarded as late in
> >     >                 the CombineFileNames combine. Then, I added
> >     >                 allowed lateness to the pipeline window strategy
> >     >                 as a workaround. It works for now, but this opens
> >     >                 several questions for me.
> >     >
> >     >                 I think the problem is that
> >     >                 getPerDestinationOutputFilenames generates files,
> >     >                 but it does not maintain the windowing
> >     >                 information. Then, CombineFileNames compares the
> >     >                 file names with the watermark of the pipeline and
> >     >                 discards them as late.
> >     >
> >     >                 Is there any issue with
> >     >                 getPerDestinationOutputFilenames? Maybe I am
> >     >                 doing something wrong and using
> >     >                 getPerDestinationOutputFilenames + combine does
> >     >                 not make sense. What do you think?
> >     >
> >     >                 Please, note I am using Beam 2.17.0 with Flink
> >     >                 1.9.1.
> >     >
> >     >                 Many thanks,
> >     >                 Jose