Hi, I have added some logs to the pipeline as follows (you can find the log function in the Appendix):
//STREAM + processing time.
pipeline.apply(KafkaIO.read())
    .apply(...) //mappers, a window and a combine
    .apply(logBeforeWrite())
    .apply("WriteFiles", TextIO.<String>writeCustomType()
        .to(policy)
        .withShards(4)
        .withWindowedWrites())
    .getPerDestinationOutputFilenames()
    .apply(logAfterWrite())
    .apply("CombineFileNames", Combine.perKey(...))

I have run the pipeline with the DirectRunner (local), the SparkRunner and the FlinkRunner, the latter two on a cluster. Below you can see the timing and pane information before/after the write (the Appendix has detailed traces with window and timestamp information).

DirectRunner:
[Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
[After Write] timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}

FlinkRunner:
[Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
[After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING

SparkRunner:
[Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
[After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING

It seems the DirectRunner propagates the windowing information as expected. I am not sure whether TextIO really propagates it or just emits a new pane, because the timing before TextIO is ON_TIME and after TextIO it is EARLY. In any case, with the FlinkRunner and the SparkRunner neither the timing nor the pane is set.

I thought the problem was in GatherBundlesPerWindowFn, but now, after seeing that the DirectRunner fills in the windowing data... I am not sure:
https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406

Appendix
--------
Here you can see the log function and the traces for the different runners in detail.
private SingleOutput<String, String> logBefore() {
    return ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
            String value = context.element();
            log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
                    boundedWindow,
                    context.timestamp(),
                    context.pane().getTiming(),
                    context.pane().getIndex(),
                    context.pane().isFirst(),
                    context.pane().isLast(),
                    context.pane());
            context.output(context.element());
        }
    });
}

The logAfter function shows the same information.

Traces in detail:

DirectRunner (local):
[Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
[After Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index=0, isFirst=true, isLast=false, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}

FlinkRunner (cluster):
[Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
[After Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true, pane=PaneInfo.NO_FIRING

SparkRunner (cluster):
[Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
[After Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true, pane=PaneInfo.NO_FIRING

On Fri, May 8, 2020 at 7:01 PM Reuven Lax (<re...@google.com>) wrote:

> The window information should still be there. Beam propagates windows
> through PCollections, and I don't think WriteFiles does anything explicit
> to stop that.
>
> Can you try this with the direct runner to see what happens there?
> What is your windowing on this PCollection?
>
> Reuven
>
> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>
>> I got the same behavior using the Spark runner (with Spark 2.4.3): the
>> window information was missing.
>>
>> Just to clarify, the combiner after TextIO had different results. With
>> the Flink runner the file names were dropped, and with Spark the
>> combination process happened twice, duplicating data. I think this is
>> because different runners handle data without windowing information in
>> different ways.
>>
>> On Fri, May 8, 2020 at 12:45 AM Luke Cwik (<lc...@google.com>) wrote:
>>
>>> +dev <dev@beam.apache.org>
>>>
>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com>
>>> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I think I have found something interesting about windowing.
>>>>
>>>> I have a pipeline that gets data from Kafka and writes to HDFS by
>>>> means of TextIO. Once written, the generated files are combined to
>>>> apply some custom operations. However, the combine does not receive
>>>> data. Below is the highlight of my pipeline:
>>>>
>>>> //STREAM + processing time.
>>>> pipeline.apply(KafkaIO.read())
>>>>     .apply(...)
>>>>     //mappers, a window and a combine
>>>>     .apply("WriteFiles", TextIO.<String>writeCustomType()
>>>>         .to(policy).withShards(4).withWindowedWrites())
>>>>     .getPerDestinationOutputFilenames()
>>>>     .apply("CombineFileNames", Combine.perKey(...))
>>>>
>>>> Running the pipeline with Flink, I have found a log trace that says
>>>> data are discarded as late in the CombineFileNames combine. I have
>>>> then added allowed lateness to the pipeline's window strategy as a
>>>> workaround. It works for now, but it raises several questions for me.
>>>>
>>>> I think the problem is that getPerDestinationOutputFilenames generates
>>>> the files, but it does not maintain the windowing information. Then,
>>>> CombineFileNames compares the file names with the watermark of the
>>>> pipeline and discards them as late.
>>>>
>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am
>>>> doing something wrong, and using getPerDestinationOutputFilenames +
>>>> combine does not make sense. What do you think?
>>>>
>>>> Please note I am using Beam 2.17.0 with Flink 1.9.1.
>>>>
>>>> Many thanks,
>>>> Jose
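PS: For completeness, this is roughly how I applied the allowed-lateness workaround mentioned in my first message. It is only a sketch: the one-minute fixed windows and the one hour of lateness are assumptions (my real values differ), and KafkaIO.read(), policy and the combine function are elided as in the snippets above.

```java
//STREAM + processing time, with the allowed-lateness workaround.
//FixedWindows of 1 minute and 1 hour of lateness are placeholder values;
//policy and the combine function are the same as in the pipeline above.
pipeline.apply(KafkaIO.read())
    .apply(...) //mappers and a combine
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .withAllowedLateness(Duration.standardHours(1)))
    .apply("WriteFiles", TextIO.<String>writeCustomType()
        .to(policy).withShards(4).withWindowedWrites())
    .getPerDestinationOutputFilenames()
    .apply("CombineFileNames", Combine.perKey(...));
```

With the lateness in place, the file names emitted without pane information are no longer dropped by CombineFileNames, which matches what I described above.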