Anomaly in handling late arriving data

2019-09-25 Thread Indraneel R
Hi Everyone,

I am trying to execute this simple sessionization pipeline, with the
allowed lateness shown below:

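import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Assumed shape of Event (its definition was not included in the post):
// case class Event(userId: String, eventId: String, timestamp: Long,
//                  sessionId: String = "", count: Int = 0)

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(2)

  val source: DataStream[Event] = env.addSource(new SourceFunction[Event] {
    lazy val input: Seq[Event] = Seq(
      Event("u1", "e1", 1L),
      Event("u1", "e5", 6L),
      Event("u1", "e7", 11L),
      Event("u1", "e8", 12L),
      Event("u1", "e9", 16L),
      Event("u1", "e11", 14L),
      Event("u1", "e12", 8L),   // <-- the event in question
      Event("u1", "e13", 20L)
    )

    override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
      input.foreach { event =>
        ctx.collectWithTimestamp(event, event.timestamp)
        // The watermark trails each event's timestamp by 1 ms.
        ctx.emitWatermark(new Watermark(event.timestamp - 1))
      }
      // Final watermark: flushes all remaining windows.
      ctx.emitWatermark(new Watermark(Long.MaxValue))
    }

    override def cancel(): Unit = {}
  })

  val tag: OutputTag[Event] = OutputTag("late-data")

  val sessionizedStream: DataStream[Event] = source
    .keyBy(item => item.userId)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))  // session gap: 3 ms
    .sideOutputLateData(tag)
    .allowedLateness(Time.milliseconds(2L))                          // allowed lateness: 2 ms
    .process(new ProcessWindowFunction[Event, Event, String, TimeWindow] {

      override def process(key: String, context: Context,
                           elements: Iterable[Event], out: Collector[Event]): Unit = {
        val sessionIdForWindow =
          key + "-" + context.currentWatermark + "-" + context.window.getStart

        // Emit the session's events in timestamp order, tagged with the session id.
        elements.toSeq
          .sortBy(event => event.timestamp)
          .foreach { event =>
            out.collect(event.copy(sessionId = sessionIdForWindow, count = elements.size))
          }
      }
    })

  // Only the late-data side output is printed.
  sessionizedStream.getSideOutput(tag).print()
  env.execute()
}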

But here's the problem. I am expecting the event e12 (timestamp 8L, marked
in the source above) to be collected in the side output as a late event.

But it isn't. The event is not printed.

What's interesting is that if I make any one of the following changes, the
event e12 is considered late and is printed:
   1) Change Event("u1", "e12", 8L) to Event("u1", "e12", 7L)
   2) Change allowedLateness(Time.milliseconds(2L))
      to allowedLateness(Time.milliseconds(1L))
   3) Change Event("u1", "e12", 8L) to Event("u1", "e12", 7L) AND
      change allowedLateness(Time.milliseconds(2L))
      to allowedLateness(Time.milliseconds(4L))   // or anything less than 7L

Can someone explain what's going on? What am I missing here?


regards
-Indraneel


Re: Anomaly in handling late arriving data

2019-09-25 Thread Zhu Zhu
Hi Indraneel,

In your case, ("u1", "e12", 8L) is not considered late and will go into the
session window {e7,e8,e9,e11} (range=11~19).
This is because 8+3 (session gap) >= 11, the lower bound of the existing
session window, so e12's own window touches that session and is merged into it.
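
To make the merge rule concrete, here is a minimal standalone sketch (plain
Scala, no Flink dependency). Win, windowFor and isLate are hypothetical names
used only for illustration. The inclusive comparison in intersects and the
"maxTimestamp + allowedLateness <= watermark" check reflect my understanding
of how TimeWindow and the window operator behave; they are not copied from
Flink. The watermark value 15 is what your source has emitted by the time e12
arrives (e9's timestamp 16 minus 1; the lower watermark 13 emitted after e11
does not move it back).

```scala
object SessionMergeSketch {
  // A window [start, end); a session assigner gives each event [ts, ts + gap).
  final case class Win(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1
    // Assumption: inclusive bounds, so windows that merely touch also merge.
    def intersects(other: Win): Boolean = start <= other.end && end >= other.start
    def cover(other: Win): Win =
      Win(math.min(start, other.start), math.max(end, other.end))
  }

  def windowFor(ts: Long, gap: Long): Win = Win(ts, ts + gap)

  // Assumption: a window is late once its cleanup time is at or behind the watermark.
  def isLate(w: Win, allowedLateness: Long, watermark: Long): Boolean =
    w.maxTimestamp + allowedLateness <= watermark

  val gap = 3L
  val existing = Win(11L, 19L)      // session {e7,e8,e9,e11}
  val e12 = windowFor(8L, gap)      // Win(8, 11): touches the session at 11

  // e12 merges, and the lateness check is applied to the merged window.
  val merged = if (e12.intersects(existing)) e12.cover(existing) else e12
  val late = isLate(merged, allowedLateness = 2L, watermark = 15L) // 18 + 2 > 15
}
```

With timestamp 7 instead of 8, windowFor(7L, gap) is Win(7, 10), which no
longer reaches 11, so no merge happens and the check runs on Win(7, 10) alone.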

Regarding your 3 questions:
>> 1) Event("u1", "e12", 8L) change to Event("u1", "e12", 7L)
7+3 < 11, so e12 will not go into the session window {e7,e8,e9,e11}.
It is therefore treated as late and emitted to the side output.

>> 2) allowedLateness(Time.milliseconds(2L)) change
to allowedLateness(Time.milliseconds(1L))
Reducing the allowedLateness causes window {e7,e8} to be fired and cleaned up
when e9 arrives.
So when e12 arrives, the existing session window is {e9,e11} (range=14~19).
8+3 < 14, so e12 cannot join it and will be considered late in this case.

>> 3) Event("u1", "e12", 8L) change to Event("u1", "e12", 7L) AND
allowedLateness(Time.milliseconds(2L)) change
to allowedLateness(Time.milliseconds(4L))
The same as case 1).
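
If it helps, the original setup and the three cases can be replayed with a
small simulation. This is only a sketch under simplifying assumptions, not
Flink's actual implementation: a single key, the watermark (timestamp - 1,
never moving backwards) applied right after each event, and a window dropped
as soon as maxTimestamp + allowedLateness <= watermark. All names
(LatenessSimulation, Ev, Win, lateEvents) are made up for the example.

```scala
import scala.collection.mutable.ListBuffer

object LatenessSimulation {
  final case class Ev(id: String, ts: Long)

  final case class Win(start: Long, end: Long) {
    // Assumption: inclusive bounds, so touching windows merge.
    def intersects(o: Win): Boolean = start <= o.end && end >= o.start
    def cover(o: Win): Win = Win(math.min(start, o.start), math.max(end, o.end))
    def cleanupTime(lateness: Long): Long = (end - 1) + lateness
  }

  // Returns the ids of the events that would end up in the late-data side output.
  def lateEvents(events: Seq[Ev], gap: Long, lateness: Long): Seq[String] = {
    var watermark = Long.MinValue
    var active = List.empty[Win]
    val late = ListBuffer.empty[String]

    for (ev <- events) {
      val own = Win(ev.ts, ev.ts + gap)
      // Merge the event's own window with every live session it touches.
      val (hit, rest) = active.partition(_.intersects(own))
      val merged = hit.foldLeft(own)(_ cover _)

      if (merged.cleanupTime(lateness) <= watermark) late += ev.id
      else active = merged :: rest

      // Advance the watermark, then drop sessions past their cleanup time.
      watermark = math.max(watermark, ev.ts - 1)
      active = active.filter(_.cleanupTime(lateness) > watermark)
    }
    late.toList
  }

  // The input from the original post; e12's timestamp is a parameter.
  def input(e12Ts: Long): Seq[Ev] = Seq(
    Ev("e1", 1L), Ev("e5", 6L), Ev("e7", 11L), Ev("e8", 12L),
    Ev("e9", 16L), Ev("e11", 14L), Ev("e12", e12Ts), Ev("e13", 20L)
  )
}
```

Calling LatenessSimulation.lateEvents(LatenessSimulation.input(8L), 3L, 2L)
models the original job; changing the e12 timestamp to 7L and/or the lateness
to 1L or 4L models cases 1) to 3).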

Thanks,
Zhu Zhu
