Hi Eugen,

It is true that for ITcases this can be difficult and this should be improved 
in Flink’s testing infrastructure,
but for this specific PR, what you need to check is if the allowedLateness 
parameter is propagated correctly
throughout the translation process. The window operator with allowed lateness 
(which is applied next)
is covered by other tests.

In this case I would recommend to do the Joining of the stream “manually”, i.e.:

input1.coGroup(input2)
      .where(keySelector1)
      .equalTo(keySelector2)
      .window(windowAssigner)

and then from the resulting WithWindow, just try to get the allowed lateness 
and verify that this is the 
value that you provided.

This will cover the propagation and make sure that nobody breaks it in the 
future.

Cheers,
Kostas


> On Sep 18, 2018, at 11:40 AM, Евгений Юшин <evgenij.us...@gmail.com> wrote:
> 
> Hi devs
> 
> During the work on https://issues.apache.org/jira/browse/FLINK-10050 I've
> found unstable behaviour of unit tests for unioned streams (which are used
> in CoGroupedStream/JoinedStream under the hood).
> Let's assume we have late elements in one of the stream. The thing is we
> have no guarantees which source will be read first, and in which order
> watermark alignment will be applied. So, the following example produce
> different results for different invocation:
> 
>     val s1 = env.addSource(new SourceFunction[(String, String)] {
>        override def run(ctx: SourceFunction.SourceContext[(String,
> String)]): Unit = {
>          ctx.collectWithTimestamp(("a", "a1"), 1)
>          //wmAllignmentLock.synchronized {
>              //wmAllignmentLock.wait()
>          //}
>          ctx.emitWatermark(new Watermark(4))
>          ctx.collectWithTimestamp(("a", "a2"), 2)
>        }
> 
>        override def cancel(): Unit = {}
>      })
> 
>      val s2 = env.addSource(new SourceFunction[(String, String)] {
>        override def run(ctx: SourceFunction.SourceContext[(String,
> String)]): Unit = {
>          ctx.collectWithTimestamp(("a", "b1"), 1)
>          ctx.emitWatermark(new Watermark(4))
>          //wmAllignmentLock.synchronized {
>              //wmAllignmentLock.notifyAll()
>          //}
>        }
> 
>        override def cancel(): Unit = {}
>      })
> 
>      val joined = s1.join(s2).where(_._1).equalTo(_._1)
>        .window(TumblingEventTimeWindows.of(Time.milliseconds(3)))
>        .apply((x, y) => s"$x:$y")
> For some invocations (when Flink decide to process 2nd source before
> 1st), ("a", "a2") is considered to be late and dropped; and vice
> versa.Here is the rate for 1000 invocations:
> Run JOIN periodic
> iteration [50] contains late total = 22, this iter = 22
> iteration [100] contains late total = 51, this iter = 29
> iteration [150] contains late total = 78, this iter = 27
> iteration [200] contains late total = 101, this iter = 23
> iteration [250] contains late total = 124, this iter = 23
> iteration [300] contains late total = 155, this iter = 31
> iteration [350] contains late total = 184, this iter = 29
> iteration [400] contains late total = 210, this iter = 26
> iteration [450] contains late total = 233, this iter = 23
> iteration [500] contains late total = 256, this iter = 23
> iteration [550] contains late total = 274, this iter = 18
> iteration [600] contains late total = 303, this iter = 29
> iteration [650] contains late total = 338, this iter = 35
> iteration [700] contains late total = 367, this iter = 29
> iteration [750] contains late total = 393, this iter = 26
> iteration [800] contains late total = 415, this iter = 22
> iteration [850] contains late total = 439, this iter = 24
> iteration [900] contains late total = 459, this iter = 20
> iteration [950] contains late total = 484, this iter = 25
> iteration [1000] contains late total = 502, this iter = 18
> contains late = 502
> 
> 
> It doesn't matter Periodic or Punctuated watermark assigner is used.
> As well as syncronization mechanism (commented in the code snippet
> above) doesn't help to align records in particular order.
> 
> While this behaviour is totally fine for Production case, I just
> wonder how to write stable unit test scenario to cover late elements
> processing.
> I didn't find any suitable test harness from utils.
> 
> Any feedback is appreciated!
> 
> Regards,
> Eugen

Reply via email to