Hi

I observed that on the Direct Runner, when the watermark is advanced programmatically
(in tests), a late event is not discarded for a session window.

The aggregation:

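import com.spotify.scio.values.{SCollection, WindowOptions}
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.joda.time.Duration

type User = String
type Action = String

// Groups each user's actions into sessions separated by gaps of at least gapDuration.
def activitiesInSessionWindow(
    userActions: SCollection[(User, Action)],
    gapDuration: Duration,
    allowedLateness: Duration = Duration.ZERO,
    accumulationMode: AccumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
): SCollection[(User, Iterable[Action])] = {
  val windowOptions = WindowOptions(
    allowedLateness = allowedLateness,
    accumulationMode = accumulationMode
  )

  userActions
    .withSessionWindows(gapDuration, windowOptions)
    .groupByKey
}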
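
For reference, a simplified model of session assignment: each element gets its own proto-window [timestamp, timestamp + gap), and overlapping proto-windows are merged into one session. This is a hypothetical plain-Scala sketch, not Scio or Beam code. The 10-minute gap is an assumption inferred from the expected [00:00:00, 00:13:00) window, since DefaultGapDuration is not shown.

```scala
// Hypothetical model of session-window merging (not Beam/Scio code).
// Times are in seconds since 00:00:00.
object SessionMergeSketch {
  // Half-open interval [start, end).
  final case class Window(start: Long, end: Long) {
    def overlaps(other: Window): Boolean = start < other.end && other.start < end
    def span(other: Window): Window = Window(math.min(start, other.start), math.max(end, other.end))
  }

  // Assign each timestamp a proto-window [ts, ts + gap), then merge overlapping ones.
  // Sorting by start means a new window can only overlap the most recent session.
  def sessions(timestamps: Seq[Long], gap: Long): List[Window] = {
    val protoWindows = timestamps.sorted.map(ts => Window(ts, ts + gap))
    protoWindows.foldLeft(List.empty[Window]) {
      case (current :: done, next) if current.overlaps(next) => current.span(next) :: done
      case (acc, next)                                       => next :: acc
    }.reverse
  }

  def main(args: Array[String]): Unit = {
    val gap = 10 * 60L // assumed 10-minute gap (DefaultGapDuration is not shown)
    // "open app", "search product", "open product", "add to cart"
    val firstSession = sessions(Seq(0L, 60L, 90L, 180L), gap)
    println(firstSession)
  }
}
```

With these assumptions the four early actions collapse into a single window from 0 to 780 seconds, i.e. [00:00:00, 00:13:00), which is the first on-time pane asserted in the test below.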

Test scenario (Scio with some additional syntactic sugar):

"Late event" should "not close the gap and two sessions are materialized" in runWithContext { sc =>
  val userActions = testStreamOf[(User, Action)]
    .addElementsAt("00:00:00", ("jack", "open app"))
    .addElementsAt("00:01:00", ("jack", "search product"))
    .addElementsAt("00:01:30", ("jack", "open product"))
    .addElementsAt("00:03:00", ("jack", "add to cart"))
    .advanceWatermarkTo("00:13:00")
    .addElementsAt("00:09:30", ("jack", "checkout"))
    .addElementsAt("00:13:10", ("jack", "close app"))
    .advanceWatermarkToInfinity()

  val results = activitiesInSessionWindow(sc.testStream(userActions), DefaultGapDuration)

  results.withTimestamp should inOnTimePane("00:00:00", "00:13:00") {
    containSingleValueAtWindowTime(
      "00:13:00",
      ("jack", Iterable("open app", "search product", "open product", "add to cart"))
    )
  }

  // Why does a session window start at 00:09:30 if the watermark has already been
  // advanced to 00:13:00? I would expect "checkout" to be dropped due to lateness.
  results.withTimestamp should inOnTimePane("00:09:30", "00:23:10") {
    containSingleValueAtWindowTime("00:23:10", ("jack", Iterable("checkout", "close app")))
  }
}


Look at the "checkout" event with event time 00:09:30, emitted after the watermark
has already been advanced to 00:13:00.
I would expect that event to be dropped due to lateness (exactly as it is
for a fixed window), but for some reason, with a session window, the event
is counted in the second session window.
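
The arithmetic behind the two cases can be sketched as a hypothetical plain-Scala model. It assumes lateness is decided per window rather than per element timestamp: data is treated as droppable only when its window's max timestamp (end minus 1 ms) plus the allowed lateness is already behind the watermark. It also assumes a 10-minute fixed window for the comparison and a 10-minute session gap; neither size is stated above.

```scala
// Hypothetical model of a per-window lateness check (not Beam/Scio code).
// All times are in milliseconds since 00:00:00.
object LatenessSketch {
  // Half-open interval [startMillis, endMillis); the last instant it covers is end - 1 ms.
  final case class Window(startMillis: Long, endMillis: Long) {
    def maxTimestamp: Long = endMillis - 1
  }

  // Assumption: a window is expired once maxTimestamp + allowed lateness is before the watermark.
  def isDroppable(window: Window, allowedLatenessMillis: Long, watermarkMillis: Long): Boolean =
    window.maxTimestamp + allowedLatenessMillis < watermarkMillis

  // Fixed window aligned to multiples of sizeMillis (non-negative timestamps only).
  def fixedWindowFor(ts: Long, sizeMillis: Long): Window = {
    val start = ts - (ts % sizeMillis)
    Window(start, start + sizeMillis)
  }

  // Session proto-window assigned to a single element before any merging.
  def sessionProtoWindowFor(ts: Long, gapMillis: Long): Window = Window(ts, ts + gapMillis)

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    val checkout = 9 * minute + 30 * 1000L // "checkout" at 00:09:30
    val watermark = 13 * minute            // watermark at 00:13:00
    val fixed = fixedWindowFor(checkout, 10 * minute)          // assumed 10-minute fixed window
    val session = sessionProtoWindowFor(checkout, 10 * minute) // assumed 10-minute gap
    println(s"fixed $fixed droppable: ${isDroppable(fixed, 0L, watermark)}")
    println(s"session $session droppable: ${isDroppable(session, 0L, watermark)}")
  }
}
```

Under these assumptions, the fixed window [00:00:00, 00:10:00) is already expired at watermark 00:13:00, while the session proto-window [00:09:30, 00:19:30) is not, which is consistent with the behavior observed in the test.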

You can find the full source code at:
https://github.com/mkuthan/example-streaming/blob/beamsessions/src/test/scala/org/mkuthan/examples/streaming/usersessions/BeamUserSessionsTest.scala#L98

What do you think, bug or feature?

Thanks in advance,
Marcin
