Ah, I think I see the problem.

It appears that for some reason, the Flink runner loses windowing
information when a Reshuffle is applied. I'm not entirely sure why, because
windowing information should be maintained across a Reshuffle.

Reuven

On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com> wrote:

>
> Hi,
>
> I have added some logs to the pipeline as following (you can find the log
> function in the Appendix):
>
> //STREAM + processing time.
> pipeline.apply(KafkaIO.read())
>        .apply(...) //mappers, a window and a combine
>        .apply(logBeforeWrite())
>
>        .apply("WriteFiles",
> TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>        .getPerDestinationOutputFilenames()
>
>        .apply(logAfterWrite())
>        .apply("CombineFileNames", Combine.perKey(...))
>
> I have run the pipeline using DirectRunner (local), SparkRunner and
> FlinkRunner, both of them using a cluster.
> Below you can see the timing and pane information before/after (you can
> see traces in detail with window and timestamp information in the Appendix).
>
> DirectRunner:
> [Before Write] timing=ON_TIME,  pane=PaneInfo{isFirst=true, isLast=true,
> timing=ON_TIME, index=0, onTimeIndex=0}
> [After    Write] timing=EARLY,       pane=PaneInfo{isFirst=true,
> timing=EARLY, index=0}
>
> FlinkRunner:
> [Before Write] timing=ON_TIME,    pane=PaneInfo{isFirst=true, isLast=true,
> timing=ON_TIME, index=0, onTimeIndex=0}
> [After    Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>
> SparkRunner:
> [Before Write] timing=ON_TIME,    pane=PaneInfo{isFirst=true, isLast=true,
> timing=ON_TIME, index=0, onTimeIndex=0}
> [After    Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>
> It seems DirectRunner propagates the windowing information as expected.
> I am not sure if TextIO really propagates or it just emits a window pane,
> because the timing before TextIO is ON_TIME and after TextIO is EARLY.
> In any case using FlinkRunner and SparkRunner the timing and the pane are
> not set.
>
> I thought the problem was in GatherBundlesPerWindowFn, but now, after
> seeing that the DirectRunner filled windowing data... I am not sure.
>
> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>
>
> Appendix
> -----------
> Here you can see the log function and traces for different runners in
> detail.
>
> private SingleOutput<String, String> logBefore() {
>     return ParDo.of(new DoFn<String, String>() {
>         @ProcessElement
>         public void processElement(ProcessContext context, BoundedWindow
> boundedWindow) {
>             String value = context.element();
>             log.info("[Before Write] Element=data window={},
> timestamp={}, timing={}, index ={}, isFirst ={}, isLast={}, pane={}",
>                     boundedWindow,
>                     context.timestamp(),
>                     context.pane().getTiming(),
>                     context.pane().getIndex(),
>                     context.pane().isFirst(),
>                     context.pane().isLast(),
>                     context.pane()
>             );
>             context.output(context.element());
>         }
>     });
> }
>
> logAfter function shows the same information.
>
> Traces in details.
>
> DirectRunner (local):
> [Before Write] Element=data
> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
> timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index =0, isFirst
> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
> timing=ON_TIME, index=0, onTimeIndex=0}
> [After  Write] Element=file
> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
> timestamp=2020-05-09T13:39:59.999Z, timing=EARLY,   index =0, isFirst
> =true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>
>
> FlinkRunner (cluster):
> [Before Write] Element=data
> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
> timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index =0, isFirst
> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
> timing=ON_TIME, index=0, onTimeIndex=0}
> [After  Write] Element=file
> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
> timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index =0, isFirst
> =true, isLast=true  pane=PaneInfo.NO_FIRING
>
> SparkRunner (cluster):
> [Before Write] Element=data
> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
> timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index =0, isFirst
> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
> timing=ON_TIME, index=0, onTimeIndex=0}
> [After  Write] Element=file
> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
> timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index =0, isFirst
> =true, isLast=true  pane=PaneInfo.NO_FIRING
>
>
> El vie., 8 may. 2020 a las 19:01, Reuven Lax (<re...@google.com>)
> escribió:
>
>> The window information should still be there.  Beam propagates windows
>> through PCollection, and I don't think WriteFiles does anything explicit to
>> stop that.
>>
>> Can you try this with the direct runner to see what happens there?
>> What is your windowing on this PCollection?
>>
>> Reuven
>>
>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>>
>>> I got the same behavior using Spark Runner (with Spark 2.4.3), window
>>> information was missing.
>>>
>>> Just to clarify, the combiner after TextIO had different results. In
>>> Flink runner the files names were dropped, and in Spark the combination
>>> process happened twice, duplicating data.  I think it is because different
>>> runners manage in a different way the data without the windowing
>>> information.
>>>
>>>
>>>
>>> El vie., 8 may. 2020 a las 0:45, Luke Cwik (<lc...@google.com>)
>>> escribió:
>>>
>>>> +dev <d...@beam.apache.org>
>>>>
>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> I think I have found something interesting about windowing.
>>>>>
>>>>> I have a pipeline that gets data from Kafka and writes in HDFS by
>>>>> means of TextIO.
>>>>> Once written, generated files are combined to apply some custom
>>>>> operations.
>>>>> However, the combine does not receive data. Following, you can find the
>>>>> highlight of my pipeline.
>>>>>
>>>>> //STREAM + processing time.
>>>>> pipeline.apply(KafkaIO.read())
>>>>>         .apply(...) //mappers, a window and a combine
>>>>>
>>>>>         .apply("WriteFiles",
>>>>> TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>>         .getPerDestinationOutputFilenames()
>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>
>>>>> Running the pipeline with Flink I have found a log trace that says
>>>>> data are
>>>>> discarded as late in the combine CombineFileNames. Then, I have added
>>>>> AllowedLateness to pipeline window strategy, as a workaround.
>>>>> It works by now, but this opens several questions to me
>>>>>
>>>>> I think the problem is getPerDestinationOutputFilenames generates
>>>>> files, but
>>>>> it does not maintain the windowing information. Then, CombineFileNames
>>>>> compares
>>>>> file names with the watermark of the pipeline and discards them as
>>>>> late.
>>>>>
>>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe, I am
>>>>> doing something wrong
>>>>> and using getPerDestinationOutputFilenames + combine does not make
>>>>> sense.
>>>>> What do you think?
>>>>>
>>>>> Please, note I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>
>>>>> Many thanks,
>>>>> Jose
>>>>>
>>>>>

Reply via email to