Pane info is supposed to be preserved across transforms. If the Flink runner does not preserve it, then I believe that is a bug.
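To make the expectation concrete, here is a minimal sketch (a hypothetical pipeline, not the one from this thread) of what "preserved" means: a ParDo downstream of a triggered aggregation should observe the same PaneInfo the trigger assigned.

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// "words" is some upstream PCollection<String>. Count fires a pane per
// window; any transform applied afterwards should still see that pane.
PCollection<KV<String, Long>> counts = words
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(Count.perElement());

counts.apply(ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Expected here: the pane produced upstream (e.g. timing=ON_TIME),
    // never PaneInfo.NO_FIRING.
    PaneInfo pane = c.pane();
    c.output(c.element());
  }
}));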
On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> I am using FileIO and I observe the drop of pane info on Flink runner
> too. It was mentioned in this thread:
> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>
> It is a result of a different reshuffle expansion, done for optimisation
> reasons. However, I did not observe data loss in my case. Windowing and
> watermark info should be preserved. Pane info is not, which raises the
> question of how reliable pane info should be across SDKs and runners.
>
> If you do observe data loss, it would be great to share a test case
> which replicates the problem.
>
> On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com> wrote:
>
>> Ah, I think I see the problem.
>>
>> It appears that for some reason, the Flink runner loses windowing
>> information when a Reshuffle is applied. I'm not entirely sure why,
>> because windowing information should be maintained across a Reshuffle.
>>
>> Reuven
>>
>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have added some logs to the pipeline as follows (you can find the
>>> log function in the Appendix):
>>>
>>> //STREAM + processing time.
>>> pipeline.apply(KafkaIO.read())
>>>         .apply(...) //mappers, a window and a combine
>>>         .apply(logBeforeWrite())
>>>         .apply("WriteFiles", TextIO.<String>writeCustomType()
>>>             .to(policy).withShards(4).withWindowedWrites())
>>>         .getPerDestinationOutputFilenames()
>>>         .apply(logAfterWrite())
>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>
>>> I have run the pipeline using DirectRunner (locally), and SparkRunner
>>> and FlinkRunner (both on a cluster).
>>> Below you can see the timing and pane information before/after the
>>> write (detailed traces with window and timestamp information are in
>>> the Appendix).
>>>
>>> DirectRunner:
>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true,
>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After Write]  timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY,
>>> index=0}
>>>
>>> FlinkRunner:
>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true,
>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After Write]  timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>
>>> SparkRunner:
>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true,
>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After Write]  timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>
>>> It seems DirectRunner propagates the windowing information as expected.
>>> I am not sure whether TextIO really propagates the pane or just emits
>>> a new one, because the timing before TextIO is ON_TIME and after TextIO
>>> it is EARLY. In any case, with FlinkRunner and SparkRunner the timing
>>> and the pane are not set.
>>>
>>> I thought the problem was in GatherBundlesPerWindowFn, but now, after
>>> seeing that the DirectRunner fills in the windowing data... I am not
>>> sure.
>>>
>>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
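>>>
>>> As a quick sanity check, one could drop a small guard right after
>>> getPerDestinationOutputFilenames to count elements whose pane info was
>>> reset. This is just a sketch; assertPaneIsSet and the counter names
>>> are hypothetical:
>>>
>>> import org.apache.beam.sdk.metrics.Metrics;
>>> import org.apache.beam.sdk.transforms.DoFn;
>>> import org.apache.beam.sdk.transforms.ParDo;
>>> import org.apache.beam.sdk.transforms.ParDo.SingleOutput;
>>> import org.apache.beam.sdk.transforms.windowing.PaneInfo;
>>>
>>> private SingleOutput<String, String> assertPaneIsSet() {
>>>   return ParDo.of(new DoFn<String, String>() {
>>>     @ProcessElement
>>>     public void processElement(ProcessContext c) {
>>>       // PaneInfo.NO_FIRING is what the Flink/Spark traces above show.
>>>       if (PaneInfo.NO_FIRING.equals(c.pane())) {
>>>         Metrics.counter("pane-check", "no-firing").inc();
>>>       }
>>>       c.output(c.element());
>>>     }
>>>   });
>>> }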
>>>
>>> Appendix
>>> -----------
>>> Here you can see the log function and traces for the different runners
>>> in detail.
>>>
>>> private SingleOutput<String, String> logBeforeWrite() {
>>>   return ParDo.of(new DoFn<String, String>() {
>>>     @ProcessElement
>>>     public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>>>       String value = context.element();
>>>       log.info("[Before Write] Element={} window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
>>>           value,
>>>           boundedWindow,
>>>           context.timestamp(),
>>>           context.pane().getTiming(),
>>>           context.pane().getIndex(),
>>>           context.pane().isFirst(),
>>>           context.pane().isLast(),
>>>           context.pane());
>>>       context.output(value);
>>>     }
>>>   });
>>> }
>>>
>>> The logAfterWrite function logs the same information with an
>>> "[After Write]" prefix.
>>>
>>> Detailed traces:
>>>
>>> DirectRunner (local):
>>> [Before Write] Element=data
>>> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>>> timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index=0,
>>> isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true,
>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After Write] Element=file
>>> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>>> timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index=0,
>>> isFirst=true, isLast=false, pane=PaneInfo{isFirst=true, timing=EARLY,
>>> index=0}
>>>
>>> FlinkRunner (cluster):
>>> [Before Write] Element=data
>>> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>>> timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index=0,
>>> isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true,
>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After Write] Element=file
>>> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>>> timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index=0,
>>> isFirst=true, isLast=true, pane=PaneInfo.NO_FIRING
>>>
>>> SparkRunner (cluster):
>>> [Before Write] Element=data
>>> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>>> timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index=0,
>>> isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true,
>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>> [After Write] Element=file
>>> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>>> timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index=0,
>>> isFirst=true, isLast=true, pane=PaneInfo.NO_FIRING
>>>
>>>
>>> On Fri, May 8, 2020 at 7:01 PM Reuven Lax (<re...@google.com>) wrote:
>>>
>>>> The window information should still be there. Beam propagates windows
>>>> through PCollection, and I don't think WriteFiles does anything
>>>> explicit to stop that.
>>>>
>>>> Can you try this with the direct runner to see what happens there?
>>>> What is your windowing on this PCollection?
>>>>
>>>> Reuven
>>>>
>>>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>>>>
>>>>> I got the same behavior using Spark Runner (with Spark 2.4.3): the
>>>>> window information was missing.
>>>>>
>>>>> Just to clarify, the combiner after TextIO had different results. On
>>>>> the Flink runner the file names were dropped, and on Spark the
>>>>> combination process happened twice, duplicating data. I think this is
>>>>> because different runners handle data without windowing information
>>>>> in different ways.
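>>>>>
>>>>> Given the suspicion above that the Reshuffle inside WriteFiles is
>>>>> where pane info gets lost, a much smaller pipeline that only
>>>>> exercises pane propagation across Reshuffle might replicate the
>>>>> difference between runners. A rough, untested sketch (names and
>>>>> values are illustrative):
>>>>>
>>>>> import org.apache.beam.sdk.Pipeline;
>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>> import org.apache.beam.sdk.transforms.GroupByKey;
>>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>>> import org.apache.beam.sdk.transforms.Reshuffle;
>>>>> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>>>>> import org.apache.beam.sdk.transforms.windowing.PaneInfo;
>>>>> import org.apache.beam.sdk.transforms.windowing.Window;
>>>>> import org.apache.beam.sdk.values.KV;
>>>>> import org.apache.beam.sdk.values.TimestampedValue;
>>>>> import org.joda.time.Duration;
>>>>> import org.joda.time.Instant;
>>>>>
>>>>> Pipeline p = Pipeline.create();
>>>>> p.apply(Create.timestamped(
>>>>>         TimestampedValue.of(KV.of("key", "value"), new Instant(0))))
>>>>>  .apply(Window.<KV<String, String>>into(
>>>>>      FixedWindows.of(Duration.standardMinutes(1))))
>>>>>  .apply(GroupByKey.create())  // the trigger assigns an ON_TIME pane here
>>>>>  .apply(Reshuffle.of())       // the suspect step: does the pane survive?
>>>>>  .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>>>>>    @ProcessElement
>>>>>    public void processElement(ProcessContext c) {
>>>>>      if (PaneInfo.NO_FIRING.equals(c.pane())) {
>>>>>        throw new AssertionError("Pane info was dropped across Reshuffle");
>>>>>      }
>>>>>    }
>>>>>  }));
>>>>> p.run().waitUntilFinish();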
>>>>>
>>>>> On Fri, May 8, 2020 at 12:45 AM Luke Cwik (<lc...@google.com>) wrote:
>>>>>
>>>>>> +dev <dev@beam.apache.org>
>>>>>>
>>>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi guys,
>>>>>>>
>>>>>>> I think I have found something interesting about windowing.
>>>>>>>
>>>>>>> I have a pipeline that gets data from Kafka and writes to HDFS by
>>>>>>> means of TextIO. Once written, the generated files are combined to
>>>>>>> apply some custom operations. However, the combine does not receive
>>>>>>> data. Below is the highlight of my pipeline:
>>>>>>>
>>>>>>> //STREAM + processing time.
>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>         .apply(...) //mappers, a window and a combine
>>>>>>>         .apply("WriteFiles", TextIO.<String>writeCustomType()
>>>>>>>             .to(policy).withShards(4).withWindowedWrites())
>>>>>>>         .getPerDestinationOutputFilenames()
>>>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>
>>>>>>> Running the pipeline with Flink, I found a log trace saying data is
>>>>>>> discarded as late in the CombineFileNames combine. As a workaround,
>>>>>>> I added AllowedLateness to the pipeline's window strategy.
>>>>>>> It works for now, but it raises several questions for me.
>>>>>>>
>>>>>>> I think the problem is that getPerDestinationOutputFilenames
>>>>>>> generates files but does not maintain the windowing information.
>>>>>>> CombineFileNames then compares the file names with the watermark of
>>>>>>> the pipeline and discards them as late.
>>>>>>>
>>>>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am
>>>>>>> doing something wrong and using getPerDestinationOutputFilenames +
>>>>>>> combine does not make sense. What do you think?
>>>>>>>
>>>>>>> Please note I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>>>
>>>>>>> Many thanks,
>>>>>>> Jose
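>>>>>>>
>>>>>>> For reference, a minimal sketch of that AllowedLateness workaround
>>>>>>> (the window size, trigger, and lateness value here are placeholders,
>>>>>>> not the pipeline's actual settings):
>>>>>>>
>>>>>>> import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
>>>>>>> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>>>>>>> import org.apache.beam.sdk.transforms.windowing.Window;
>>>>>>> import org.joda.time.Duration;
>>>>>>>
>>>>>>> // Widening allowedLateness keeps the file names emitted by
>>>>>>> // getPerDestinationOutputFilenames from being discarded as late
>>>>>>> // by CombineFileNames.
>>>>>>> Window<String> windowing =
>>>>>>>     Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>>>>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>>>>         .withAllowedLateness(Duration.standardMinutes(30)) // placeholder
>>>>>>>         .discardingFiredPanes();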