On Tue, May 19, 2020 at 2:02 AM Maximilian Michels <m...@apache.org> wrote:

> > This is still confusing to me - why would the messages be dropped as
> late in this case?
>
> Since you previously mentioned that the bug is due to the pane info
> missing, I just pointed out that the WriteFiles logic is expected to
> drop the pane info.
>

I mentioned the other issue, where it appeared that Reshuffle in flink
drops pane info (which seems wrong to me). However the original issue here
is elements dropped due to lateness.
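To make the distinction concrete: a spec-conformant Reshuffle should act as an identity on element metadata. Below is a toy model in plain Java (NOT Beam's real WindowedValue/PaneInfo classes; all names are illustrative) contrasting that with what the logs in this thread suggest Flink/Spark do, namely resetting the pane to a NO_FIRING placeholder while window and timestamp survive:

```java
import java.time.Instant;

// Toy model of the metadata a runner carries through Reshuffle:
// value, window, timestamp and pane. A conformant reshuffle is an
// identity on all four fields.
public class ReshuffleModel {
    record WindowedValue(String value, String window, Instant timestamp, String pane) {}

    // What Reshuffle is supposed to do: redistribute, change nothing.
    static WindowedValue conformantReshuffle(WindowedValue wv) {
        return wv;
    }

    // What the logs in this thread suggest is happening: pane is dropped.
    static WindowedValue observedBehavior(WindowedValue wv) {
        return new WindowedValue(wv.value(), wv.window(), wv.timestamp(), "NO_FIRING");
    }

    public static void main(String[] args) {
        WindowedValue in = new WindowedValue(
                "file-0-of-4.txt",
                "[15:13:00.000Z..15:14:00.000Z)",
                Instant.parse("2020-05-09T15:13:59.999Z"),
                "ON_TIME");
        System.out.println(conformantReshuffle(in).pane()); // ON_TIME
        System.out.println(observedBehavior(in).pane());    // NO_FIRING
    }
}
```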


>
> @Jose Would it make sense to file a JIRA and summarize all the findings
> here?
>
> @Jozef What you describe in
> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html is
> expected because Flink does not do a GroupByKey on Reshuffle but just
> redistributes the elements.
>
> Thanks,
> Max
>
> On 18.05.20 21:59, Jose Manuel wrote:
> > Hi Reuven,
> >
> > I can try to explain what I think is happening.
> >
> > - There is a source which reads data entries and updates the
> > watermark.
> > - Then, data entries are grouped and stored in files.
> > - The window information of these data entries (their window and
> > timestamp) is used to emit filenames. PaneInfo is empty.
> > - When a second window is applied to the filenames, if the allowed
> > lateness is zero or lower than the time spent in the previous
> > reading/writing, the filenames are discarded as late.
> >
> > I guess, the key is in
> >
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L168
> >
> > My assumption is that the global watermark (or source watermark, I am
> > not sure about the name) is used to evaluate the filenames, which are
> > in an already-emitted window.
> >
> > Thanks
> > Jose
> >
> >
> > El lun., 18 may. 2020 a las 18:37, Reuven Lax (<re...@google.com
> > <mailto:re...@google.com>>) escribió:
> >
> >     This is still confusing to me - why would the messages be dropped as
> >     late in this case?
> >
> >     On Mon, May 18, 2020 at 6:14 AM Maximilian Michels <m...@apache.org
> >     <mailto:m...@apache.org>> wrote:
> >
> >         All runners which use the Beam reference implementation drop the
> >         PaneInfo for
> >         WriteFilesResult#getPerDestinationOutputFilenames(). That's
> >         why we can observe this behavior not only in Flink but also
> Spark.
> >
> >         The WriteFilesResult is returned here:
> >
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363
> >
> >         GatherBundlesPerWindow will discard the pane information because
> all
> >         buffered elements are emitted in the FinishBundle method which
> >         always
> >         has a NO_FIRING (unknown) pane info:
> >
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895
> >
> >         So this seems expected behavior. We would need to preserve the
> >         panes in
> >         the Multimap buffer.
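A toy model in plain Java (NOT the actual WriteFiles code; names are illustrative) of why this loses the pane: the firing context only exists while an element is being processed, so anything flushed at bundle finish can only carry a NO_FIRING-style placeholder:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a DoFn that buffers in processElement and emits in
// finishBundle, mirroring the Multimap buffer in WriteFiles.
public class BundleBufferModel {
    enum Timing { EARLY, ON_TIME, LATE, UNKNOWN }

    record Emitted(String value, Timing timing) {}

    private final List<String> buffer = new ArrayList<>();

    // Per-element call: the pane timing is available here, but only the
    // value is buffered.
    void processElement(String value, Timing paneTiming) {
        buffer.add(value);
    }

    // Bundle-finish call: no pane is in scope any more, so every buffered
    // element is emitted with an UNKNOWN (NO_FIRING-like) pane.
    List<Emitted> finishBundle() {
        List<Emitted> out = new ArrayList<>();
        for (String v : buffer) {
            out.add(new Emitted(v, Timing.UNKNOWN));
        }
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        BundleBufferModel fn = new BundleBufferModel();
        fn.processElement("file-ON_TIME-0-of-4.txt", Timing.ON_TIME);
        System.out.println(fn.finishBundle()); // timing is UNKNOWN, not ON_TIME
    }
}
```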
> >
> >         -Max
> >
> >         On 15.05.20 18:34, Reuven Lax wrote:
> >         > Lateness should never be introduced inside a pipeline -
> >         generally late
> >         > data can only come from a source.  If data was not dropped as
> late
> >         > earlier in the pipeline, it should not be dropped after the
> >         file write.
> >         > I suspect that this is a bug in how the Flink runner handles
> the
> >         > Reshuffle transform, but I'm not sure what the exact bug is.
> >         >
> >         > Reuven
> >         >
> >         > On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek
> >         <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>
> >         > <mailto:jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>>>
> >         wrote:
> >         >
> >         >     Hi Jose,
> >         >
> >         >     thank you for putting in the effort to build an example
> >         >     which demonstrates your problem.
> >         >
> >         >     You are using a streaming pipeline and it seems that
> >         the watermark
> >         >     downstream has already advanced further, so when your File
> >         pane arrives,
> >         >     it is already late. Since you define that lateness is not
> >         tolerated,
> >         >     it is dropped.
> >         >     I myself never had a requirement to specify zero allowed
> >         lateness for
> >         >     streaming. It feels dangerous. Do you have a specific use
> >         case?
> >         >     Also, in many cases, after windowed files are written, I
> >         usually
> >         >     collect them into global window and specify a different
> >         triggering
> >         >     policy for collecting them. Both cases are why I never
> >         came across
> >         >     this situation.
> >         >
> >         >     I do not have an explanation for whether it is a bug or not. I
> >         >     would guess
> >         >     that the watermark can advance further, e.g. because elements
> >         can be
> >         >     processed in arbitrary order. Not saying this is the case.
> >         >     It needs someone with better understanding of how
> >         watermark advance
> >         >     is / should be handled within pipelines.
> >         >
> >         >
> >         >     P.S.: you can add `.withTimestampFn()` to your generate
> >         sequence, to
> >         >     get more stable timing, which is also easier to reason
> about:
> >         >
> >         >     Dropping element at 1970-01-01T00:00:19.999Z for key
> >         >
> >          ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z)
> >         >     since too far behind
> inputWatermark:1970-01-01T00:00:24.000Z;
> >         >     outputWatermark:1970-01-01T00:00:24
> >         >     .000Z
> >         >
> >         >                instead of
> >         >
> >         >     Dropping element at 2020-05-15T08:52:34.999Z for key ...
> >         >
> >          window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z)
> since
> >         >     too far behind inputWatermark:2020-05-15T08:52:39.318Z;
> >         >     outputWatermark:2020-05-15T08:52:39.318Z
> >         >
> >         >
> >         >
> >         >
> >         >
> >         >
> >         >
> >         >     On Thu, May 14, 2020 at 10:47 AM Jose Manuel
> >         <kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>
> >         >     <mailto:kiuby88....@gmail.com
> >         <mailto:kiuby88....@gmail.com>>> wrote:
> >         >
> >         >         Hi again,
> >         >
> >         >         I have simplified the example to reproduce the data
> >         loss. The
> >         >         scenario is the following:
> >         >
> >         >         - TextIO write files.
> >         >         - getPerDestinationOutputFilenames emits file names
> >         >         - File names are processed by an aggregator (combine,
> >         distinct,
> >         >         groupByKey...) with a window **without allowed lateness**
> >         >         - File names are discarded as late
> >         >
> >         >         Here you can see the data loss in the picture
> >         >
> >          at
> https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
> >         >
> >         >         Please, follow README to run the pipeline and find log
> >         traces
> >         >         that say data are dropped as late.
> >         >         Remember, you can run the pipeline with other
> >         >         window lateness values (check README.md)
> >         >
> >         >         Kby.
> >         >
> >         >         El mar., 12 may. 2020 a las 17:16, Jose Manuel
> >         >         (<kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>
> >         <mailto:kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>>>)
> >         escribió:
> >         >
> >         >             Hi,
> >         >
> >         >             I would like to clarify that while TextIO is
> >         writing, all the
> >         >             data are in the files (shards). The loss happens
> >         when file
> >         >             names emitted by getPerDestinationOutputFilenames
> are
> >         >             processed by a window.
> >         >
> >         >             I have created a pipeline to reproduce the
> >         scenario in which
> >         >             some filenames are lost after the
> >         >             getPerDestinationOutputFilenames. Please, note I
> >         tried to
> >         >             simplify the code as much as possible, but the
> >         scenario is
> >         >             not easy to reproduce.
> >         >
> >         >             Please check this project
> >         >             https://github.com/kiuby88/windowing-textio
> >         >             Check readme to build and run
> >         >
> >          (https://github.com/kiuby88/windowing-textio#build-and-run)
> >         >             Project contains only a class with the
> >         >             pipeline PipelineWithTextIo, a log4j2.xml file in
> the
> >         >             resources and the pom.
> >         >
> >         >             The pipeline in PipelineWithTextIo generates
> >         unbounded data
> >         >             using a sequence. It adds a little delay (10s) per
> >         data
> >         >             entry, it uses a distinct (just to apply the
> >         window), and
> >         >             then it writes data using TextIO.
> >         >             The window for the distinct is fixed (5 seconds)
> >         and it
> >         >             does not use lateness.
> >         >             Generated files can be found in
> >         >             windowing-textio/pipe_with_lateness_0s/files. To
> >         write files
> >         >             the FileNamePolicy uses window + timing + shards
> >         >
> >          (see
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135
> )
> >         >             Files are emitted using
> >         getPerDestinationOutputFilenames()
> >         >             (see the code here,
> >         >
> >
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78
> )
> >         >
> >         >             Then, file names in the PCollection are extracted
> and
> >         >             logged. Please, note file names do not have pane
> >         >             information at that point.
> >         >
> >         >             To apply a window, a distinct is used again. Here
> >         several
> >         >             files are discarded as late and they are not
> >         processed by
> >         >             this second distinct. Please, see
> >         >
> >
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
> >         >
> >         >             Debug is enabled for WindowTracing, so you can
> >         find in the
> >         >             terminal several messages like the following:
> >         >             DEBUG org.apache.beam.sdk.util.WindowTracing -
> >         >             LateDataFilter: Dropping element at
> >         2020-05-12T14:05:14.999Z
> >         >             for
> >         >
> >
>   
> key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
> >         >
> >          window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)
> >         >             since too far behind
> >         >             inputWatermark:2020-05-12T14:05:19.799Z;
> >         >             outputWatermark:2020-05-12T14:05:19.799Z`
> >         >
> >         >             What happens here? I think that messages are
> >         generated per
> >         >             second and a window of 5 seconds groups them. Then
> >         a delay is
> >         >             added and finally data are written in a file.
> >         >             The pipeline reads more data, increasing the
> >         watermark.
> >         >             Then, file names are emitted without pane
> >         information (see
> >         >             "Emitted File" in logs). Window in second distinct
> >         compares
> >         >             file names' timestamp and the pipeline watermark
> >         and then it
> >         >             discards file names as late.
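The drop decision in that WindowTracing message can be checked numerically. Here is a simplified sketch in plain Java (Beam's real check lives in LateDataFilter and uses Joda-Time; this is only a model of the condition), fed with the values from the log above:

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of the LateDataFilter check: an element is droppable
// when the end of its window, plus the allowed lateness, is behind the
// input watermark.
public class LateDataCheck {
    static boolean isDroppable(Instant windowMaxTimestamp,
                               Duration allowedLateness,
                               Instant inputWatermark) {
        return windowMaxTimestamp.plus(allowedLateness).isBefore(inputWatermark);
    }

    public static void main(String[] args) {
        // Values from the log: window ends at 14:05:14.999Z, but the
        // watermark has already advanced to 14:05:19.799Z.
        Instant windowEnd = Instant.parse("2020-05-12T14:05:14.999Z");
        Instant watermark = Instant.parse("2020-05-12T14:05:19.799Z");

        // With zero allowed lateness the file name is dropped ...
        System.out.println(isDroppable(windowEnd, Duration.ZERO, watermark));          // true
        // ... but with 60s of allowed lateness it is kept (as a LATE pane).
        System.out.println(isDroppable(windowEnd, Duration.ofSeconds(60), watermark)); // false
    }
}
```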
> >         >
> >         >
> >         >             Bonus
> >         >             -----
> >         >             You can add a lateness to the pipeline. See
> >         >
> >
> https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
> >         >
> >         >             If a minute of allowed lateness is added to the
> >         window, the file names
> >         >             are processed as late panes. As a result, the traces of
> >         >             LateDataFilter disappear.
> >         >
> >         >             Moreover, in order to better illustrate that file
> >         names arrive
> >         >             late at the second distinct, I added a
> >         second
> >         >             TextIO to write file names to other files.
> >         >             Same FileNamePolicy than before was used (window +
> >         timing +
> >         >             shards). Then, you can find files that contain
> >         the original
> >         >             filenames in
> >         >
> >          windowing-textio/pipe_with_lateness_60s/files-after-distinct.
> This
> >         >             is the interesting part, because you will find
> >         several files
> >         >             with LATE in their names.
> >         >
> >         >             Please, let me know if you need more information
> >         or if the
> >         >             example is not enough to check the expected
> scenarios.
> >         >
> >         >             Kby.
> >         >
> >         >             El dom., 10 may. 2020 a las 17:04, Reuven Lax
> >         >             (<re...@google.com <mailto:re...@google.com>
> >         <mailto:re...@google.com <mailto:re...@google.com>>>) escribió:
> >         >
> >         >                 Pane info is supposed to be preserved across
> >         transforms.
> >         >                 If the Flink runner does not, then I believe
> >         that is a bug.
> >         >
> >         >                 On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek
> >         >                 <jozo.vil...@gmail.com
> >         <mailto:jozo.vil...@gmail.com> <mailto:jozo.vil...@gmail.com
> >         <mailto:jozo.vil...@gmail.com>>>
> >         >                 wrote:
> >         >
> >         >                     I am using FileIO and I do observe the
> >         drop of pane
> >         >                     info on the Flink runner too. It
> was
> >         >                     mentioned in this thread:
> >         >
> >          https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
> >         >
> >         >                     It is a result of different reshuffle
> >         expansion for
> >         >                     optimisation reasons. However, I did not
> >         observe a
> >         >                     data loss in my case. Windowing and
> >         watermark info
> >         >                     should be preserved. Pane info is not,
> >         which brings
> >         >                     a question how reliable pane info should
> >         be in terms
> >         >                     of SDK and runner.
> >         >
> >         >                     If you do observe a data loss, it would be
> >         great to
> >         >                     share a test case which replicates the
> >         problem.
> >         >
> >         >                     On Sun, May 10, 2020 at 8:03 AM Reuven Lax
> >         >                     <re...@google.com
> >         <mailto:re...@google.com> <mailto:re...@google.com
> >         <mailto:re...@google.com>>> wrote:
> >         >
> >         >                         Ah, I think I see the problem.
> >         >
> >         >                         It appears that for some reason, the
> Flink
> >         >                         runner loses windowing information
> when a
> >         >                         Reshuffle is applied. I'm not
> >         entirely sure why,
> >         >                         because windowing information should be
> >         >                         maintained across a Reshuffle.
> >         >
> >         >                         Reuven
> >         >
> >         >                         On Sat, May 9, 2020 at 9:50 AM Jose
> Manuel
> >         >                         <kiuby88....@gmail.com
> >         <mailto:kiuby88....@gmail.com>
> >         >                         <mailto:kiuby88....@gmail.com
> >         <mailto:kiuby88....@gmail.com>>> wrote:
> >         >
> >         >
> >         >                             Hi,
> >         >
> >         >                             I have added some logs to the
> >         pipeline as
> >         >                             following (you can find the log
> >         function in
> >         >                             the Appendix):
> >         >
> >         >                             //STREAM + processing time.
> >         >                             pipeline.apply(KafkaIO.read())
> >         >                                    .apply(...) //mappers, a
> >         window and a
> >         >                             combine
> >         >                                    .apply(logBeforeWrite())
> >         >
> >         >                                    .apply("WriteFiles",
> >         >
> >
>   
> TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
> >         >
> >          .getPerDestinationOutputFilenames()
> >         >
> >         >                                    .apply(logAfterWrite())
> >         >                                    .apply("CombineFileNames",
> >         >                             Combine.perKey(...))
> >         >
> >         >                             I have run the pipeline using
> >         DirectRunner
> >         >                             (local), SparkRunner and
> >         FlinkRunner, both
> >         >                             of them using a cluster.
> >         >                             Below you can see the timing and
> pane
> >         >                             information before/after (you can
> >         see traces
> >         >                             in detail with window and timestamp
> >         >                             information in the Appendix).
> >         >
> >         >                             DirectRunner:
> >         >                             [Before Write] timing=ON_TIME,
> >         >                             pane=PaneInfo{isFirst=true,
> >         isLast=true,
> >         >                             timing=ON_TIME, index=0,
> >         onTimeIndex=0}
> >         >                             [After    Write] timing=EARLY,
> >         >                              pane=PaneInfo{isFirst=true,
> >         timing=EARLY,
> >         >                             index=0}
>
> >         >
> >         >                             FlinkRunner:
> >         >                             [Before Write] timing=ON_TIME,
> >         >                             pane=PaneInfo{isFirst=true,
> >         isLast=true,
> >         >                             timing=ON_TIME, index=0,
> >         onTimeIndex=0}
> >         >                             [After    Write] timing=UNKNOWN,
> >         >                             pane=PaneInfo.NO_FIRING
> >         >
> >         >                             SparkRunner:
> >         >                             [Before Write] timing=ON_TIME,
> >         >                             pane=PaneInfo{isFirst=true,
> >         isLast=true,
> >         >                             timing=ON_TIME, index=0,
> >         onTimeIndex=0}
> >         >                             [After    Write] timing=UNKNOWN,
> >         >                             pane=PaneInfo.NO_FIRING
> >         >
> >         >                             It seems DirectRunner propagates
> the
> >         >                             windowing information as expected.
> >         >                             I am not sure if TextIO really
> >         propagates it or
> >         >                             just emits a new window pane,
> >         because the
> >         >                             timing before TextIO is ON_TIME
> >         and after
> >         >                             TextIO is EARLY.
> >         >                             In any case using FlinkRunner and
> >         >                             SparkRunner the timing and the
> >         pane are not set.
> >         >
> >         >                             I thought the problem was in
> >         >                             GatherBundlesPerWindowFn, but now,
> >         after
> >         >                             seeing that the DirectRunner filled
> >         >                             windowing data... I am not sure.
> >         >
> >
> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
> >         >
> >         >
> >         >                             Appendix
> >         >                             -----------
> >         >                             Here you can see the log function
> >         and traces
> >         >                             for different runners in detail.
> >         >
> >         >                             private SingleOutput<String,
> String>
> >         >                             logBefore() {
> >         >                                 return ParDo.of(new
> >         DoFn<String, String>() {
> >         >                                     @ProcessElement
> >         >                                     public void
> >         >                             processElement(ProcessContext
> context,
> >         >                             BoundedWindow boundedWindow) {
> >         >                                         String value =
> >         context.element();
> >         >                                         log.info <
> http://log.info>
> >         >                             <http://log.info>("[Before Write]
> >         >                             Element=data window={},
> timestamp={},
> >         >                             timing={}, index ={}, isFirst ={},
> >         >                             isLast={}, pane={}",
> >         >                                                 boundedWindow,
> >         >
> >         context.timestamp(),
> >         >
> >         context.pane().getTiming(),
> >         >
> >         context.pane().getIndex(),
> >         >
> >         context.pane().isFirst(),
> >         >
> >         context.pane().isLast(),
> >         >                                                 context.pane()
> >         >                                         );
> >         >
> >         context.output(context.element());
> >         >                                     }
> >         >                                 });
> >         >                             }
> >         >
> >         >                             logAfter function shows the same
> >         information.
> >         >
> >         >                             Traces in details.
> >         >
> >         >                             DirectRunner (local):
> >         >                             [Before Write] Element=data
> >         >
> >          window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
> >         >                             timestamp=2020-05-09T13:39:59.999Z,
> >         >                             timing=ON_TIME, index =0, isFirst
> >         =true,
> >         >                             isLast=true
> >          pane=PaneInfo{isFirst=true,
> >         >                             isLast=true, timing=ON_TIME,
> index=0,
> >         >                             onTimeIndex=0}
> >         >                             [After  Write] Element=file
> >         >
> >          window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
> >         >                             timestamp=2020-05-09T13:39:59.999Z,
> >         >                             timing=EARLY,   index =0, isFirst
> >         =true,
> >         >                             isLast=false
> >         pane=PaneInfo{isFirst=true,
> >         >                             timing=EARLY, index=0}
> >
> >         >
> >         >
> >         >                             FlinkRunner (cluster):
> >         >                             [Before Write] Element=data
> >         >
> >          window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
> >         >                             timestamp=2020-05-09T15:13:59.999Z,
> >         >                             timing=ON_TIME, index =0, isFirst
> >         =true,
> >         >                             isLast=true
> >          pane=PaneInfo{isFirst=true,
> >         >                             isLast=true, timing=ON_TIME,
> index=0,
> >         >                             onTimeIndex=0}
> >         >                             [After  Write] Element=file
> >         >
> >          window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
> >         >                             timestamp=2020-05-09T15:13:59.999Z,
> >         >                             timing=UNKNOWN, index =0, isFirst
> >         =true,
> >         >                             isLast=true
>  pane=PaneInfo.NO_FIRING
> >         >
> >         >                             SparkRunner (cluster):
> >         >                             [Before Write] Element=data
> >         >
> >          window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
> >         >                             timestamp=2020-05-09T15:34:59.999Z,
> >         >                             timing=ON_TIME, index =0, isFirst
> >         =true,
> >         >                             isLast=true
> >          pane=PaneInfo{isFirst=true,
> >         >                             isLast=true, timing=ON_TIME,
> index=0,
> >         >                             onTimeIndex=0}
> >         >                             [After  Write] Element=file
> >         >
> >          window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
> >         >                             timestamp=2020-05-09T15:34:59.999Z,
> >         >                             timing=UNKNOWN, index =0, isFirst
> >         =true,
> >         >                             isLast=true
>  pane=PaneInfo.NO_FIRING
> >         >
> >         >
> >         >                             El vie., 8 may. 2020 a las 19:01,
> >         Reuven Lax
> >         >                             (<re...@google.com
> >         <mailto:re...@google.com>
> >         >                             <mailto:re...@google.com
> >         <mailto:re...@google.com>>>) escribió:
> >         >
> >         >                                 The window information should
> >         still be
> >         >                                 there.  Beam propagates
> >         windows through
> >         >                                 PCollection, and I don't think
> >         >                                 WriteFiles does anything
> >         explicit to
> >         >                                 stop that.
> >         >
> >         >                                 Can you try this with the
> >         direct runner
> >         >                                 to see what happens there?
> >         >                                 What is your windowing on this
> >         PCollection?
> >         >
> >         >                                 Reuven
> >         >
> >         >                                 On Fri, May 8, 2020 at 3:49 AM
> >         Jose
> >         >                                 Manuel <kiuby88....@gmail.com
> >         <mailto:kiuby88....@gmail.com>
> >         >                                 <mailto:kiuby88....@gmail.com
> >         <mailto:kiuby88....@gmail.com>>> wrote:
> >         >
> >         >                                     I got the same behavior
> >         using Spark
> >         >                                     Runner (with Spark 2.4.3),
> >         window
> >         >                                     information was missing.
> >         >
> >         >                                     Just to clarify, the
> >         combiner after
> >         >                                     TextIO had different
> >         results. In
> >         >                                     Flink runner the files
> >         names were
> >         >                                     dropped, and in Spark the
> >         >                                     combination process
> >         happened twice,
> >         >                                     duplicating data.  I think
> >         it is
> >         >                                     because different runners
> >         manage in
> >         >                                     a different way the data
> >         without the
> >         >                                     windowing information.
> >         >
> >         >
> >         >
> >         >                                     El vie., 8 may. 2020 a las
> >         0:45,
> >         >                                     Luke Cwik
> >         (<lc...@google.com <mailto:lc...@google.com>
> >         >                                     <mailto:lc...@google.com
> >         <mailto:lc...@google.com>>>) escribió:
> >         >
> >         >                                         +dev
> >         <mailto:d...@beam.apache.org <mailto:d...@beam.apache.org>>
> >         >
> >         >                                         On Mon, May 4, 2020 at
> >         3:56 AM
> >         >                                         Jose Manuel
> >         >                                         <kiuby88....@gmail.com
> >         <mailto:kiuby88....@gmail.com>
> >         >
> >          <mailto:kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>>>
> >         >                                         wrote:
> >         >
> >         >                                             Hi guys,
> >         >
> >         >                                             I think I have
> found
> >         >                                             something
> >         interesting about
> >         >                                             windowing.
> >         >
> >         >                                             I have a pipeline
> >         that gets
> >         >                                             data from Kafka
> >         and writes
> >         >                                             in HDFS by means
> >         of TextIO.
> >         >                                             Once written,
> >         generated
> >         >                                             files are combined
> >         to apply
> >         >                                             some custom
> >         operations.
> >         >                                             However, the
> >         combine does
> >         >                                             not receive data.
> >         Following,
> >         >                                             you can find the
> >         >                                             highlight of my
> >         pipeline.
> >         >
> >         >                                             //STREAM +
> >         processing time.
> >         >
> >          pipeline.apply(KafkaIO.read())
> >         >                                                     .apply(...)
> >         >                                             //mappers, a
> >         window and a
> >         >                                             combine
> >         >
> >         >
> >         .apply("WriteFiles",
> >         >
> >
>   
> TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
> >         >
> >         >
> >          .getPerDestinationOutputFilenames()
> >         >
> >         >
> >          .apply("CombineFileNames",
> >         >
>  Combine.perKey(...))
> >         >
> >         >                                             Running the
> >         pipeline with
> >         >                                             Flink I have found
> >         a log
> >         >                                             trace that says
> >         data are
> >         >                                             discarded as late
> >         in the
> >         >                                             combine
> >         CombineFileNames.
> >         >                                             Then, I have added
> >         >                                             AllowedLateness to
> >         pipeline
> >         >                                             window strategy,
> as a
> >         >                                             workaround.
> >         >                                             It works for now,
> >         but this
> >         >                                             opens several
> >         questions for me.
> >         >
> >         >                                             I think the
> problem is
> >         >
> >          getPerDestinationOutputFilenames
> >         >                                             generates files,
> but
> >         >                                             it does not
> >         maintain the
> >         >                                             windowing
> >         information. Then,
> >         >                                             CombineFileNames
> >         compares
> >         >                                             file names with the
> >         >                                             watermark of the
> >         pipeline
> >         >                                             and discards them
> >         as late.
> >         >
> >         >                                             Is there any issue
> >         with
> >         >
> >          getPerDestinationOutputFilenames?
> >         >                                             Maybe, I am doing
> >         something
> >         >                                             wrong
> >         >                                             and using
> >         >
> >          getPerDestinationOutputFilenames
> >         >                                             + combine does not
> >         make sense.
> >         >                                             What do you think?
> >         >
> >         >                                             Please, note I am
> >         using Beam
> >         >                                             2.17.0 with Flink
> >         1.9.1.
> >         >
> >         >                                             Many thanks,
> >         >                                             Jose
> >         >
> >
>
