Great to hear that this worked out for you :)

Progression of watermarks on an empty stream is a known issue that we are 
working to resolve in the future. The usually recommended workaround is to 
send a custom blank event (which should be ignored) once in a while.
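
A minimal sketch of that workaround, in plain Scala with no Flink dependency. 
The names Event, Data, Heartbeat and WatermarkTracker are hypothetical and only 
model the idea: a blank event advances event time but is dropped from the 
output.

```scala
// Hypothetical model, not Flink API: a heartbeat carries only a timestamp.
sealed trait Event { def timestamp: Long }
final case class Data(timestamp: Long, payload: String) extends Event
final case class Heartbeat(timestamp: Long) extends Event

// Tracks the highest timestamp seen so far and forwards only real data.
final class WatermarkTracker {
  private var current: Long = Long.MinValue

  def watermark: Long = current

  // Returns the payload for data events and None for heartbeats.
  def process(event: Event): Option[String] = {
    current = math.max(current, event.timestamp)
    event match {
      case Data(_, payload) => Some(payload)
      case Heartbeat(_)     => None // dropped from the output, but time has advanced
    }
  }
}
```

In a real job the heartbeat would be emitted by the source (or a side producer) 
at some interval, and the business logic would filter it out in the same way.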

I have expanded the documentation: 
Please check it, and if you have any further suggestions you are welcome to 
comment in the PR. I hope it clarifies the behaviour.


> On 25 May 2018, at 00:03, Elias Levy wrote:
> On Thu, May 24, 2018 at 9:20 AM, Elias Levy wrote:
> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski wrote:
> Off the top of my head I can imagine two solutions:
> 1. Override the default behaviour of the operator via for example 
> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
> That seems the safer, but more complicated path.
> As we had already implemented the business logic in a RichCoFlatMapFunction, 
> I ended up extending CoStreamFlatMap:
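> import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
> import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap
> import org.apache.flink.streaming.api.watermark.Watermark
>
> class SingleWatermarkCoFlatMap[IN1, IN2, OUT](flatMapper: CoFlatMapFunction[IN1, IN2, OUT])
>     extends CoStreamFlatMap(flatMapper) {
>   // Pass through the watermarks from the first stream
>   override def processWatermark1(mark: Watermark): Unit = processWatermark(mark)
>   // Ignore watermarks from the second stream
>   override def processWatermark2(mark: Watermark): Unit = {}
> }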
> Then it was easy to replace:
> stream1
>       .connect(stream2)
>       .flatMap(new BusinessCoFlatMapFunction(params))
>       .name("Operator")
>       .uid("op")
> with:
> stream1
>       .connect(stream2)
>       .transform("Operator",
>         new SingleWatermarkCoFlatMap[X, Y, Z](new BusinessCoFlatMapFunction(params)))
>       .uid("op")
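
To illustrate what the override above changes, here is a small plain-Scala 
model with no Flink dependency. It assumes the default two-input behaviour is 
to forward the minimum of both input watermarks; TwoInputWatermark and its 
methods are hypothetical names, not Flink API.

```scala
// Hypothetical model, not Flink API: how a two-input operator combines watermarks.
final class TwoInputWatermark(ignoreSecond: Boolean) {
  private var wm1: Long = Long.MinValue
  private var wm2: Long = Long.MinValue

  // Default (assumed): the minimum of both inputs.
  // With ignoreSecond set: only the first input counts.
  def combined: Long = if (ignoreSecond) wm1 else math.min(wm1, wm2)

  def onWatermark1(ts: Long): Unit = { wm1 = math.max(wm1, ts) }
  def onWatermark2(ts: Long): Unit = { wm2 = math.max(wm2, ts) }
}
```

With the default combination, an idle second input keeps the combined 
watermark at its initial value no matter how far the first input advances. 
With the second input ignored, the combined watermark follows the first input.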
