You are exactly right! A few hours ago, after trying many things, I finally got
the example working by defining the watermark on the event timestamp column
right before groupByKey, just as you suggested, but I couldn't figure out the
reasoning behind my fix.

    val sessionUpdates = events
      .withWatermark("timestamp", "10 seconds")
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout())

It turns out that the planner simply cannot trace the watermark column back to
its source once a flatMap has been applied, so the watermark has to be defined
after the flatMap, not before it.
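For anyone who finds this thread later, here is a fuller sketch of the ordering
that worked for me. It is loosely modelled on the StructuredSessionization
example, but it is not the exact example code. The socket source on
localhost:9999, the field names of Event/SessionInfo/SessionUpdate, the `merge`
helper and the session-expiry logic are my own assumptions. The point is only
where withWatermark sits: after the flatMap, immediately before groupByKey.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical types, loosely modelled on Spark's StructuredSessionization example.
case class Event(sessionId: String, timestamp: Timestamp)
case class SessionInfo(numEvents: Int, startTimestampMs: Long, endTimestampMs: Long)
case class SessionUpdate(id: String, durationMs: Long, numEvents: Int, expired: Boolean)

object WatermarkBeforeGroupByKey {

  // Hypothetical pure helper: fold a batch of event times (ms) into the previous session state.
  def merge(old: Option[SessionInfo], timesMs: Seq[Long]): SessionInfo = old match {
    case Some(s) =>
      SessionInfo(s.numEvents + timesMs.size,
        math.min(s.startTimestampMs, timesMs.min), math.max(s.endTimestampMs, timesMs.max))
    case None => SessionInfo(timesMs.size, timesMs.min, timesMs.max)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("WatermarkSketch").master("local[2]").getOrCreate()
    import spark.implicits._

    // Assumption: text lines arrive on a socket at localhost:9999 (e.g. from `nc -lk 9999`).
    val lines = spark.readStream.format("socket")
      .option("host", "localhost").option("port", 9999)
      .option("includeTimestamp", true)
      .load()

    // flatMap runs an opaque Scala function and emits new columns, so (as I understand it)
    // a watermark declared on `lines` would not be traceable through to `events`.
    val events = lines.as[(String, Timestamp)]
      .flatMap { case (line, ts) => line.split(" ").map(word => Event(word, ts)) }

    val sessionUpdates = events
      .withWatermark("timestamp", "10 seconds") // declared after flatMap, right before groupByKey
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout()) {
        (id: String, batch: Iterator[Event], state: GroupState[SessionInfo]) =>
          if (state.hasTimedOut) {
            // The watermark has passed this session's timeout: emit it as expired and drop the state.
            val s = state.get
            state.remove()
            SessionUpdate(id, s.endTimestampMs - s.startTimestampMs, s.numEvents, expired = true)
          } else {
            val s = merge(state.getOption, batch.map(_.timestamp.getTime).toList)
            state.update(s)
            // Time the session out 10 seconds of event time after its latest event.
            state.setTimeoutTimestamp(s.endTimestampMs, "10 seconds")
            SessionUpdate(id, s.endTimestampMs - s.startTimestampMs, s.numEvents, expired = false)
          }
      }

    sessionUpdates.writeStream
      .outputMode("update") // mapGroupsWithState queries run in update mode
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

Moving the withWatermark call up onto `lines`, i.e. before the flatMap, is the
variant that did not work for me.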

Thanks Tathagata!


