Great to hear that this worked out for you :)

Progression of watermarks on an empty stream is a known issue, that we are 
working on to resolve in the future. Usually recommended workarounds are to 
send a custom blank event (which should be ignored) once a while.

I have expanded the documentation:
https://github.com/apache/flink/pull/6076 
<https://github.com/apache/flink/pull/6076>
Please check it and If you have any further suggestions you are welcome to make 
a comments in the PR. I hope it clarifies the behaviour.

Piotrek

> On 25 May 2018, at 00:03, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> 
> On Thu, May 24, 2018 at 9:20 AM, Elias Levy <fearsome.lucid...@gmail.com 
> <mailto:fearsome.lucid...@gmail.com>> wrote:
> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> From top of my head I can imagine two solutions:
> 
> 1. Override the default behaviour of the operator via for example 
> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
> 
> That seems the safer, but more complicated path.
> 
> As we had already implemented the business logic in a RichCoFlatMapFunction, 
> I ended up extending CoStreamFlatMap:
> 
> class SingleWatermarkCoFlatMap[IN1,IN2,OUT](flatMapper: 
> CoFlatMapFunction[IN1,IN2,OUT]) extends CoStreamFlatMap(flatMapper)  {
> 
>   // Pass through the watermarks from the first stream
>   override def processWatermark1(mark: Watermark): Unit = 
> processWatermark(mark)
> 
>   // Ignore watermarks from the second stream
>   override def processWatermark2(mark: Watermark): Unit = {}
> }
> 
> 
> Then it was easy to replace:
> 
> stream1
>       .connect(stream2)
>       .flatMap( new BusinessCoFlatMapFunction(params) )
>         .name("Operator")
>         .uid("op")
> 
> with:
> 
> stream1
>       .connect(stream2)
>       .transform("Operator", new SingleWatermarkCoFlatMap[X,Y,Z](new 
> BusinessCoFlatMapFunction(params)))
>       .uid("op")
> 
> 

Reply via email to