This is still confusing to me - why would the messages be dropped as late
in this case?

On Mon, May 18, 2020 at 6:14 AM Maximilian Michels <m...@apache.org> wrote:

> All runners which use the Beam reference implementation drop the
> PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames(). That's
> why we can observe this behavior not only in Flink but also Spark.
>
> The WriteFilesResult is returned here:
>
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363
>
> GatherBundlesPerWindow will discard the pane information because all
> buffered elements are emitted in the FinishBundle method which always
> has a NO_FIRING (unknown) pane info:
>
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895
>
> So this seems expected behavior. We would need to preserve the panes in
> the Multimap buffer.
>
> -Max
>
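
To make the point above about the buffer concrete, here is a toy model in plain Java, not the actual WriteFiles code. It assumes a buffer keyed by window that keeps only the values, so anything emitted at the end of the bundle has no pane to attach. All names (PaneBufferSketch, Element, Timing, flushDroppingPanes, flushKeepingPanes) are hypothetical, and Timing.UNKNOWN merely stands in for PaneInfo.NO_FIRING.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of per-window buffering with an end-of-bundle flush; not Beam code. */
final class PaneBufferSketch {
    /** Stand-in for pane timing; UNKNOWN plays the role of PaneInfo.NO_FIRING. */
    enum Timing { EARLY, ON_TIME, LATE, UNKNOWN }

    /** A value together with the window and pane it arrived in (hypothetical type). */
    static final class Element {
        final String value;
        final String window;
        final Timing pane;

        Element(String value, String window, Timing pane) {
            this.value = value;
            this.window = window;
            this.pane = pane;
        }
    }

    /** Buffers only the values per window, so the flush can only emit an unknown pane. */
    static List<Element> flushDroppingPanes(List<Element> bundle) {
        Map<String, List<String>> buffer = new LinkedHashMap<>();
        for (Element e : bundle) {
            buffer.computeIfAbsent(e.window, w -> new ArrayList<>()).add(e.value);
        }
        List<Element> out = new ArrayList<>();
        buffer.forEach((window, values) -> {
            for (String v : values) {
                out.add(new Element(v, window, Timing.UNKNOWN));
            }
        });
        return out;
    }

    /** Buffers whole elements per window, so the original pane survives the flush. */
    static List<Element> flushKeepingPanes(List<Element> bundle) {
        Map<String, List<Element>> buffer = new LinkedHashMap<>();
        for (Element e : bundle) {
            buffer.computeIfAbsent(e.window, w -> new ArrayList<>()).add(e);
        }
        List<Element> out = new ArrayList<>();
        buffer.values().forEach(out::addAll);
        return out;
    }

    public static void main(String[] args) {
        List<Element> bundle =
            List.of(new Element("ON_TIME-0-of-1.txt", "[14:05:10..14:05:15)", Timing.ON_TIME));
        System.out.println(flushDroppingPanes(bundle).get(0).pane); // prints UNKNOWN
        System.out.println(flushKeepingPanes(bundle).get(0).pane);  // prints ON_TIME
    }
}
```

Whether keeping the pane next to each buffered value is the right fix for WriteFiles is a separate question; the sketch only shows why a values-only buffer cannot give it back.
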
> On 15.05.20 18:34, Reuven Lax wrote:
> > Lateness should never be introduced inside a pipeline - generally late
> > data can only come from a source.  If data was not dropped as late
> > earlier in the pipeline, it should not be dropped after the file write.
> > I suspect that this is a bug in how the Flink runner handles the
> > Reshuffle transform, but I'm not sure what the exact bug is.
> >
> > Reuven
> >
> > On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <jozo.vil...@gmail.com
> > <mailto:jozo.vil...@gmail.com>> wrote:
> >
> >     Hi Jose,
> >
> >     thank you for putting in the effort to build an example that
> >     demonstrates your problem.
> >
> >     You are using a streaming pipeline, and it seems that the downstream
> >     watermark has already advanced further, so when your file pane
> >     arrives, it is already late. Since you specified that lateness is not
> >     tolerated, it is dropped.
> >     I have never had a requirement to specify zero allowed lateness for
> >     streaming. It feels dangerous. Do you have a specific use case?
> >     Also, in many cases, after windowed files are written, I usually
> >     collect them into the global window and specify a different triggering
> >     policy for collecting them. Both are reasons why I never came across
> >     this situation.
> >
> >     I cannot say whether it is a bug or not. I would guess
> >     that the watermark can advance further, e.g. because elements can be
> >     processed in arbitrary order. I am not saying this is the case.
> >     It needs someone with a better understanding of how watermark
> >     advancement is / should be handled within pipelines.
> >
> >
> >     P.S.: you can add `.withTimestampFn()` to your generate sequence, to
> >     get more stable timing, which is also easier to reason about:
> >
> >     Dropping element at 1970-01-01T00:00:19.999Z for key
> >     ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z)
> >     since too far behind inputWatermark:1970-01-01T00:00:24.000Z;
> >     outputWatermark:1970-01-01T00:00:24.000Z
> >
> >                instead of
> >
> >     Dropping element at 2020-05-15T08:52:34.999Z for key ...
> >     window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since
> >     too far behind inputWatermark:2020-05-15T08:52:39.318Z;
> >     outputWatermark:2020-05-15T08:52:39.318Z
> >
> >
> >
> >
> >     In my
> >
> >
> >
> >     On Thu, May 14, 2020 at 10:47 AM Jose Manuel <kiuby88....@gmail.com
> >     <mailto:kiuby88....@gmail.com>> wrote:
> >
> >         Hi again,
> >
> >         I have simplified the example to reproduce the data loss. The
> >         scenario is the following:
> >
> >         - TextIO writes files.
> >         - getPerDestinationOutputFilenames emits file names.
> >         - File names are processed by an aggregator (combine, distinct,
> >         GroupByKey...) with a window **without allowed lateness**.
> >         - File names are discarded as late.
> >
> >         Here you can see the data loss in the picture in
> >         https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
> >
> >         Please follow the README to run the pipeline and find the log traces
> >         that say data are dropped as late.
> >         Remember, you can run the pipeline with other allowed-lateness
> >         values for the window (check README.md).
> >
> >         Kby.
> >
> >         On Tue, May 12, 2020 at 17:16, Jose Manuel
> >         (<kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>>) wrote:
> >
> >             Hi,
> >
> >             I would like to clarify that all data are present in the
> >             files (shards) that TextIO writes. The loss happens when the
> >             file names emitted by getPerDestinationOutputFilenames are
> >             processed by a window.
> >
> >             I have created a pipeline to reproduce the scenario in which
> >             some file names are lost after
> >             getPerDestinationOutputFilenames. Please note that I tried to
> >             simplify the code as much as possible, but the scenario is
> >             not easy to reproduce.
> >
> >             Please check this project
> >             https://github.com/kiuby88/windowing-textio
> >             Check readme to build and run
> >             (https://github.com/kiuby88/windowing-textio#build-and-run)
> >             Project contains only a class with the
> >             pipeline PipelineWithTextIo, a log4j2.xml file in the
> >             resources and the pom.
> >
> >             The pipeline in PipelineWithTextIo generates unbounded data
> >             using a sequence. It adds a small delay (10s) per data
> >             entry, uses a distinct (just to apply the window), and
> >             then writes the data using TextIO.
> >             The window for the distinct is fixed (5 seconds) and
> >             does not use allowed lateness.
> >             Generated files can be found in
> >             windowing-textio/pipe_with_lateness_0s/files. To write files,
> >             the FileNamePolicy uses window + timing + shards (see
> >             https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135).
> >             File names are emitted using getPerDestinationOutputFilenames()
> >             (see the code here:
> >             https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78).
> >
> >             Then, the file names in the PCollection are extracted and
> >             logged. Please note that the file names do not have pane
> >             information at that point.
> >
> >             To apply a window, a distinct is used again. Here several
> >             file names are discarded as late and are not processed by
> >             this second distinct. Please see
> >             https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
> >
> >             Debug is enabled for WindowTracing, so you can find in the
> >             terminal several messages like the following:
> >             DEBUG org.apache.beam.sdk.util.WindowTracing -
> >             LateDataFilter: Dropping element at 2020-05-12T14:05:14.999Z
> >             for
> >             key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
> >             window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)
> >             since too far behind
> >             inputWatermark:2020-05-12T14:05:19.799Z;
> >             outputWatermark:2020-05-12T14:05:19.799Z
> >
> >             What happens here? I think that messages are generated once per
> >             second and a 5-second window groups them. Then a delay is
> >             added and finally the data are written to a file.
> >             Meanwhile, the pipeline reads more data, advancing the watermark.
> >             Then, file names are emitted without pane information (see
> >             "Emitted File" in the logs). The window in the second distinct
> >             compares the file names' timestamps with the pipeline watermark
> >             and discards the file names as late.
> >
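
The comparison described above can be reproduced with the numbers from the quoted trace. This is a simplified model in plain Java, not Beam's implementation. It assumes a window counts as expired once its last timestamp plus the allowed lateness is before the input watermark. The names LateDataSketch, maxTimestamp and isExpired are made up for this illustration.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified model of the late-data check; an assumption-laden sketch, not Beam code. */
final class LateDataSketch {
    /** Last timestamp inside a window [start, end): one millisecond before the end. */
    static Instant maxTimestamp(Instant windowEnd) {
        return windowEnd.minusMillis(1);
    }

    /** True when maxTimestamp + allowedLateness is strictly before the input watermark. */
    static boolean isExpired(Instant windowEnd, Duration allowedLateness, Instant inputWatermark) {
        return maxTimestamp(windowEnd).plus(allowedLateness).isBefore(inputWatermark);
    }

    public static void main(String[] args) {
        // Values taken from the LateDataFilter trace quoted in this thread.
        Instant windowEnd = Instant.parse("2020-05-12T14:05:15.000Z");
        Instant inputWatermark = Instant.parse("2020-05-12T14:05:19.799Z");

        // Zero allowed lateness: the window is already expired, so the file name is dropped.
        System.out.println(isExpired(windowEnd, Duration.ZERO, inputWatermark)); // prints true

        // One minute of allowed lateness: the window is still open, so the file name is kept.
        System.out.println(isExpired(windowEnd, Duration.ofSeconds(60), inputWatermark)); // prints false
    }
}
```

Under this model the element at 14:05:14.999Z is dropped with zero lateness because the watermark is already at 14:05:19.799Z, while a lateness of 60 seconds keeps the window open until 14:06:14.999Z.
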
> >
> >             Bonus
> >             -----
> >             You can add allowed lateness to the pipeline. See
> >             https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
> >
> >             If a lateness of one minute is added to the window, the file
> >             names are processed as late. As a result, the LateDataFilter
> >             traces disappear.
> >
> >             Moreover, to better illustrate that file names are
> >             emitted as late for the second distinct, I added a second
> >             TextIO to write the file names to other files.
> >             The same FileNamePolicy as before was used (window + timing +
> >             shards). You can then find files that contain the original
> >             file names in
> >             windowing-textio/pipe_with_lateness_60s/files-after-distinct. This
> >             is the interesting part, because you will find several files
> >             with LATE in their names.
> >
> >             Please, let me know if you need more information or if the
> >             example is not enough to check the expected scenarios.
> >
> >             Kby.
> >
> >
> >             On Sun, May 10, 2020 at 17:04, Reuven Lax
> >             (<re...@google.com <mailto:re...@google.com>>) wrote:
> >
> >                 Pane info is supposed to be preserved across transforms.
> >                 If the Flink runner does not, then I believe that is a bug.
> >
> >                 On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek
> >                 <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>>
> >                 wrote:
> >
> >                     I am using FileIO and I also observe pane
> >                     info being dropped on the Flink runner. It was
> >                     mentioned in this thread:
> >                     https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
> >
> >                     It is a result of a different reshuffle expansion for
> >                     optimisation reasons. However, I did not observe
> >                     data loss in my case. Windowing and watermark info
> >                     should be preserved. Pane info is not, which raises
> >                     the question of how reliable pane info should be in
> >                     terms of SDK and runner.
> >
> >                     If you do observe a data loss, it would be great to
> >                     share a test case which replicates the problem.
> >
> >                     On Sun, May 10, 2020 at 8:03 AM Reuven Lax
> >                     <re...@google.com <mailto:re...@google.com>> wrote:
> >
> >                         Ah, I think I see the problem.
> >
> >                         It appears that for some reason, the Flink
> >                         runner loses windowing information when a
> >                         Reshuffle is applied. I'm not entirely sure why,
> >                         because windowing information should be
> >                         maintained across a Reshuffle.
> >
> >                         Reuven
> >
> >                         On Sat, May 9, 2020 at 9:50 AM Jose Manuel
> >                         <kiuby88....@gmail.com
> >                         <mailto:kiuby88....@gmail.com>> wrote:
> >
> >
> >                             Hi,
> >
> >                             I have added some logs to the pipeline as
> >                             follows (you can find the log function in
> >                             the Appendix):
> >
> >                             //STREAM + processing time.
> >                             pipeline.apply(KafkaIO.read())
> >                                    .apply(...) //mappers, a window and a combine
> >                                    .apply(logBeforeWrite())
> >
> >                                    .apply("WriteFiles",
> >                                        TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
> >                                    .getPerDestinationOutputFilenames()
> >
> >                                    .apply(logAfterWrite())
> >                                    .apply("CombineFileNames", Combine.perKey(...))
> >
> >                             I have run the pipeline using DirectRunner
> >                             (local), SparkRunner and FlinkRunner, the
> >                             latter two on a cluster.
> >                             Below you can see the timing and pane
> >                             information before/after the write (you can
> >                             see traces in detail with window and timestamp
> >                             information in the Appendix).
> >
> >                             DirectRunner:
> >                             [Before Write] timing=ON_TIME,
> >                             pane=PaneInfo{isFirst=true, isLast=true,
> >                             timing=ON_TIME, index=0, onTimeIndex=0}
> >                             [After    Write] timing=EARLY,
> >                              pane=PaneInfo{isFirst=true, timing=EARLY,
> >                             index=0}
> >
> >                             FlinkRunner:
> >                             [Before Write] timing=ON_TIME,
> >                             pane=PaneInfo{isFirst=true, isLast=true,
> >                             timing=ON_TIME, index=0, onTimeIndex=0}
> >                             [After    Write] timing=UNKNOWN,
> >                             pane=PaneInfo.NO_FIRING
> >
> >                             SparkRunner:
> >                             [Before Write] timing=ON_TIME,
> >                             pane=PaneInfo{isFirst=true, isLast=true,
> >                             timing=ON_TIME, index=0, onTimeIndex=0}
> >                             [After    Write] timing=UNKNOWN,
> >                             pane=PaneInfo.NO_FIRING
> >
> >                             It seems DirectRunner propagates the
> >                             windowing information as expected.
> >                             I am not sure if TextIO really propagates or
> >                             it just emits a window pane, because the
> >                             timing before TextIO is ON_TIME and after
> >                             TextIO is EARLY.
> >                             In any case, using FlinkRunner and
> >                             SparkRunner the timing and the pane are not
> >                             set.
> >
> >                             I thought the problem was in
> >                             GatherBundlesPerWindowFn, but now, after
> >                             seeing that the DirectRunner filled
> >                             windowing data... I am not sure.
> >
> >                             https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
> >
> >
> >                             Appendix
> >                             -----------
> >                             Here you can see the log function and traces
> >                             for different runners in detail.
> >
> >                             private SingleOutput<String, String> logBefore() {
> >                                 return ParDo.of(new DoFn<String, String>() {
> >                                     @ProcessElement
> >                                     public void processElement(ProcessContext context,
> >                                             BoundedWindow boundedWindow) {
> >                                         String value = context.element();
> >                                         log.info("[Before Write] Element=data window={}, timestamp={}, "
> >                                                 + "timing={}, index ={}, isFirst ={}, isLast={}, pane={}",
> >                                                 boundedWindow,
> >                                                 context.timestamp(),
> >                                                 context.pane().getTiming(),
> >                                                 context.pane().getIndex(),
> >                                                 context.pane().isFirst(),
> >                                                 context.pane().isLast(),
> >                                                 context.pane()
> >                                         );
> >                                         context.output(context.element());
> >                                     }
> >                                 });
> >                             }
> >
> >                             logAfter function shows the same information.
> >
> >                             Traces in detail.
> >
> >                             DirectRunner (local):
> >                             [Before Write] Element=data
> >                             window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
> >                             timestamp=2020-05-09T13:39:59.999Z,
> >                             timing=ON_TIME, index =0, isFirst =true,
> >                             isLast=true  pane=PaneInfo{isFirst=true,
> >                             isLast=true, timing=ON_TIME, index=0,
> >                             onTimeIndex=0}
> >                             [After  Write] Element=file
> >                             window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
> >                             timestamp=2020-05-09T13:39:59.999Z,
> >                             timing=EARLY,   index =0, isFirst =true,
> >                             isLast=false pane=PaneInfo{isFirst=true,
> >                             timing=EARLY, index=0}
> >
> >
> >                             FlinkRunner (cluster):
> >                             [Before Write] Element=data
> >                             window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
> >                             timestamp=2020-05-09T15:13:59.999Z,
> >                             timing=ON_TIME, index =0, isFirst =true,
> >                             isLast=true  pane=PaneInfo{isFirst=true,
> >                             isLast=true, timing=ON_TIME, index=0,
> >                             onTimeIndex=0}
> >                             [After  Write] Element=file
> >                             window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
> >                             timestamp=2020-05-09T15:13:59.999Z,
> >                             timing=UNKNOWN, index =0, isFirst =true,
> >                             isLast=true  pane=PaneInfo.NO_FIRING
> >
> >                             SparkRunner (cluster):
> >                             [Before Write] Element=data
> >                             window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
> >                             timestamp=2020-05-09T15:34:59.999Z,
> >                             timing=ON_TIME, index =0, isFirst =true,
> >                             isLast=true  pane=PaneInfo{isFirst=true,
> >                             isLast=true, timing=ON_TIME, index=0,
> >                             onTimeIndex=0}
> >                             [After  Write] Element=file
> >                             window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
> >                             timestamp=2020-05-09T15:34:59.999Z,
> >                             timing=UNKNOWN, index =0, isFirst =true,
> >                             isLast=true  pane=PaneInfo.NO_FIRING
> >
> >
> >                             On Fri, May 8, 2020 at 19:01, Reuven Lax
> >                             (<re...@google.com
> >                             <mailto:re...@google.com>>) wrote:
> >
> >                                 The window information should still be
> >                                 there.  Beam propagates windows through
> >                                 PCollection, and I don't think
> >                                 WriteFiles does anything explicit to
> >                                 stop that.
> >
> >                                 Can you try this with the direct runner
> >                                 to see what happens there?
> >                                 What is your windowing on this
> >                                 PCollection?
> >
> >                                 Reuven
> >
> >                                 On Fri, May 8, 2020 at 3:49 AM Jose
> >                                 Manuel <kiuby88....@gmail.com
> >                                 <mailto:kiuby88....@gmail.com>> wrote:
> >
> >                                     I got the same behavior using Spark
> >                                     Runner (with Spark 2.4.3), window
> >                                     information was missing.
> >
> >                                     Just to clarify, the combiner after
> >                                     TextIO had different results. With the
> >                                     Flink runner the file names were
> >                                     dropped, and with Spark the
> >                                     combination happened twice,
> >                                     duplicating data.  I think this is
> >                                     because different runners handle
> >                                     data without windowing information
> >                                     in different ways.
> >
> >
> >
> >                                     On Fri, May 8, 2020 at 0:45,
> >                                     Luke Cwik (<lc...@google.com
> >                                     <mailto:lc...@google.com>>) wrote:
> >
> >                                         +dev <mailto:dev@beam.apache.org>
> >
> >
> >                                         On Mon, May 4, 2020 at 3:56 AM
> >                                         Jose Manuel
> >                                         <kiuby88....@gmail.com
> >                                         <mailto:kiuby88....@gmail.com>>
> >                                         wrote:
> >
> >                                             Hi guys,
> >
> >                                             I think I have found
> >                                             something interesting about
> >                                             windowing.
> >
> >                                             I have a pipeline that gets
> >                                             data from Kafka and writes
> >                                             in HDFS by means of TextIO.
> >                                             Once written, generated
> >                                             files are combined to apply
> >                                             some custom operations.
> >                                             However, the combine does
> >                                             not receive data. Below
> >                                             you can find the
> >                                             outline of my pipeline.
> >
> >                                             //STREAM + processing time.
> >                                             pipeline.apply(KafkaIO.read())
> >                                                     .apply(...) //mappers, a window and a combine
> >
> >                                                     .apply("WriteFiles",
> >                                                         TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
> >                                                     .getPerDestinationOutputFilenames()
> >
> >                                                     .apply("CombineFileNames", Combine.perKey(...))
> >
> >                                             Running the pipeline with
> >                                             Flink, I found a log
> >                                             trace that says data are
> >                                             discarded as late in the
> >                                             combine CombineFileNames.
> >                                             I then added
> >                                             AllowedLateness to the pipeline
> >                                             window strategy as a
> >                                             workaround.
> >                                             It works for now, but this
> >                                             raises several questions for me.
> >
> >                                             I think the problem is that
> >                                             getPerDestinationOutputFilenames
> >                                             emits file names, but
> >                                             it does not maintain the
> >                                             windowing information. Then,
> >                                             CombineFileNames compares
> >                                             the file names with the
> >                                             watermark of the pipeline
> >                                             and discards them as late.
> >
> >                                             Is there any issue with
> >                                             getPerDestinationOutputFilenames?
> >                                             Maybe I am doing something
> >                                             wrong
> >                                             and using
> >                                             getPerDestinationOutputFilenames
> >                                             + combine does not make sense.
> >                                             What do you think?
> >
> >                                             Please, note I am using Beam
> >                                             2.17.0 with Flink 1.9.1.
> >
> >                                             Many thanks,
> >                                             Jose
> >
>
