Lateness should never be introduced inside a pipeline - generally late data
can only come from a source.  If data was not dropped as late earlier in
the pipeline, it should not be dropped after the file write. I suspect that
this is a bug in how the Flink runner handles the Reshuffle transform, but
I'm not sure what the exact bug is.
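
For reference, the rule that decides whether an element is dropped as late can be sketched in plain Java. This is only an illustration under my own assumptions: the class and method names are hypothetical (not Beam classes), and the rule is, as far as I understand it, that an element is dropped once the last timestamp of its window plus the allowed lateness is before the input watermark. The values come from the WindowTracing message quoted further down in this thread.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper sketching the late-data drop rule; not part of Beam. */
final class LatenessCheck {
    private LatenessCheck() {}

    /**
     * Returns true when the window has expired, i.e. the window's max timestamp
     * plus the allowed lateness is strictly before the input watermark.
     */
    static boolean isDropped(Instant windowMaxTimestamp,
                             Duration allowedLateness,
                             Instant inputWatermark) {
        return windowMaxTimestamp.plus(allowedLateness).isBefore(inputWatermark);
    }

    public static void main(String[] args) {
        // Values taken from the LateDataFilter trace quoted in this thread.
        Instant maxTimestamp = Instant.parse("2020-05-12T14:05:14.999Z");
        Instant watermark = Instant.parse("2020-05-12T14:05:19.799Z");

        // Zero allowed lateness: the watermark is already past the window.
        System.out.println(isDropped(maxTimestamp, Duration.ZERO, watermark));
        // One minute of allowed lateness: the element is still accepted.
        System.out.println(isDropped(maxTimestamp, Duration.ofSeconds(60), watermark));
    }
}
```

With zero allowed lateness the check is true for that trace, and with 60 seconds it is false, which matches the workaround reported below.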

Reuven

On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> Hi Jose,
>
> thank you for putting in the effort to build an example which demonstrates
> your problem.
>
> You are using a streaming pipeline and it seems that the watermark
> downstream has already advanced further, so when your file pane arrives, it is
> already late. Since you define that lateness is not tolerated, it is
> dropped.
> I myself never had a requirement to specify zero allowed lateness for
> streaming. It feels dangerous. Do you have a specific use case? Also, in
> many cases, after windowed files are written, I usually collect them into
> the global window and specify a different triggering policy for collecting
> them. Both are reasons why I never came across this situation.
>
> I cannot say whether it is a bug or not. I would guess that the
> watermark can advance further, e.g. because elements can be processed in
> arbitrary order. Not saying this is the case.
> It needs someone with a better understanding of how watermark advancement
> is / should be handled within pipelines.
>
>
> P.S.: you can add `.withTimestampFn()` to your GenerateSequence to get
> more stable timing, which is also easier to reason about:
>
> Dropping element at 1970-01-01T00:00:19.999Z for key
> ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z) since too
> far behind inputWatermark:1970-01-01T00:00:24.000Z;
> outputWatermark:1970-01-01T00:00:24.000Z
>
>            instead of
>
> Dropping element at 2020-05-15T08:52:34.999Z for key ...
> window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since too far
> behind inputWatermark:2020-05-15T08:52:39.318Z;
> outputWatermark:2020-05-15T08:52:39.318Z
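
The window bounds in both traces are consistent with 5-second fixed windows aligned to the epoch. A minimal sketch of that assignment follows, assuming epoch-aligned windows with no offset; the class and method names are hypothetical and this is not Beam's FixedWindows implementation.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of epoch-aligned fixed-window assignment; not Beam code. */
final class FixedWindowSketch {
    private FixedWindowSketch() {}

    /** Inclusive start of the fixed window that contains the timestamp. */
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch too.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    /** Exclusive end of the fixed window that contains the timestamp. */
    static Instant windowEnd(Instant timestamp, Duration size) {
        return windowStart(timestamp, size).plus(size);
    }

    /** Last timestamp that still belongs to the window (end minus 1 ms). */
    static Instant maxTimestamp(Instant timestamp, Duration size) {
        return windowEnd(timestamp, size).minusMillis(1);
    }
}
```

For the element at 1970-01-01T00:00:19.999Z and a 5-second size, this gives the window [00:00:15.000Z..00:00:20.000Z), whose max timestamp is the element's own timestamp.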
>
>
>
>
> In my
>
>
>
> On Thu, May 14, 2020 at 10:47 AM Jose Manuel <kiuby88....@gmail.com>
> wrote:
>
>> Hi again,
>>
>> I have simplified the example to reproduce the data loss. The scenario is
>> the following:
>>
>> - TextIO writes files.
>> - getPerDestinationOutputFilenames emits file names.
>> - File names are processed by an aggregator (combine, distinct,
>> groupByKey...) with a window **without allowed lateness**.
>> - File names are discarded as late.
>>
>> Here you can see the data loss in the picture in
>> https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
>>
>> Please follow the README to run the pipeline and find the log traces that
>> say data are dropped as late.
>> Remember, you can run the pipeline with other allowed lateness values for
>> the window (check README.md).
>>
>> Kby.
>>
>> On Tue, May 12, 2020 at 17:16, Jose Manuel (<kiuby88....@gmail.com>)
>> wrote:
>>
>>> Hi,
>>>
>>> I would like to clarify that while TextIO is writing, all the data are in
>>> the files (shards). The loss happens when the file names emitted by
>>> getPerDestinationOutputFilenames are processed by a window.
>>>
>>> I have created a pipeline to reproduce the scenario in which some
>>> file names are lost after getPerDestinationOutputFilenames. Please note
>>> I tried to simplify the code as much as possible, but the scenario is not
>>> easy to reproduce.
>>>
>>> Please check this project https://github.com/kiuby88/windowing-textio
>>> Check readme to build and run (
>>> https://github.com/kiuby88/windowing-textio#build-and-run)
>>> The project contains only a class with the pipeline, PipelineWithTextIo,
>>> a log4j2.xml file in the resources and the pom.
>>>
>>> The pipeline in PipelineWithTextIo generates unbounded data using a
>>> sequence. It adds a small delay (10s) per data entry, it uses a distinct
>>> (just to apply the window), and then it writes data using TextIO.
>>> The window for the distinct is fixed (5 seconds) and it does not use
>>> allowed lateness.
>>> Generated files can be found in
>>> windowing-textio/pipe_with_lateness_0s/files. To write files the
>>> FileNamePolicy uses window + timing + shards (see
>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135
>>> )
>>> Files are emitted using getPerDestinationOutputFilenames()
>>> (see the code here,
>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78
>>> )
>>>
>>> Then, file names in the PCollection are extracted and logged. Please
>>> note that file names do not have pane information at that point.
>>>
>>> To apply a window, a distinct is used again. Here several files are
>>> discarded as late and are not processed by this second distinct.
>>> Please see
>>>
>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>>>
>>> Debug is enabled for WindowTracing, so you can find in the terminal
>>> several messages like the following:
>>> DEBUG org.apache.beam.sdk.util.WindowTracing - LateDataFilter: Dropping
>>> element at 2020-05-12T14:05:14.999Z for
>>> key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
>>> window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far
>>> behind inputWatermark:2020-05-12T14:05:19.799Z;
>>> outputWatermark:2020-05-12T14:05:19.799Z
>>>
>>> What happens here? I think that messages are generated every second and a
>>> window of 5 seconds groups them. Then a delay is added and finally data are
>>> written to a file.
>>> The pipeline reads more data, advancing the watermark.
>>> Then, file names are emitted without pane information (see "Emitted
>>> File" in logs). The window in the second distinct compares the file names'
>>> timestamps with the pipeline watermark and then discards them as late.
>>>
>>>
>>> Bonus
>>> -----
>>> You can add a lateness to the pipeline. See
>>> https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
>>>
>>> If a minute of allowed lateness is added to the window, the file names are
>>> processed as late. As a result, the LateDataFilter traces disappear.
>>>
>>> Moreover, to better illustrate that file names reach the second distinct
>>> as late, I added a second TextIO to write the file names
>>> to other files.
>>> The same FileNamePolicy as before was used (window + timing + shards).
>>> Then, you can find files that contain the original file names in
>>> windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the
>>> interesting part, because you will find several files with LATE in their
>>> names.
>>>
>>> Please, let me know if you need more information or if the example is
>>> not enough to check the expected scenarios.
>>>
>>> Kby.
>>>
>>>
>>> On Sun, May 10, 2020 at 17:04, Reuven Lax (<re...@google.com>)
>>> wrote:
>>>
>>>> Pane info is supposed to be preserved across transforms. If the Flink
>>>> runner does not preserve it, then I believe that is a bug.
>>>>
>>>> On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <jozo.vil...@gmail.com>
>>>> wrote:
>>>>
>>>>> I am using FileIO and I do observe the loss of pane info
>>>>> on the Flink runner too. It was mentioned in this thread:
>>>>> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>>>>>
>>>>> It is a result of a different Reshuffle expansion, done for optimisation
>>>>> reasons. However, I did not observe any data loss in my case. Windowing and
>>>>> watermark info should be preserved. Pane info is not, which raises the
>>>>> question of how reliable pane info should be in terms of SDK and runner.
>>>>>
>>>>> If you do observe a data loss, it would be great to share a test case
>>>>> which replicates the problem.
>>>>>
>>>>> On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Ah, I think I see the problem.
>>>>>>
>>>>>> It appears that for some reason, the Flink runner loses windowing
>>>>>> information when a Reshuffle is applied. I'm not entirely sure why,
>>>>>> because windowing information should be maintained across a Reshuffle.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have added some logs to the pipeline as follows (you can find
>>>>>>> the log function in the Appendix):
>>>>>>>
>>>>>>> //STREAM + processing time.
>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>        .apply(...) //mappers, a window and a combine
>>>>>>>        .apply(logBeforeWrite())
>>>>>>>
>>>>>>>        .apply("WriteFiles",
>>>>>>> TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>>>>        .getPerDestinationOutputFilenames()
>>>>>>>
>>>>>>>        .apply(logAfterWrite())
>>>>>>>        .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>
>>>>>>> I have run the pipeline using DirectRunner (local), SparkRunner and
>>>>>>> FlinkRunner, the latter two using a cluster.
>>>>>>> Below you can see the timing and pane information before/after (you
>>>>>>> can see the detailed traces, with window and timestamp information, in
>>>>>>> the Appendix).
>>>>>>>
>>>>>>> DirectRunner:
>>>>>>> [Before Write] timing=ON_TIME,  pane=PaneInfo{isFirst=true,
>>>>>>> isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After    Write] timing=EARLY,       pane=PaneInfo{isFirst=true,
>>>>>>> timing=EARLY, index=0}
>>>>>>>
>>>>>>> FlinkRunner:
>>>>>>> [Before Write] timing=ON_TIME,    pane=PaneInfo{isFirst=true,
>>>>>>> isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After    Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>>>>
>>>>>>> SparkRunner:
>>>>>>> [Before Write] timing=ON_TIME,    pane=PaneInfo{isFirst=true,
>>>>>>> isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After    Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>>>>
>>>>>>> It seems DirectRunner propagates the windowing information as
>>>>>>> expected.
>>>>>>> I am not sure if TextIO really propagates it or just emits a new window
>>>>>>> pane, because the timing before TextIO is ON_TIME and after TextIO is
>>>>>>> EARLY.
>>>>>>> In any case, with FlinkRunner and SparkRunner the timing and the
>>>>>>> pane are not set.
>>>>>>>
>>>>>>> I thought the problem was in GatherBundlesPerWindowFn, but now,
>>>>>>> after seeing that the DirectRunner filled in the windowing data, I am
>>>>>>> not sure.
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>>>>>>
>>>>>>>
>>>>>>> Appendix
>>>>>>> -----------
>>>>>>> Here you can see the log function and traces for different runners
>>>>>>> in detail.
>>>>>>>
>>>>>>> private SingleOutput<String, String> logBefore() {
>>>>>>>     return ParDo.of(new DoFn<String, String>() {
>>>>>>>         @ProcessElement
>>>>>>>         public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>>>>>>>             String value = context.element();
>>>>>>>             // Log window, timestamp and pane details, then pass the element through.
>>>>>>>             log.info("[Before Write] Element=data window={}, "
>>>>>>>                     + "timestamp={}, timing={}, index ={}, isFirst ={}, isLast={}, pane={}",
>>>>>>>                     boundedWindow,
>>>>>>>                     context.timestamp(),
>>>>>>>                     context.pane().getTiming(),
>>>>>>>                     context.pane().getIndex(),
>>>>>>>                     context.pane().isFirst(),
>>>>>>>                     context.pane().isLast(),
>>>>>>>                     context.pane()
>>>>>>>             );
>>>>>>>             context.output(context.element());
>>>>>>>         }
>>>>>>>     });
>>>>>>> }
>>>>>>>
>>>>>>> The logAfter function shows the same information.
>>>>>>>
>>>>>>> Traces in detail:
>>>>>>>
>>>>>>> DirectRunner (local):
>>>>>>> [Before Write] Element=data
>>>>>>> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>>>>>>> timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index =0, isFirst
>>>>>>> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
>>>>>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After  Write] Element=file
>>>>>>> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>>>>>>> timestamp=2020-05-09T13:39:59.999Z, timing=EARLY,   index =0, isFirst
>>>>>>> =true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>>>>>
>>>>>>>
>>>>>>> FlinkRunner (cluster):
>>>>>>> [Before Write] Element=data
>>>>>>> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>>>>>>> timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index =0, isFirst
>>>>>>> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
>>>>>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After  Write] Element=file
>>>>>>> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>>>>>>> timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index =0, isFirst
>>>>>>> =true, isLast=true  pane=PaneInfo.NO_FIRING
>>>>>>>
>>>>>>> SparkRunner (cluster):
>>>>>>> [Before Write] Element=data
>>>>>>> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>>>>>>> timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index =0, isFirst
>>>>>>> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
>>>>>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>> [After  Write] Element=file
>>>>>>> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>>>>>>> timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index =0, isFirst
>>>>>>> =true, isLast=true  pane=PaneInfo.NO_FIRING
>>>>>>>
>>>>>>>
>>>>>>> On Fri, May 8, 2020 at 19:01, Reuven Lax (<re...@google.com>)
>>>>>>> wrote:
>>>>>>>
>>>>>>>> The window information should still be there.  Beam propagates
>>>>>>>> windows through PCollection, and I don't think WriteFiles does anything
>>>>>>>> explicit to stop that.
>>>>>>>>
>>>>>>>> Can you try this with the direct runner to see what happens there?
>>>>>>>> What is your windowing on this PCollection?
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I got the same behavior using the Spark runner (with Spark 2.4.3):
>>>>>>>>> window information was missing.
>>>>>>>>>
>>>>>>>>> Just to clarify, the combiner after TextIO had different results.
>>>>>>>>> In the Flink runner the file names were dropped, and in Spark the
>>>>>>>>> combination process happened twice, duplicating data. I think it is
>>>>>>>>> because different runners handle data without windowing information
>>>>>>>>> in different ways.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, May 8, 2020 at 0:45, Luke Cwik (<lc...@google.com>)
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> +dev <dev@beam.apache.org>
>>>>>>>>>>
>>>>>>>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi guys,
>>>>>>>>>>>
>>>>>>>>>>> I think I have found something interesting about windowing.
>>>>>>>>>>>
>>>>>>>>>>> I have a pipeline that gets data from Kafka and writes to HDFS
>>>>>>>>>>> by means of TextIO.
>>>>>>>>>>> Once written, the generated files are combined to apply some custom
>>>>>>>>>>> operations.
>>>>>>>>>>> However, the combine does not receive data. Below you can find
>>>>>>>>>>> the outline of my pipeline.
>>>>>>>>>>>
>>>>>>>>>>> //STREAM + processing time.
>>>>>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>>>>>         .apply(...) //mappers, a window and a combine
>>>>>>>>>>>
>>>>>>>>>>>         .apply("WriteFiles",
>>>>>>>>>>> TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>>>>>>>>         .getPerDestinationOutputFilenames()
>>>>>>>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>>>>>
>>>>>>>>>>> Running the pipeline with Flink, I found a log trace that says
>>>>>>>>>>> data are discarded as late in the combine CombineFileNames. I then
>>>>>>>>>>> added AllowedLateness to the pipeline's window strategy as a
>>>>>>>>>>> workaround.
>>>>>>>>>>> It works for now, but this raises several questions for me.
>>>>>>>>>>>
>>>>>>>>>>> I think the problem is that getPerDestinationOutputFilenames
>>>>>>>>>>> generates files but does not maintain the windowing information.
>>>>>>>>>>> Then, CombineFileNames compares file names with the watermark of
>>>>>>>>>>> the pipeline and discards them as late.
>>>>>>>>>>>
>>>>>>>>>>> Is there any issue with getPerDestinationOutputFilenames?
>>>>>>>>>>> Maybe I am doing something wrong
>>>>>>>>>>> and using getPerDestinationOutputFilenames + combine does not
>>>>>>>>>>> make sense.
>>>>>>>>>>> What do you think?
>>>>>>>>>>>
>>>>>>>>>>> Please, note I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>>>>>>>
>>>>>>>>>>> Many thanks,
>>>>>>>>>>> Jose
>>>>>>>>>>>
>>>>>>>>>>>
