
I would like to clarify that while TextIO is writing, all the data is in the
files (shards). The loss happens when the file names emitted by
getPerDestinationOutputFilenames are processed by a window.

I have created a pipeline to reproduce the scenario in which some filenames
are lost after getPerDestinationOutputFilenames. Please note that I tried
to simplify the code as much as possible, but the scenario is not easy to

Please check this project https://github.com/kiuby88/windowing-textio
Check readme to build and run (
The project contains only one class with the pipeline, PipelineWithTextIo,
a log4j2.xml file in the resources, and the pom.

The pipeline in PipelineWithTextIo generates unbounded data using a
sequence. It adds a small delay (10s) per data entry, applies a distinct
(just to apply the window), and then writes the data using TextIO.
The window for the distinct is fixed (5 seconds) and it does not use
Generated files can be found in
windowing-textio/pipe_with_lateness_0s/files. To write files the
FileNamePolicy uses window + timing + shards (see
Files are emitted using getPerDestinationOutputFilenames()
(see the code here,

Then, the file names in the PCollection are extracted and logged. Please note
that the file names do not have pane information at that point.

To apply a window, a distinct is used again. Here several file names are
discarded as late and are not processed by this second distinct.
Please, see

Debug logging is enabled for WindowTracing, so you can find several
messages like the following in the terminal:
DEBUG org.apache.beam.sdk.util.WindowTracing - LateDataFilter: Dropping
element at 2020-05-12T14:05:14.999Z for
window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far
behind inputWatermark:2020-05-12T14:05:19.799Z;

What happens here? I think that messages are generated per second and a
window of 5 seconds groups them. Then a delay is added and finally the data
is written to a file.
The pipeline reads more data, advancing the watermark.
Then, the file names are emitted without pane information (see "Emitted File"
in the logs). The window in the second distinct compares the file names'
timestamps with the pipeline watermark and discards the file names as late.
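
To make the comparison concrete, here is a minimal plain-Java sketch of the
check I believe the late data filter performs. It does not use Beam at all:
the class name LateDataSketch and its methods are hypothetical, and the rule
(drop when the window's max timestamp plus the allowed lateness is behind the
input watermark) is my assumption about how the filter decides. The values
are taken from the WindowTracing message above.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam: models the late-data decision only.
public class LateDataSketch {

    // Last timestamp that belongs to a window [start, end): 1 ms before the end.
    static Instant maxTimestamp(Instant windowEnd) {
        return windowEnd.minusMillis(1);
    }

    // Assumption: an element is dropped when its window's max timestamp plus
    // the allowed lateness is already behind the input watermark.
    static boolean isDroppedAsLate(Instant windowEnd,
                                   Duration allowedLateness,
                                   Instant inputWatermark) {
        Instant expiry = maxTimestamp(windowEnd).plus(allowedLateness);
        return expiry.isBefore(inputWatermark);
    }

    public static void main(String[] args) {
        // Window end and watermark copied from the log message above.
        Instant windowEnd = Instant.parse("2020-05-12T14:05:15.000Z");
        Instant watermark = Instant.parse("2020-05-12T14:05:19.799Z");

        System.out.println("lateness 0s  -> dropped: "
                + isDroppedAsLate(windowEnd, Duration.ZERO, watermark));
        System.out.println("lateness 60s -> dropped: "
                + isDroppedAsLate(windowEnd, Duration.ofSeconds(60), watermark));
    }
}
```

With no lateness the file name at 14:05:14.999Z is behind the watermark and
is dropped; with a lateness of 60 seconds the same element is still accepted.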

You can add a lateness to the pipeline. See

If a lateness of one minute is added to the window, the file names are
processed as late. As a result, the LateDataFilter traces disappear.

Moreover, to better illustrate that file names are emitted as late for the
second distinct, I added a second TextIO to write the file names to
other files.
The same FileNamePolicy as before was used (window + timing + shards). Then,
you can find files that contain the original filenames in
windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the
interesting part, because you will find several files with LATE in their

Please, let me know if you need more information or if the example is not
enough to check the expected scenarios.


On Sun, May 10, 2020 at 17:04, Reuven Lax (<re...@google.com>) wrote:

> Pane info is supposed to be preserved across transforms. If the Flink
> runner does not, then I believe that is a bug.
> On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>> I am using FileIO and I do observe the drop of pane info information on
>> Flink runner too. It was mentioned in this thread:
>> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>> It is a result of different reshuffle expansion for optimisation reasons.
>> However, I did not observe a data loss in my case. Windowing and watermark
>> info should be preserved. Pane info is not, which brings a question how
>> reliable pane info should be in terms of SDK and runner.
>> If you do observe a data loss, it would be great to share a test case
>> which replicates the problem.
>> On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com> wrote:
>>> Ah, I think I see the problem.
>>> It appears that for some reason, the Flink runner loses windowing
>>> information when a Reshuffle is applied. I'm not entirely sure why, because
>>> windowing information should be maintained across a Reshuffle.
>>> Reuven
>>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com>
>>> wrote:
>>>> Hi,
>>>> I have added some logs to the pipeline as follows (you can find the
>>>> log function in the Appendix):
>>>> //STREAM + processing time.
>>>> pipeline.apply(KafkaIO.read())
>>>>        .apply(...) //mappers, a window and a combine
>>>>        .apply(logBeforeWrite())
>>>>        .apply("WriteFiles",
>>>> TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>        .getPerDestinationOutputFilenames()
>>>>        .apply(logAfterWrite())
>>>>        .apply("CombineFileNames", Combine.perKey(...))
>>>> I have run the pipeline using DirectRunner (local), SparkRunner and
>>>> FlinkRunner, the latter two using a cluster.
>>>> Below you can see the timing and pane information before/after (you can
>>>> see traces in detail with window and timestamp information in the
>>>> Appendix).
>>>> DirectRunner:
>>>> [Before Write] timing=ON_TIME,  pane=PaneInfo{isFirst=true,
>>>> isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After    Write] timing=EARLY,       pane=PaneInfo{isFirst=true,
>>>> timing=EARLY, index=0}
>>>> FlinkRunner:
>>>> [Before Write] timing=ON_TIME,    pane=PaneInfo{isFirst=true,
>>>> isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After    Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>> SparkRunner:
>>>> [Before Write] timing=ON_TIME,    pane=PaneInfo{isFirst=true,
>>>> isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After    Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>> It seems DirectRunner propagates the windowing information as expected.
>>>> I am not sure if TextIO really propagates it or just emits a new window
>>>> pane, because the timing before TextIO is ON_TIME and after TextIO is
>>>> EARLY.
>>>> In any case, using FlinkRunner and SparkRunner the timing and the pane
>>>> are not set.
>>>> I thought the problem was in GatherBundlesPerWindowFn, but now, after
>>>> seeing that the DirectRunner filled windowing data... I am not sure.
>>>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>>> Appendix
>>>> -----------
>>>> Here you can see the log function and traces for different runners in
>>>> detail.
>>>> private SingleOutput<String, String> logBefore() {
>>>>     return ParDo.of(new DoFn<String, String>() {
>>>>         @ProcessElement
>>>>         public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>>>>             String value = context.element();
>>>>             log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index ={}, isFirst ={}, isLast={}, pane={}",
>>>>                     boundedWindow,
>>>>                     context.timestamp(),
>>>>                     context.pane().getTiming(),
>>>>                     context.pane().getIndex(),
>>>>                     context.pane().isFirst(),
>>>>                     context.pane().isLast(),
>>>>                     context.pane()
>>>>             );
>>>>             context.output(context.element());
>>>>         }
>>>>     });
>>>> }
>>>> The logAfter function shows the same information.
>>>> Traces in detail:
>>>> DirectRunner (local):
>>>> [Before Write] Element=data
>>>> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>>>> timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index =0, isFirst
>>>> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
>>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After  Write] Element=file
>>>> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>>>> timestamp=2020-05-09T13:39:59.999Z, timing=EARLY,   index =0, isFirst
>>>> =true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>> FlinkRunner (cluster):
>>>> [Before Write] Element=data
>>>> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>>>> timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index =0, isFirst
>>>> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
>>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After  Write] Element=file
>>>> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>>>> timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index =0, isFirst
>>>> =true, isLast=true  pane=PaneInfo.NO_FIRING
>>>> SparkRunner (cluster):
>>>> [Before Write] Element=data
>>>> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>>>> timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index =0, isFirst
>>>> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
>>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After  Write] Element=file
>>>> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>>>> timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index =0, isFirst
>>>> =true, isLast=true  pane=PaneInfo.NO_FIRING
>>>> On Fri, May 8, 2020 at 19:01, Reuven Lax (<re...@google.com>)
>>>> wrote:
>>>>> The window information should still be there.  Beam propagates windows
>>>>> through PCollection, and I don't think WriteFiles does anything explicit
>>>>> to stop that.
>>>>> Can you try this with the direct runner to see what happens there?
>>>>> What is your windowing on this PCollection?
>>>>> Reuven
>>>>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com>
>>>>> wrote:
>>>>>> I got the same behavior using Spark Runner (with Spark 2.4.3), window
>>>>>> information was missing.
>>>>>> Just to clarify, the combiner after TextIO had different results. In
>>>>>> Flink runner the file names were dropped, and in Spark the combination
>>>>>> process happened twice, duplicating data.  I think it is because
>>>>>> different runners manage the data without the windowing information
>>>>>> in different ways.
>>>>>> On Fri, May 8, 2020 at 0:45, Luke Cwik (<lc...@google.com>)
>>>>>> wrote:
>>>>>>> +dev <d...@beam.apache.org>
>>>>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com>
>>>>>>> wrote:
>>>>>>>> Hi guys,
>>>>>>>> I think I have found something interesting about windowing.
>>>>>>>> I have a pipeline that gets data from Kafka and writes in HDFS by
>>>>>>>> means of TextIO.
>>>>>>>> Once written, generated files are combined to apply some custom
>>>>>>>> operations.
>>>>>>>> However, the combine does not receive data. Below you can find
>>>>>>>> the highlights of my pipeline.
>>>>>>>> //STREAM + processing time.
>>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>>         .apply(...) //mappers, a window and a combine
>>>>>>>>         .apply("WriteFiles",
>>>>>>>> TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>>>>>         .getPerDestinationOutputFilenames()
>>>>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>> Running the pipeline with Flink I have found a log trace that says
>>>>>>>> data are discarded as late in the combine CombineFileNames. Then, I
>>>>>>>> have added AllowedLateness to the pipeline window strategy, as a
>>>>>>>> workaround.
>>>>>>>> It works for now, but this raises several questions for me.
>>>>>>>> I think the problem is that getPerDestinationOutputFilenames generates
>>>>>>>> files, but it does not maintain the windowing information. Then,
>>>>>>>> CombineFileNames compares file names with the watermark of the
>>>>>>>> pipeline and discards them as late.
>>>>>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am
>>>>>>>> doing something wrong and using getPerDestinationOutputFilenames +
>>>>>>>> combine does not make sense.
>>>>>>>> What do you think?
>>>>>>>> Please, note I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>>>> Many thanks,
>>>>>>>> Jose
