Hi Niels,
you can log the watermarks by implementing a custom operator, since
operators have access to the watermarks. The map operator is a good example of this:
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        output.emitWatermark(mark);
    }
}
In processWatermark() you would print or log the watermark before forwarding
it. A simple identity operator that forwards every element unchanged and
prints each watermark can be inserted anywhere in the pipeline.
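A rough sketch of such an identity operator could look like the following. The class name WatermarkLogger and its label parameter are made up for illustration, and printing to System.out stands in for whatever logger you prefer. It mirrors the StreamMap structure above and assumes the operator API as shown there; in Flink versions where AbstractStreamOperator handles watermarks itself, calling super.processWatermark(mark) may be the better way to forward.

```java
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Hypothetical identity operator: forwards everything, prints each watermark. */
public class WatermarkLogger<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private static final long serialVersionUID = 1L;

    // Label to tell several loggers in one pipeline apart (illustrative only).
    private final String label;

    public WatermarkLogger(String label) {
        this.label = label;
        // Same chaining choice as StreamMap, so the logger can be chained.
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // Identity: pass the record on unchanged.
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Print the watermark, then forward it so downstream operators still see it.
        System.out.println(label + " watermark: " + mark.getTimestamp());
        output.emitWatermark(mark);
    }
}
```

Assuming a DataStream<T> named stream, it could be inserted with something like stream.transform("WatermarkLogger", stream.getType(), new WatermarkLogger<T>("after-source")).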
For your second question, this section of the documentation might be interesting:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html#watermarks-in-parallel-streams
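The gist of that section is that watermarks travel through the stream alongside the records, and an operator with several input channels advances its own event time to the minimum of the watermarks it has received on those channels. The plain-Java model below only illustrates that rule; it is not Flink's code, and the class MinWatermarkTracker and its methods are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of how an operator combines watermarks from parallel inputs. */
public class MinWatermarkTracker {

    // Highest watermark seen so far on each input channel.
    private final long[] channelWatermarks;
    // The operator's current event time: the minimum over all channels.
    private long currentWatermark = Long.MIN_VALUE;
    // Watermarks this operator would have emitted downstream.
    private final List<Long> emitted = new ArrayList<>();

    public MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; returns true if the combined watermark advanced. */
    public boolean onWatermark(int channel, long timestamp) {
        // A channel's watermark never moves backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);

        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }

        // Emit only when the minimum over all channels has advanced.
        if (min > currentWatermark) {
            currentWatermark = min;
            emitted.add(min);
            return true;
        }
        return false;
    }

    public long getCurrentWatermark() {
        return currentWatermark;
    }

    public List<Long> getEmitted() {
        return emitted;
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        tracker.onWatermark(0, 10); // channel 1 has sent nothing yet, so no progress
        tracker.onWatermark(1, 5);  // minimum over both channels is now 5
        tracker.onWatermark(1, 20); // minimum is now 10, held back by channel 0
        System.out.println("emitted watermarks: " + tracker.getEmitted());
    }
}
```

In this model a single slow or idle input holds back the watermark of the whole operator, which is one thing to check when windows do not fire when you expect them to.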
-Aljoscha
On Sat, 21 May 2016 at 16:05 Niels Basjes <[email protected]> wrote:
> Hi,
>
> I was working on a streaming application last week and I got stuck in a
> situation where I got the same time based window many times.
> I expect that I made a mistake in creating the watermarks in relation to
> the data I have and the watermark generating code.
>
> Writing the events to the console (for debugging) is easy, yet I have not
> been able to write the watermarks to my console.
>
> My question is very simple: How do I log the watermarks in the console so
> I can see the data and understand my mistake.
>
> I would also like to know "where do the watermarks live" in relation to
> the actual data.
>
> Thanks.
>
> Niels Basjes
>