I got the same behavior using the Spark runner (with Spark 2.4.3): the window
information was missing.

Just to clarify, the combine after TextIO produced different results in each
runner. With the Flink runner the file names were dropped, and with Spark the
combine ran twice, duplicating the data. I think this is because each runner
handles data that lacks windowing information in its own way.
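
For reference, here is a minimal, self-contained sketch of the workaround
mentioned in the quoted message below: giving the window strategy upstream of
WriteFiles an explicit allowed lateness so the file names emitted by
getPerDestinationOutputFilenames are not dropped as late by the combine. The
bounded Create.timestamped source, the 5-minute window, the 1-hour lateness,
the output prefix and the joining combine are placeholders I chose for
illustration; the real pipeline reads from Kafka and uses a custom filename
policy.

// Minimal sketch of the allowed-lateness workaround (placeholder source,
// windows, durations, output path and combine; not the real pipeline).
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class WindowedWriteSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        // Stand-in for KafkaIO.read() + mappers.
        .apply(Create.timestamped(
            TimestampedValue.of("a", new Instant(0)),
            TimestampedValue.of("b", new Instant(0)),
            TimestampedValue.of("c", new Instant(0))))
        .apply("Windowing",
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
                .triggering(AfterWatermark.pastEndOfWindow())
                // The workaround: allow some lateness so the file names emitted
                // downstream are not discarded as late by the combine.
                .withAllowedLateness(Duration.standardHours(1))
                .discardingFiredPanes())
        .apply("WriteFiles",
            TextIO.<String>writeCustomType()
                .withFormatFunction(line -> line)
                .to("/tmp/out/part") // placeholder for the custom filename policy
                .withNumShards(4)
                .withWindowedWrites())
        .getPerDestinationOutputFilenames()
        .apply("CombineFileNames",
            Combine.<Void, String>perKey(names -> String.join(",", names)));

    pipeline.run().waitUntilFinish();
  }
}

The only part that matters for the workaround is the withAllowedLateness call
on the window that precedes WriteFiles.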



On Fri, May 8, 2020 at 00:45, Luke Cwik (<lc...@google.com>) wrote:

> +dev <d...@beam.apache.org>
>
> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>
>> Hi guys,
>>
>> I think I have found something interesting about windowing.
>>
>> I have a pipeline that reads data from Kafka and writes it to HDFS by means
>> of TextIO.
>> Once written, the generated files are combined to apply some custom
>> operations.
>> However, the combine does not receive any data. Below you can find the
>> relevant part of my pipeline.
>>
>> //STREAM + processing time.
>> pipeline.apply(KafkaIO.read())
>>         .apply(...) //mappers, a window and a combine
>>
>>         .apply("WriteFiles",
>> TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>         .getPerDestinationOutputFilenames()
>>         .apply("CombineFileNames", Combine.perKey(...))
>>
>> Running the pipeline with Flink, I found a log trace saying that data is
>> discarded as late in the CombineFileNames combine. As a workaround, I then
>> added AllowedLateness to the pipeline's window strategy.
>> It works for now, but it raises several questions for me.
>>
>> I think the problem is that getPerDestinationOutputFilenames emits the file
>> names but does not preserve the windowing information. CombineFileNames then
>> compares the file names against the pipeline's watermark and discards them
>> as late.
>>
>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am doing
>> something wrong and using getPerDestinationOutputFilenames + combine does
>> not make sense. What do you think?
>>
>> Please note that I am using Beam 2.17.0 with Flink 1.9.1.
>>
>> Many thanks,
>> Jose
>>
>>
