Hi,

I have added some logging to the pipeline as follows (you can find the log
function in the Appendix):

//STREAM + processing time.
pipeline.apply(KafkaIO.read())
        .apply(...) // mappers, a window and a combine
        .apply(logBeforeWrite())
        .apply("WriteFiles",
               TextIO.<String>writeCustomType().to(policy).withNumShards(4).withWindowedWrites())
        .getPerDestinationOutputFilenames()
        .apply(logAfterWrite())
        .apply("CombineFileNames", Combine.perKey(...))

I have run the pipeline using DirectRunner (local), and using SparkRunner and
FlinkRunner, the latter two on a cluster.
Below you can see the timing and pane information before and after the write
(detailed traces with window and timestamp information are in the Appendix).

DirectRunner:
[Before Write] timing=ON_TIME,  pane=PaneInfo{isFirst=true, isLast=true,
timing=ON_TIME, index=0, onTimeIndex=0}
[After    Write] timing=EARLY,       pane=PaneInfo{isFirst=true,
timing=EARLY, index=0}

FlinkRunner:
[Before Write] timing=ON_TIME,    pane=PaneInfo{isFirst=true, isLast=true,
timing=ON_TIME, index=0, onTimeIndex=0}
[After    Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING

SparkRunner:
[Before Write] timing=ON_TIME,    pane=PaneInfo{isFirst=true, isLast=true,
timing=ON_TIME, index=0, onTimeIndex=0}
[After    Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING

It seems DirectRunner propagates the pane information, although I am not
sure whether TextIO really propagates the pane or just emits a new one,
because the timing is ON_TIME before TextIO and EARLY after it.
In any case, with FlinkRunner and SparkRunner the timing and the pane are
not set after the write (the window and timestamp are still present on all
three runners, see the Appendix).

I thought the problem was in GatherBundlesPerWindowFn, but after seeing
that DirectRunner does fill in the pane data, I am not sure anymore.
https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406


Appendix
-----------
Here you can see the log function and traces for different runners in
detail.

private SingleOutput<String, String> logBeforeWrite() {
    return ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
            // Log window, timestamp and pane details for the current element.
            log.info("[Before Write] Element=data window={}, timestamp={}, "
                            + "timing={}, index ={}, isFirst ={}, isLast={}, pane={}",
                    boundedWindow,
                    context.timestamp(),
                    context.pane().getTiming(),
                    context.pane().getIndex(),
                    context.pane().isFirst(),
                    context.pane().isLast(),
                    context.pane()
            );
            // Pass the element through unchanged.
            context.output(context.element());
        }
    });
}

The logAfterWrite function logs the same information.

Detailed traces:

DirectRunner (local):
[Before Write] Element=data
window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index =0, isFirst
=true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
timing=ON_TIME, index=0, onTimeIndex=0}
[After  Write] Element=file
window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
timestamp=2020-05-09T13:39:59.999Z, timing=EARLY,   index =0, isFirst
=true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}


FlinkRunner (cluster):
[Before Write] Element=data
window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index =0, isFirst
=true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
timing=ON_TIME, index=0, onTimeIndex=0}
[After  Write] Element=file
window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index =0, isFirst
=true, isLast=true  pane=PaneInfo.NO_FIRING

SparkRunner (cluster):
[Before Write] Element=data
window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index =0, isFirst
=true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
timing=ON_TIME, index=0, onTimeIndex=0}
[After  Write] Element=file
window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index =0, isFirst
=true, isLast=true  pane=PaneInfo.NO_FIRING
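
Note on the "discarded as late" symptom reported in the first message of
this thread (quoted below). As a rough illustration of why elements stamped
at the end of a window can be dropped, the sketch below models the usual
rule that a window becomes droppable once the watermark passes the window's
maximum timestamp plus the allowed lateness. It is a simplified plain-Java
model, not Beam's actual code: the class name LatenessSketch, the method
isDroppablyLate and the watermark value are hypothetical and chosen only
for illustration. The window end is taken from the FlinkRunner trace above.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified, hypothetical model of a late-data check; not Beam's implementation.
class LatenessSketch {

    // A fixed window [start, end) has max timestamp end - 1 ms.
    // Its elements are treated as droppable once the watermark is
    // strictly past maxTimestamp + allowedLateness.
    static boolean isDroppablyLate(Instant windowEnd, Duration allowedLateness, Instant watermark) {
        Instant maxTimestamp = windowEnd.minusMillis(1);
        return maxTimestamp.plus(allowedLateness).isBefore(watermark);
    }

    public static void main(String[] args) {
        // Window end from the FlinkRunner trace: [15:13:00, 15:14:00).
        Instant windowEnd = Instant.parse("2020-05-09T15:14:00Z");
        // Assumed watermark: 5 seconds past the window end by the time
        // the file names reach the combine (illustrative value only).
        Instant watermark = Instant.parse("2020-05-09T15:14:05Z");

        // No allowed lateness: the file-name element is droppable.
        System.out.println(isDroppablyLate(windowEnd, Duration.ZERO, watermark)); // true
        // One minute of allowed lateness: the element is kept.
        System.out.println(isDroppablyLate(windowEnd, Duration.ofMinutes(1), watermark)); // false
    }
}
```

Under these assumptions, an output file name stamped 15:13:59.999Z is
droppable as soon as the watermark moves past that instant, which would be
consistent with the AllowedLateness workaround described in the quoted
message.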


On Fri, May 8, 2020 at 19:01, Reuven Lax (<re...@google.com>) wrote:

> The window information should still be there.  Beam propagates windows
> through PCollection, and I don't think WriteFiles does anything explicit to
> stop that.
>
> Can you try this with the direct runner to see what happens there?
> What is your windowing on this PCollection?
>
> Reuven
>
> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>
>> I got the same behavior using Spark Runner (with Spark 2.4.3): the window
>> information was missing.
>>
>> Just to clarify, the combiner after TextIO had different results. With the
>> Flink runner the file names were dropped, and with Spark the combination
>> process happened twice, duplicating data.  I think this is because the
>> runners handle data without windowing information in different ways.
>>
>>
>>
>> On Fri, May 8, 2020 at 0:45, Luke Cwik (<lc...@google.com>) wrote:
>>
>>> +dev <dev@beam.apache.org>
>>>
>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com>
>>> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I think I have found something interesting about windowing.
>>>>
>>>> I have a pipeline that reads data from Kafka and writes to HDFS by means
>>>> of TextIO.
>>>> Once written, the generated files are combined to apply some custom
>>>> operations.
>>>> However, the combine does not receive any data. Below you can find an
>>>> outline of my pipeline.
>>>>
>>>> //STREAM + processing time.
>>>> pipeline.apply(KafkaIO.read())
>>>>         .apply(...) //mappers, a window and a combine
>>>>
>>>>         .apply("WriteFiles",
>>>> TextIO.<String>writeCustomType().to(policy).withNumShards(4).withWindowedWrites())
>>>>         .getPerDestinationOutputFilenames()
>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>
>>>> Running the pipeline with Flink, I found a log trace saying that data
>>>> are discarded as late in the CombineFileNames combine. I then added
>>>> AllowedLateness to the pipeline's windowing strategy as a workaround.
>>>> It works for now, but it raises several questions for me.
>>>>
>>>> I think the problem is getPerDestinationOutputFilenames generates
>>>> files, but
>>>> it does not maintain the windowing information. Then, CombineFileNames
>>>> compares
>>>> file names with the watermark of the pipeline and discards them as late.
>>>>
>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe, I am
>>>> doing something wrong
>>>> and using getPerDestinationOutputFilenames + combine does not make
>>>> sense.
>>>> What do you think?
>>>>
>>>> Please, note I am using Beam 2.17.0 with Flink 1.9.1.
>>>>
>>>> Many thanks,
>>>> Jose
>>>>
>>>>
