Anomaly in handling late arriving data

2019-09-25 Thread Indraneel R
Hi Everyone,

I am trying to execute the simple sessionization pipeline below, with
allowed lateness:
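(The snippet assumes imports and an Event case class roughly like the
following; the exact Event definition wasn't included in the mail, so its
shape here is a guess.)

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Guessed shape: sessionId and count get defaults so that copy(...) works.
case class Event(userId: String, eventId: String, timestamp: Long,
                 sessionId: String = "", count: Int = 0)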

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(2)

  val source: DataStream[Event] = env.addSource(new SourceFunction[Event] {
    lazy val input: Seq[Event] = Seq(
      Event("u1", "e1", 1L),
      Event("u1", "e5", 6L),
      Event("u1", "e7", 11L),
      Event("u1", "e8", 12L),
      Event("u1", "e9", 16L),
      Event("u1", "e11", 14L),
      Event("u1", "e12", 8L),   // <-- out of order; I expect this one to be late
      Event("u1", "e13", 20L)
    )

    override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
      input.foreach { event =>
        ctx.collectWithTimestamp(event, event.timestamp)
        // a watermark trails each event by 1 ms
        ctx.emitWatermark(new Watermark(event.timestamp - 1))
      }
      ctx.emitWatermark(new Watermark(Long.MaxValue))
    }

    override def cancel(): Unit = {}
  })

  val tag: OutputTag[Event] = OutputTag[Event]("late-data")

  val sessionizedStream: DataStream[Event] = source
    .keyBy(item => item.userId)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
    .sideOutputLateData(tag)
    .allowedLateness(Time.milliseconds(2L))
    .process(new ProcessWindowFunction[Event, Event, String, TimeWindow] {

      override def process(key: String, context: Context,
                           elements: Iterable[Event],
                           out: Collector[Event]): Unit = {
        val sessionIdForWindow =
          key + "-" + context.currentWatermark + "-" + context.window.getStart

        elements.toSeq
          .sortBy(event => event.timestamp)
          .foreach { event =>
            out.collect(event.copy(sessionId = sessionIdForWindow, count = elements.size))
          }
      }
    })

  sessionizedStream.getSideOutput(tag).print()
  env.execute()
}

But here's the problem. I expect the event marked above (e12) to be
collected in the side output as a late event.

But it isn't. The event is never printed.

What's interesting is that if I make *any one* of the following changes,
e12 is considered late and is printed:
   1) Event("u1", "e12", 8L)  changed to  Event("u1", "e12", 7L)
   2) allowedLateness(Time.milliseconds(2L))  changed to
      allowedLateness(Time.milliseconds(1L))
   3) Event("u1", "e12", 8L)  changed to  Event("u1", "e12", 7L)  AND
      allowedLateness(Time.milliseconds(2L))  changed to
      allowedLateness(Time.milliseconds(4L))   // or anything less than 7L
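
For reference, here is the watermark progression I believe this source
produces (a sketch; it assumes the downstream operator simply ignores a
watermark that is lower than its current one, so event time never moves
backwards):

  event (ts)    watermark emitted (ts - 1)    operator event-time clock
  e1   (1)      0                             0
  e5   (6)      5                             5
  e7   (11)     10                            10
  e8   (12)     11                            11
  e9   (16)     15                            15
  e11  (14)     13  (behind 15, ignored)      15
  e12  (8)      7   (behind 15, ignored)      15
  e13  (20)     19                            19

So when e12 arrives, the operator's event-time clock already stands at 15.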

Can someone explain what's going on? What am I missing here?


regards
-Indraneel


Property based testing

2019-09-18 Thread Indraneel R
Hi All,

Is there any property-based testing framework for Flink, like
'SparkTestingBase' for Spark?

It would also be useful to know some of the standard practices for data
testing in Flink pipelines.

regards
-Indraneel


Re: Property based testing

2019-09-18 Thread Indraneel R
Oh great! Thanks, Aaron, that was quite clear.
I will give it a try!

On Wed, Sep 18, 2019 at 8:29 PM Aaron Levin  wrote:

> Hey,
>
> I've used ScalaCheck to test Flink applications. The basic idea is:
>
> * use ScalaCheck to generate some kind of collection
> * use `fromCollection` in `StreamExecutionEnvironment` to create a
> `DataStream`
> * use `DataStreamUtils.collect` as a sink
> * plug my Flink logic between the collection source and the collection sink
> * make a ScalaCheck property assertion based on the input collection and
> output collection.
>
> It's possible to wrap all that in a single method in Scala. LMK if you have any
> more questions or any of this was not clear!
>
> (note: not sure how to do this in Java).
>
> Best,
>
> Aaron Levin
>
> On Wed, Sep 18, 2019 at 8:36 AM Indraneel R 
> wrote:
>
>> Hi All,
>>
>> Is there any property-based testing framework for Flink, like
>> 'SparkTestingBase' for Spark?
>>
>> It would also be useful to know some of the standard practices for data
>> testing in Flink pipelines.
>>
>> regards
>> -Indraneel
>>
>
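
A minimal sketch of the approach Aaron outlines: ScalaCheck generates the
input collection, fromCollection is the source, DataStreamUtils.collect is
the sink, and the property compares input and output. The pipeline under
test here (a doubling map) is a made-up placeholder, just for illustration:

import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala._
import org.scalacheck.Prop.forAll
import org.scalacheck.Properties

import scala.collection.JavaConverters._

object PipelineSpec extends Properties("Pipeline") {

  // The Flink logic under test; a placeholder for illustration.
  def pipeline(in: DataStream[Int]): DataStream[Int] =
    in.map(_ * 2)

  property("doubles every element") = forAll { input: List[Int] =>
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // collection source -> logic under test -> collection sink
    val out = pipeline(env.fromCollection(input))
    val results = DataStreamUtils.collect(out.javaStream).asScala.toList

    // property assertion over the input and output collections
    results == input.map(_ * 2)
  }
}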