Hi Everyone,
I am trying to execute this simple sessionization pipeline, with the
allowed lateness shown below:
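import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Event is a case class (definition not shown) with at least
// userId, timestamp, sessionId and count fields.
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(2)

  val source: DataStream[Event] = env.addSource(new SourceFunction[Event] {
    lazy val input: Seq[Event] = Seq(
      Event("u1", "e1", 1L),
      Event("u1", "e5", 6L),
      Event("u1", "e7", 11L),
      Event("u1", "e8", 12L),
      Event("u1", "e9", 16L),
      Event("u1", "e11", 14L),
      Event("u1", "e12", 8L),   // <-- highlighted: the event in question (e12)
      Event("u1", "e13", 20L)
    )

    override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
      input.foreach { event =>
        ctx.collectWithTimestamp(event, event.timestamp)
        // <-- highlighted: the watermark trails each element's timestamp by 1
        ctx.emitWatermark(new Watermark(event.timestamp - 1))
      }
      ctx.emitWatermark(new Watermark(Long.MaxValue))
    }

    override def cancel(): Unit = {}
  })

  val tag: OutputTag[Event] = OutputTag("late-data")

  val sessionizedStream: DataStream[Event] = source
    .keyBy(item => item.userId)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))  // <-- highlighted: session gap
    .sideOutputLateData(tag)
    .allowedLateness(Time.milliseconds(2L))                          // <-- highlighted: allowed lateness
    .process(new ProcessWindowFunction[Event, Event, String, TimeWindow] {
      override def process(key: String, context: Context,
                           elements: Iterable[Event], out: Collector[Event]): Unit = {
        val sessionIdForWindow =
          key + "-" + context.currentWatermark + "-" + context.window.getStart
        elements.toSeq
          .sortBy(event => event.timestamp)
          .foreach(event =>
            out.collect(event.copy(sessionId = sessionIdForWindow, count = elements.size)))
      }
    })

  // Only the late-data side output is printed.
  sessionizedStream.getSideOutput(tag).print()
  env.execute()
}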
But here's the problem: I expect the highlighted event above (e12) to be
collected in the side output as a late event, but it isn't. The event is
not printed.
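For reference, here is the arithmetic behind my expectation as a minimal plain-Scala sketch with no Flink dependency. All names in it (LatenessSketch, watermarkOnArrival, windowIsLate) are hypothetical and exist only for illustration. It rests on two assumptions of mine: that a watermark lower than one already seen has no effect, and that a window [start, end) counts as late once (end - 1) + allowedLateness <= the current watermark.

```scala
// Plain-Scala sketch of the lateness arithmetic I expect; no Flink dependency.
// All names here are hypothetical and not part of any Flink API.
object LatenessSketch {
  val gap = 3L             // session gap used in the pipeline
  val allowedLateness = 2L // allowed lateness used in the pipeline

  // Event timestamps in the order the source emits them.
  val timestamps: Seq[Long] = Seq(1L, 6L, 11L, 12L, 16L, 14L, 8L, 20L)

  // Watermark in effect when the element at `index` arrives. The source emits
  // (timestamp - 1) after each element. Assumption: the watermark never moves
  // backwards, so only the largest value emitted so far counts.
  def watermarkOnArrival(index: Int): Long =
    timestamps.take(index).map(_ - 1).foldLeft(Long.MinValue)((a, b) => math.max(a, b))

  // Assumption: a window [start, end) is late once
  // (end - 1) + allowedLateness <= current watermark.
  def windowIsLate(start: Long, end: Long, watermark: Long): Boolean =
    (end - 1) + allowedLateness <= watermark

  def main(args: Array[String]): Unit = {
    val index = timestamps.indexOf(8L)  // position of e12 in the input
    val wm = watermarkOnArrival(index)  // watermark seen when e12 arrives
    val (start, end) = (8L, 8L + gap)   // e12's own session window, taken in isolation
    println(s"watermark=$wm window=[$start,$end) late=${windowIsLate(start, end, wm)}")
  }
}
```

Under those assumptions this prints watermark=15 window=[8,11) late=true, which is why I expected e12 to show up in the side output. The sketch looks at e12's window in isolation and deliberately ignores any merging with existing session windows, which may be part of what I am missing.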
What's interesting is that if I make *any one* of the following changes,
event e12 is considered late and is printed:
1) Change Event("u1", "e12", 8L) to Event("u1", "e12", 7L)
2) Change allowedLateness(Time.milliseconds(2L))
   to allowedLateness(Time.milliseconds(1L))
3) Change Event("u1", "e12", 8L) to Event("u1", "e12", 7L) AND
   change allowedLateness(Time.milliseconds(2L))
   to allowedLateness(Time.milliseconds(4L)) // or anything less than 7L
Can someone explain what's going on? What am I missing here?
regards
-Indraneel