You are exactly right! A few hours ago, after trying many things, I finally got the example working by declaring the watermark on the event timestamp column before groupByKey, just as you suggested. However, I couldn't figure out why my fix worked.
    val sessionUpdates = events
      .withWatermark("timestamp", "10 seconds")
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout())

It turns out that once a flatMap has been applied, the planner simply cannot figure out where the watermark column comes from.

Thanks Tathagata!
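For reference, here is a minimal sketch of the ordering that worked, under stated assumptions. Only `SessionInfo`, `SessionUpdate`, the `"timestamp"` column, the 10-second watermark and `GroupStateTimeout.EventTimeTimeout()` come from the snippet above. The `Line` and `Event` types, every field, and the `merge`/`update` functions are hypothetical stand-ins. The sketch assumes the events are produced by a flatMap over an input stream, and it declares the watermark on the flatMapped Dataset, immediately before groupByKey:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical record types; field names are assumptions for illustration.
case class Line(value: String, timestamp: Timestamp)
case class Event(sessionId: String, timestamp: Timestamp)
case class SessionInfo(numEvents: Int, startMs: Long, endMs: Long)
case class SessionUpdate(id: String, numEvents: Int, expired: Boolean)

object Sessionize {
  // Pure helper (hypothetical): folds new event times into the previous session, if any.
  def merge(old: Option[SessionInfo], times: Seq[Long]): SessionInfo = old match {
    case Some(s) =>
      SessionInfo(s.numEvents + times.size, math.min(s.startMs, times.min), math.max(s.endMs, times.max))
    case None =>
      SessionInfo(times.size, times.min, times.max)
  }

  // Hypothetical state-update function passed to mapGroupsWithState.
  def update(id: String, events: Iterator[Event], state: GroupState[SessionInfo]): SessionUpdate =
    if (state.hasTimedOut) {
      // The watermark has passed this session's timeout: emit a final update and drop the state.
      val finished = state.get
      state.remove()
      SessionUpdate(id, finished.numEvents, expired = true)
    } else {
      // Non-timeout calls always carry at least one event, so min/max are safe here.
      val info = merge(state.getOption, events.map(_.timestamp.getTime).toList)
      state.update(info)
      // Event-time timeout: fires once the watermark passes 10 s after the latest event.
      state.setTimeoutTimestamp(info.endMs, "10 seconds")
      SessionUpdate(id, info.numEvents, expired = false)
    }

  def sessionUpdates(spark: SparkSession, lines: Dataset[Line]): Dataset[SessionUpdate] = {
    import spark.implicits._
    // The flatMap runs first; its output has its own "timestamp" column.
    val events = lines.flatMap(l => l.value.split(" ").map(word => Event(word, l.timestamp)))
    events
      .withWatermark("timestamp", "10 seconds") // declared after the flatMap, right before groupByKey
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout())(update _)
  }
}
```

Keeping the session arithmetic in a pure `merge` function means it can be checked without a running stream. Only the placement of `withWatermark` relative to the flatMap matters for the issue discussed in this thread.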