Hi guys,

I think I have found something interesting about windowing.

I have a pipeline that reads data from Kafka and writes to HDFS via
TextIO.
Once the files are written, the generated file names are combined to apply
some custom operations. However, the combine does not receive any data.
Below is the relevant part of my pipeline.

// STREAM + processing time.
pipeline.apply(KafkaIO.read())
        .apply(...) // mappers, a window and a combine
        .apply("WriteFiles",
               TextIO.<String>writeCustomType()
                     .to(policy)
                     .withNumShards(4)
                     .withWindowedWrites())
        .getPerDestinationOutputFilenames()
        .apply("CombineFileNames", Combine.perKey(...))

Running the pipeline with Flink, I found a log trace saying that data are
discarded as late in the CombineFileNames combine. As a workaround, I then
added allowedLateness to the pipeline's window strategy.
It works for now, but this raises several questions for me.
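For reference, the workaround looks roughly like this (the window size, the trigger, and the lateness duration are illustrative, not the exact values from my job):

```java
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Window strategy with allowedLateness, so that elements arriving
// behind the watermark are not dropped before CombineFileNames.
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardMinutes(10))
        .discardingFiredPanes())
```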

I think the problem is that getPerDestinationOutputFilenames emits the file
names but does not preserve the windowing information. CombineFileNames then
compares the file names against the pipeline's watermark and discards them
as late.

Is there a known issue with getPerDestinationOutputFilenames? Or maybe I am
doing something wrong and combining the output of
getPerDestinationOutputFilenames does not make sense.
What do you think?

Please, note I am using Beam 2.17.0 with Flink 1.9.1.

Many thanks,
Jose
