All runners which use the Beam reference implementation drop the
PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames(). That's
why we can observe this behavior not only in Flink but also Spark.

The WriteFilesResult is returned here:
https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363

GatherBundlesPerWindow will discard the pane information because all
buffered elements are emitted in the FinishBundle method which always
has a NO_FIRING (unknown) pane info:
https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895

So this seems expected behavior. We would need to preserve the panes in
the Multimap buffer.

-Max

On 15.05.20 18:34, Reuven Lax wrote:
> Lateness should never be introduced inside a pipeline - generally late
> data can only come from a source.  If data was not dropped as late
> earlier in the pipeline, it should not be dropped after the file write.
> I suspect that this is a bug in how the Flink runner handles the
> Reshuffle transform, but I'm not sure what the exact bug is.
> 
> Reuven
> 
> On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <jozo.vil...@gmail.com
> <mailto:jozo.vil...@gmail.com>> wrote:
> 
>     Hi Jose,
> 
>     thank you for putting the effort to get example which
>     demonstrate your problem. 
> 
>     You are using a streaming pipeline and it seems that watermark in
>     downstream already advanced further, so when your File pane arrives,
>     it is already late. Since you define that lateness is not tolerated,
>     it is dropped.
>     I myself never had requirement to specify zero allowed lateness for
>     streaming. It feels dangerous. Do you have a specific use case?
>     Also, in may cases, after windowed files are written, I usually
>     collect them into global window and specify a different triggering
>     policy for collecting them. Both cases are why I never came across
>     this situation.
> 
>     I do not have an explanation if it is a bug or not. I would guess
>     that watermark can advance further, e.g. because elements can be
>     processed in arbitrary order. Not saying this is the case.
>     It needs someone with better understanding of how watermark advance
>     is / should be handled within pipelines. 
> 
> 
>     P.S.: you can add `.withTimestampFn()` to your generate sequence, to
>     get more stable timing, which is also easier to reason about:
> 
>     Dropping element at 1970-01-01T00:00:19.999Z for key
>     ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z)
>     since too far behind inputWatermark:1970-01-01T00:00:24.000Z;
>     outputWatermark:1970-01-01T00:00:24
>     .000Z
> 
>                instead of
> 
>     Dropping element at 2020-05-15T08:52:34.999Z for key ...
>     window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since
>     too far behind inputWatermark:2020-05-15T08:52:39.318Z;
>     outputWatermark:2020-05-15T08:52:39.318Z
> 
> 
> 
> 
>     In my
> 
> 
> 
>     On Thu, May 14, 2020 at 10:47 AM Jose Manuel <kiuby88....@gmail.com
>     <mailto:kiuby88....@gmail.com>> wrote:
> 
>         Hi again, 
> 
>         I have simplify the example to reproduce the data loss. The
>         scenario is the following:
> 
>         - TextIO write files. 
>         - getPerDestinationOutputFilenames emits file names 
>         - File names are processed by a aggregator (combine, distinct,
>         groupbyKey...) with a window **without allowlateness** 
>         - File names are discarded as late
> 
>         Here you can see the data loss in the picture
>         in 
> https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
> 
>         Please, follow README to run the pipeline and find log traces
>         that say data are dropped as late.
>         Remember, you can run the pipeline with another
>         window's  lateness values (check README.md)
> 
>         Kby.
> 
>         El mar., 12 may. 2020 a las 17:16, Jose Manuel
>         (<kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>>) escribió:
> 
>             Hi,
> 
>             I would like to clarify that while TextIO is writing every
>             data are in the files (shards). The losing happens when file
>             names emitted by getPerDestinationOutputFilenames are
>             processed by a window.
> 
>             I have created a pipeline to reproduce the scenario in which
>             some filenames are loss after the
>             getPerDestinationOutputFilenames. Please, note I tried to
>             simplify the code as much as possible, but the scenario is
>             not easy to reproduce.
> 
>             Please check this project
>             https://github.com/kiuby88/windowing-textio
>             Check readme to build and run
>             (https://github.com/kiuby88/windowing-textio#build-and-run)
>             Project contains only a class with the
>             pipeline PipelineWithTextIo, a log4j2.xml file in the
>             resources and the pom.
> 
>             The pipeline in PipelineWithTextIo generates unbounded data
>             using a sequence. It adds a little delay (10s) per data
>             entry, it uses a distinct (just to apply the window), and
>             then it writes data using TexIO.
>             The windows for the distinct is fixed (5 seconds) and it
>             does not use lateness.
>             Generated files can be found in
>             windowing-textio/pipe_with_lateness_0s/files. To write files
>             the FileNamePolicy uses window + timing + shards
>             (see 
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135)
>             Files are emitted using getPerDestinationOutputFilenames()
>             (see the code here,
>             
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78)
> 
>             Then, File names in the PCollection are extracted and
>             logged. Please, note file names dot not have pain
>             information in that point.
> 
>             To apply a window a distinct is used again. Here several
>             files are discarded as late and they are not processed by
>             this second distinct. Please, see
>             
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
> 
>             Debug is enabled for WindowTracing, so you can find in the
>             terminal several messages as the followiing:
>             DEBUG org.apache.beam.sdk.util.WindowTracing -
>             LateDataFilter: Dropping element at 2020-05-12T14:05:14.999Z
>             for
>             
> key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
>             window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)
>             since too far behind
>             inputWatermark:2020-05-12T14:05:19.799Z;
>             outputWatermark:2020-05-12T14:05:19.799Z`
> 
>             What happen here? I think that messages are generated per
>             second and a window of 5 seconds group them. Then a delay is
>             added and finally data are written in a file.
>             The pipeline reads more data, increasing the watermark.
>             Then, file names are emitted without pane information (see
>             "Emitted File" in logs). Window in second distinct compares
>             file names' timestamp and the pipeline watermark and then it
>             discards file names as late.
> 
> 
>             Bonus
>             -----
>             You can add a lateness to the pipeline. See
>             
> https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
> 
>             If a minute is added a lateness for window the file names
>             are processed as late. As result the traces of
>             LateDataFilter disappear.
> 
>             Moreover, in order to illustrate better that file names are
>             emitted as late for the second discarded I added a second
>             TextIO to write file names in other files.
>             Same FileNamePolicy than before was used (window + timing +
>             shards). Then, you can find files that contains the original
>             filenames in
>             windowing-textio/pipe_with_lateness_60s/files-after-distinct. This
>             is the interesting part, because you will find several files
>             with LATE in their names.
> 
>             Please, let me know if you need more information or if the
>             example is not enough to check the expected scenarios.
> 
>             Kby.
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>             El dom., 10 may. 2020 a las 17:04, Reuven Lax
>             (<re...@google.com <mailto:re...@google.com>>) escribió:
> 
>                 Pane info is supposed to be preserved across transforms.
>                 If the Fink runner does not, than I believe that is a bug.
> 
>                 On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek
>                 <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>>
>                 wrote:
> 
>                     I am using FileIO and I do observe the drop of pane
>                     info information on Flink runner too. It was
>                     mentioned in this thread:
>                     
> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
> 
>                     It is a result of different reshuffle expansion for
>                     optimisation reasons. However, I did not observe a
>                     data loss in my case. Windowing and watermark info
>                     should be preserved. Pane info is not, which brings
>                     a question how reliable pane info should be in terms
>                     of SDK and runner.
> 
>                     If you do observe a data loss, it would be great to
>                     share a test case which replicates the problem.
> 
>                     On Sun, May 10, 2020 at 8:03 AM Reuven Lax
>                     <re...@google.com <mailto:re...@google.com>> wrote:
> 
>                         Ah, I think I see the problem.
> 
>                         It appears that for some reason, the Flink
>                         runner loses windowing information when a
>                         Reshuffle is applied. I'm not entirely sure why,
>                         because windowing information should be
>                         maintained across a Reshuffle.
> 
>                         Reuven
> 
>                         On Sat, May 9, 2020 at 9:50 AM Jose Manuel
>                         <kiuby88....@gmail.com
>                         <mailto:kiuby88....@gmail.com>> wrote:
> 
> 
>                             Hi,
> 
>                             I have added some logs to the pipeline as
>                             following (you can find the log function in
>                             the Appendix):
> 
>                             //STREAM + processing time.
>                             pipeline.apply(KafkaIO.read())
>                                    .apply(...) //mappers, a window and a
>                             combine
>                                    .apply(logBeforeWrite())
> 
>                                    .apply("WriteFiles",
>                             
> TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>                                    .getPerDestinationOutputFilenames()
> 
>                                    .apply(logAfterWrite())
>                                    .apply("CombineFileNames",
>                             Combine.perKey(...))
> 
>                             I have run the pipeline using DirectRunner
>                             (local), SparkRunner and FlinkRunner, both
>                             of them using a cluster.
>                             Below you can see the timing and pane
>                             information before/after (you can see traces
>                             in detail with window and timestamp
>                             information in the Appendix).
> 
>                             DirectRunner:
>                             [Before Write] timing=ON_TIME, 
>                             pane=PaneInfo{isFirst=true, isLast=true,
>                             timing=ON_TIME, index=0, onTimeIndex=0}
>                             [After    Write] timing=EARLY,     
>                              pane=PaneInfo{isFirst=true, timing=EARLY,
>                             index=0}                              
> 
>                             FlinkRunner:
>                             [Before Write] timing=ON_TIME,   
>                             pane=PaneInfo{isFirst=true, isLast=true,
>                             timing=ON_TIME, index=0, onTimeIndex=0}
>                             [After    Write] timing=UNKNOWN,
>                             pane=PaneInfo.NO_FIRING
> 
>                             SparkRunner:
>                             [Before Write] timing=ON_TIME,   
>                             pane=PaneInfo{isFirst=true, isLast=true,
>                             timing=ON_TIME, index=0, onTimeIndex=0}
>                             [After    Write] timing=UNKNOWN,
>                             pane=PaneInfo.NO_FIRING
> 
>                             It seems DirectRunner propagates the
>                             windowing information as expected.
>                             I am not sure if TextIO really propagates or
>                             it just emits a window pane, because the
>                             timing before TextIO is ON_TIME and after
>                             TextIO is EARLY.
>                             In any case using FlinkRunner and
>                             SparkRunner the timing and the pane are not set.
> 
>                             I thought the problem was in
>                             GatherBundlesPerWindowFn, but now, after
>                             seeing that the DirectRunner filled
>                             windowing data... I am not sure.
>                             
> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
> 
> 
>                             Appendix
>                             -----------
>                             Here you can see the log function and traces
>                             for different runners in detail.
> 
>                             private SingleOutput<String, String>
>                             logBefore() {
>                                 return ParDo.of(new DoFn<String, String>() {
>                                     @ProcessElement
>                                     public void
>                             processElement(ProcessContext context,
>                             BoundedWindow boundedWindow) {
>                                         String value = context.element();
>                                         log.info
>                             <http://log.info>("[Before Write]
>                             Element=data window={}, timestamp={},
>                             timing={}, index ={}, isFirst ={},
>                             isLast={}, pane={}",
>                                                 boundedWindow,
>                                                 context.timestamp(),
>                                                 context.pane().getTiming(),
>                                                 context.pane().getIndex(),
>                                                 context.pane().isFirst(),
>                                                 context.pane().isLast(),
>                                                 context.pane()
>                                         );
>                                         context.output(context.element());
>                                     }
>                                 });
>                             }
> 
>                             logAfter function shows the same information.
> 
>                             Traces in details.
> 
>                             DirectRunner (local):
>                             [Before Write] Element=data
>                             
> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>                             timestamp=2020-05-09T13:39:59.999Z,
>                             timing=ON_TIME, index =0, isFirst =true,
>                             isLast=true  pane=PaneInfo{isFirst=true,
>                             isLast=true, timing=ON_TIME, index=0,
>                             onTimeIndex=0}
>                             [After  Write] Element=file
>                             
> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>                             timestamp=2020-05-09T13:39:59.999Z,
>                             timing=EARLY,   index =0, isFirst =true,
>                             isLast=false pane=PaneInfo{isFirst=true,
>                             timing=EARLY, index=0}                      
>                                    
> 
>                             FlinkRunner (cluster):
>                             [Before Write] Element=data
>                             
> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>                             timestamp=2020-05-09T15:13:59.999Z,
>                             timing=ON_TIME, index =0, isFirst =true,
>                             isLast=true  pane=PaneInfo{isFirst=true,
>                             isLast=true, timing=ON_TIME, index=0,
>                             onTimeIndex=0}
>                             [After  Write] Element=file
>                             
> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>                             timestamp=2020-05-09T15:13:59.999Z,
>                             timing=UNKNOWN, index =0, isFirst =true,
>                             isLast=true  pane=PaneInfo.NO_FIRING
> 
>                             SparkRunner (cluster):
>                             [Before Write] Element=data
>                             
> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>                             timestamp=2020-05-09T15:34:59.999Z,
>                             timing=ON_TIME, index =0, isFirst =true,
>                             isLast=true  pane=PaneInfo{isFirst=true,
>                             isLast=true, timing=ON_TIME, index=0,
>                             onTimeIndex=0}
>                             [After  Write] Element=file
>                             
> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>                             timestamp=2020-05-09T15:34:59.999Z,
>                             timing=UNKNOWN, index =0, isFirst =true,
>                             isLast=true  pane=PaneInfo.NO_FIRING
> 
> 
>                             El vie., 8 may. 2020 a las 19:01, Reuven Lax
>                             (<re...@google.com
>                             <mailto:re...@google.com>>) escribió:
> 
>                                 The window information should still be
>                                 there.  Beam propagates windows through
>                                 PCollection, and I don't think
>                                 WriteFiles does anything explicit to
>                                 stop that. 
> 
>                                 Can you try this with the direct runner
>                                 to see what happens there?
>                                 What is your windowing on this PCollection?
> 
>                                 Reuven
> 
>                                 On Fri, May 8, 2020 at 3:49 AM Jose
>                                 Manuel <kiuby88....@gmail.com
>                                 <mailto:kiuby88....@gmail.com>> wrote:
> 
>                                     I got the same behavior using Spark
>                                     Runner (with Spark 2.4.3), window
>                                     information was missing.
> 
>                                     Just to clarify, the combiner after
>                                     TextIO had different results. In
>                                     Flink runner the files names were
>                                     dropped, and in Spark the
>                                     combination process happened twice,
>                                     duplicating data.  I think it is
>                                     because different runners manage in
>                                     a different way the data without the
>                                     windowing information.
> 
> 
> 
>                                     El vie., 8 may. 2020 a las 0:45,
>                                     Luke Cwik (<lc...@google.com
>                                     <mailto:lc...@google.com>>) escribió:
> 
>                                         +dev <mailto:d...@beam.apache.org> 
> 
>                                         On Mon, May 4, 2020 at 3:56 AM
>                                         Jose Manuel
>                                         <kiuby88....@gmail.com
>                                         <mailto:kiuby88....@gmail.com>>
>                                         wrote:
> 
>                                             Hi guys,
> 
>                                             I think I have found
>                                             something interesting about
>                                             windowing.
> 
>                                             I have a pipeline that gets
>                                             data from Kafka and writes
>                                             in HDFS by means of TextIO.
>                                             Once written, generated
>                                             files are combined to apply
>                                             some custom operations.
>                                             However, the combine does
>                                             not receive data. Following,
>                                             you can find the
>                                             highlight of my pipeline.
> 
>                                             //STREAM + processing time.
>                                             pipeline.apply(KafkaIO.read())
>                                                     .apply(...)
>                                             //mappers, a window and a
>                                             combine
> 
>                                                     .apply("WriteFiles",
>                                             
> TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>                                                    
>                                             
> .getPerDestinationOutputFilenames()
>                                                    
>                                             .apply("CombineFileNames",
>                                             Combine.perKey(...))
> 
>                                             Running the pipeline with
>                                             Flink I have found a log
>                                             trace that says data are
>                                             discarded as late in the
>                                             combine CombineFileNames.
>                                             Then, I have added
>                                             AllowedLateness to pipeline
>                                             window strategy, as a
>                                             workaround.
>                                             It works by now, but this
>                                             opens several questions to me
> 
>                                             I think the problem is
>                                             getPerDestinationOutputFilenames
>                                             generates files, but
>                                             it does not maintain the
>                                             windowing information. Then,
>                                             CombineFileNames compares
>                                             file names with the
>                                             watermark of the pipeline
>                                             and discards them as late.
> 
>                                             Is there any issue with
>                                             getPerDestinationOutputFilenames?
>                                             Maybe, I am doing something
>                                             wrong
>                                             and using
>                                             getPerDestinationOutputFilenames
>                                             + combine does not make sense.
>                                             What do you think?
> 
>                                             Please, note I am using Beam
>                                             2.17.0 with Flink 1.9.1.
> 
>                                             Many thanks,
>                                             Jose
> 

Reply via email to