Pane info is supposed to be preserved across transforms. If the Flink runner
does not preserve it, then I believe that is a bug.

On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> I am using FileIO and I observe the drop of pane info on the Flink
> runner too. It was mentioned in this thread:
> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>
> It is a result of a different reshuffle expansion, done for optimisation
> reasons. However, I did not observe data loss in my case. Windowing and
> watermark info should be preserved; pane info is not, which raises the
> question of how reliable pane info should be in terms of SDKs and runners.
>
> If you do observe data loss, it would be great to share a test case
> that replicates the problem.
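>
> Something along these lines could be a starting point. This is an
> untested sketch: GenerateSequence stands in for the Kafka source, the
> class and transform names are mine, the window/rate values are arbitrary,
> and the windowed Combine is only there so that elements carry real pane
> info before the Reshuffle:
>
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Combine;
> import org.apache.beam.sdk.transforms.Count;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.Reshuffle;
> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.joda.time.Duration;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class ReshufflePaneInfoCheck {
>     private static final Logger LOG = LoggerFactory.getLogger(ReshufflePaneInfoCheck.class);
>
>     // Logs the pane of every element passing through, then re-emits it unchanged.
>     static class LogPaneFn extends DoFn<Long, Long> {
>         private final String label;
>         LogPaneFn(String label) { this.label = label; }
>
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             LOG.info("[{} Reshuffle] timing={}, pane={}", label, c.pane().getTiming(), c.pane());
>             c.output(c.element());
>         }
>     }
>
>     public static void main(String[] args) {
>         PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>         Pipeline p = Pipeline.create(options);
>
>         p.apply(GenerateSequence.from(0).withRate(10, Duration.standardSeconds(1)))
>             .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
>             // The windowed Combine assigns real pane info (e.g. ON_TIME) to its output.
>             .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults())
>             .apply("LogBefore", ParDo.of(new LogPaneFn("before")))
>             .apply(Reshuffle.viaRandomKey())
>             .apply("LogAfter", ParDo.of(new LogPaneFn("after")));
>
>         p.run();
>     }
> }
>
> Comparing the "before" and "after" pane logs per runner should show
> whether the pane is reset to NO_FIRING by the reshuffle expansion.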
>
> On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com> wrote:
>
>> Ah, I think I see the problem.
>>
>> It appears that for some reason, the Flink runner loses windowing
>> information when a Reshuffle is applied. I'm not entirely sure why, because
>> windowing information should be maintained across a Reshuffle.
>>
>> Reuven
>>
>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I have added some logs to the pipeline as follows (you can find the
>>> log function in the Appendix):
>>>
>>> //STREAM + processing time.
>>> pipeline.apply(KafkaIO.read())
>>>        .apply(...) //mappers, a window and a combine
>>>        .apply(logBeforeWrite())
>>>
>>>        .apply("WriteFiles", TextIO.<String>writeCustomType()
>>>                .to(policy).withShards(4).withWindowedWrites())
>>>        .getPerDestinationOutputFilenames()
>>>
>>>        .apply(logAfterWrite())
>>>        .apply("CombineFileNames", Combine.perKey(...))
>>>
>>> I have run the pipeline using the DirectRunner (local), and the
>>> SparkRunner and FlinkRunner on a cluster.
>>> Below you can see the timing and pane information before/after the write
>>> (detailed traces with window and timestamp information are in the Appendix).
>>>
>>> DirectRunner:
>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After  Write] timing=EARLY,   pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>
>>> FlinkRunner:
>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After  Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>
>>> SparkRunner:
>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After  Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>
>>> It seems the DirectRunner propagates the windowing information as expected.
>>> I am not sure whether TextIO really propagates the incoming pane or just
>>> emits a new one, because the timing before TextIO is ON_TIME and after
>>> TextIO it is EARLY. In any case, with the FlinkRunner and SparkRunner the
>>> timing and the pane are not set.
>>>
>>> I thought the problem was in GatherBundlesPerWindowFn, but now, after
>>> seeing that the DirectRunner filled in the windowing data... I am not sure.
>>>
>>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>>
>>>
>>> Appendix
>>> -----------
>>> Here you can see the log function and traces for different runners in
>>> detail.
>>>
>>> private ParDo.SingleOutput<String, String> logBeforeWrite() {
>>>     return ParDo.of(new DoFn<String, String>() {
>>>         @ProcessElement
>>>         public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>>>             String value = context.element();
>>>             log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
>>>                     boundedWindow,
>>>                     context.timestamp(),
>>>                     context.pane().getTiming(),
>>>                     context.pane().getIndex(),
>>>                     context.pane().isFirst(),
>>>                     context.pane().isLast(),
>>>                     context.pane()
>>>             );
>>>             context.output(value);
>>>         }
>>>     });
>>> }
>>>
>>> The logAfterWrite() function logs the same information.
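>>>
>>> As an aside, if I read the Beam programming guide correctly, PaneInfo
>>> can also be injected directly as a @ProcessElement parameter, which
>>> would make the logging a bit shorter. An untested variant:
>>>
>>> @ProcessElement
>>> public void processElement(@Element String value, OutputReceiver<String> out,
>>>                            BoundedWindow window, PaneInfo pane) {
>>>     // PaneInfo injected directly instead of read through ProcessContext.
>>>     log.info("[Before Write] window={}, timing={}, pane={}",
>>>             window, pane.getTiming(), pane);
>>>     out.output(value);
>>> }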
>>>
>>> Traces in detail:
>>>
>>> DirectRunner (local):
>>> [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After  Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index=0, isFirst=true, isLast=false, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>
>>> FlinkRunner (cluster):
>>> [Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After  Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true, pane=PaneInfo.NO_FIRING
>>>
>>> SparkRunner (cluster):
>>> [Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After  Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true, pane=PaneInfo.NO_FIRING
>>>
>>>
>>> On Fri, May 8, 2020 at 7:01 PM Reuven Lax (<re...@google.com>) wrote:
>>>
>>>> The window information should still be there.  Beam propagates windows
>>>> through PCollection, and I don't think WriteFiles does anything explicit to
>>>> stop that.
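>>>>
>>>> One thing you could check at pipeline-construction time is the
>>>> windowing strategy that the filenames PCollection carries. A small
>>>> sketch, where "written" is assumed to be your WriteFilesResult:
>>>>
>>>> PCollection<?> filenames = written.getPerDestinationOutputFilenames();
>>>> // Prints the WindowFn, trigger, allowed lateness, etc. attached to it.
>>>> System.out.println(filenames.getWindowingStrategy());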
>>>>
>>>> Can you try this with the direct runner to see what happens there?
>>>> What is your windowing on this PCollection?
>>>>
>>>> Reuven
>>>>
>>>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com>
>>>> wrote:
>>>>
>>>>> I got the same behavior using the Spark runner (with Spark 2.4.3):
>>>>> window information was missing.
>>>>>
>>>>> Just to clarify, the combiner after TextIO produced different results.
>>>>> In the Flink runner the file names were dropped, and in Spark the
>>>>> combination ran twice, duplicating data. I think this is because
>>>>> different runners handle data without windowing information in
>>>>> different ways.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, May 8, 2020 at 12:45 AM Luke Cwik (<lc...@google.com>) wrote:
>>>>>
>>>>>> +dev <dev@beam.apache.org>
>>>>>>
>>>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi guys,
>>>>>>>
>>>>>>> I think I have found something interesting about windowing.
>>>>>>>
>>>>>>> I have a pipeline that gets data from Kafka and writes to HDFS by
>>>>>>> means of TextIO. Once written, the generated files are combined to
>>>>>>> apply some custom operations. However, the combine does not receive
>>>>>>> data. Below you can find the relevant part of my pipeline.
>>>>>>>
>>>>>>> //STREAM + processing time.
>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>         .apply(...) //mappers, a window and a combine
>>>>>>>
>>>>>>>         .apply("WriteFiles", TextIO.<String>writeCustomType()
>>>>>>>                 .to(policy).withShards(4).withWindowedWrites())
>>>>>>>         .getPerDestinationOutputFilenames()
>>>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>
>>>>>>> Running the pipeline with Flink, I found a log trace saying that data
>>>>>>> is discarded as late in the CombineFileNames combine. As a workaround,
>>>>>>> I added allowedLateness to the pipeline's window strategy. It works
>>>>>>> for now, but it raises several questions for me.
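>>>>>>>
>>>>>>> For reference, the workaround looks roughly like this (the window
>>>>>>> size and lateness values here are placeholders, not my real settings):
>>>>>>>
>>>>>>> .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>>>>>>>         .withAllowedLateness(Duration.standardMinutes(5))
>>>>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>>>>         .discardingFiredPanes())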
>>>>>>>
>>>>>>> I think the problem is that getPerDestinationOutputFilenames generates
>>>>>>> the file names but does not maintain the windowing information.
>>>>>>> CombineFileNames then compares the file names with the watermark of
>>>>>>> the pipeline and discards them as late.
>>>>>>>
>>>>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am
>>>>>>> doing something wrong and using getPerDestinationOutputFilenames +
>>>>>>> combine does not make sense. What do you think?
>>>>>>>
>>>>>>> Please note that I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>>>
>>>>>>> Many thanks,
>>>>>>> Jose
>>>>>>>
>>>>>>>
