The window information should still be there. Beam propagates windows
through PCollections, and I don't think WriteFiles does anything explicit
to stop that.
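One quick way to check is to log the window each filename element lands in downstream of WriteFiles. A minimal sketch, assuming `filenames` is the KV collection returned by getPerDestinationOutputFilenames (the transform name and LogWindowFn are illustrative, not part of your pipeline):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

// Hypothetical probe: logs the window and timestamp of each filename
// emitted by getPerDestinationOutputFilenames. If windowing were really
// dropped, every element would report the GlobalWindow.
class LogWindowFn extends DoFn<KV<String, String>, KV<String, String>> {
  @ProcessElement
  public void process(ProcessContext c, BoundedWindow window) {
    System.out.printf("file=%s window=%s timestamp=%s%n",
        c.element().getValue(), window, c.timestamp());
    c.output(c.element());
  }
}

// Usage: filenames.apply("LogWindows", ParDo.of(new LogWindowFn()));
```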

Can you try this with the direct runner to see what happens there?
What is your windowing on this PCollection?
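For reference, the allowedLateness workaround you describe below would look roughly like this; the fixed window size and lateness duration here are placeholder values, not taken from your pipeline:

```java
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Hypothetical windowing configuration: 1-minute fixed windows with
// 10 minutes of allowed lateness (both values are illustrative).
// Beam requires an accumulation mode when allowed lateness is set.
PCollection<String> windowed = input.apply("WindowInput",
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .withAllowedLateness(Duration.standardMinutes(10))
        .discardingFiredPanes());
```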

Reuven

On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com> wrote:

> I got the same behavior using the Spark runner (Spark 2.4.3): the window
> information was missing.
>
> Just to clarify, the combiner after TextIO produced different results. In
> the Flink runner the file names were dropped, and in Spark the combine ran
> twice, duplicating data. I think this is because different runners handle
> data without windowing information in different ways.
>
>
>
> On Fri, May 8, 2020 at 0:45, Luke Cwik (<lc...@google.com>) wrote:
>
>> +dev <d...@beam.apache.org>
>>
>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>>
>>> Hi guys,
>>>
>>> I think I have found something interesting about windowing.
>>>
>>> I have a pipeline that gets data from Kafka and writes to HDFS via
>>> TextIO. Once written, the generated file names are combined to apply
>>> some custom operations. However, the combine does not receive any data.
>>> Here is the relevant part of my pipeline:
>>>
>>> //STREAM + processing time.
>>> pipeline.apply(KafkaIO.read())
>>>         .apply(...) //mappers, a window and a combine
>>>         .apply("WriteFiles", TextIO.<String>writeCustomType()
>>>             .to(policy).withShards(4).withWindowedWrites())
>>>         .getPerDestinationOutputFilenames()
>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>
>>> Running the pipeline on Flink, I found a log trace saying that data is
>>> discarded as late in the CombineFileNames combine. As a workaround, I
>>> added allowedLateness to the pipeline's window strategy. It works for
>>> now, but it raises several questions for me.
>>>
>>> I think the problem is that getPerDestinationOutputFilenames generates
>>> the file names but does not preserve the windowing information.
>>> CombineFileNames then compares the file names against the pipeline's
>>> watermark and discards them as late.
>>>
>>> Is there an issue with getPerDestinationOutputFilenames? Maybe I am
>>> doing something wrong, and using getPerDestinationOutputFilenames with
>>> a combine does not make sense. What do you think?
>>> What do you think?
>>>
>>> Please, note I am using Beam 2.17.0 with Flink 1.9.1.
>>>
>>> Many thanks,
>>> Jose
>>>
>>>
