I was trying to find a way to sessionize events based on their event timestamps using Spark, and I found a code example in the Spark repo that uses mapGroupsWithState to sessionize events using processing timestamps:
https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

To quickly test whether this sessionization works with event timestamps, I added withWatermark("timestamp", "10 seconds") (treating the processing time as the event timestamp) and changed ProcessingTimeTimeout to EventTimeTimeout (the elided update function is sketched at the end of this mail):

val lines = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .option("includeTimestamp", value = true)
  .load()

// Split the lines into words, treating each word as the sessionId of an event
val events = lines
  .withWatermark("timestamp", "10 seconds") // added
  .as[(String, Timestamp)]
  .flatMap { case (line, timestamp) =>
    line.split(" ").map(word => Event(sessionId = word, timestamp))
  }

val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout) {
    ...
  }

// Start running the query that prints the session updates to the console
val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .format("console")
  .start()

query.awaitTermination()

However, when I ran it, Spark threw an org.apache.spark.sql.AnalysisException saying:

Watermark must be specified in the query using '[Dataset/DataFrame].withWatermark()' for using event-time timeout in a [map|flatMap]GroupsWithState. Event-time timeout not supported without watermark.

This is confusing and does not seem true, because the watermark on the 'timestamp' column is clearly present in the physical plan printed right after that exception message:

...
+- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),..., [value#2, timestamp#3]

Did I miss something or do something wrong? Thanks in advance!
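P.S. For context, the function body I elided above follows the shape of the update function in the linked example, just switched over to an event-time timeout. A rough sketch of what I mean (the Event, SessionInfo and SessionUpdate case classes are the ones from the linked example; the function name, the 10-second session gap and the setTimeoutTimestamp call are my adaptation for EventTimeTimeout, not the example's code):

def updateAcrossEvents(
    sessionId: String,
    events: Iterator[Event],
    state: GroupState[SessionInfo]): SessionUpdate = {
  if (state.hasTimedOut) {
    // The watermark has passed the timeout timestamp: emit a final,
    // expired update for this session and drop its state.
    val finalUpdate =
      SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
    state.remove()
    finalUpdate
  } else {
    // Fold the new batch of events into the existing session state (if any).
    val timestamps = events.map(_.timestamp.getTime).toSeq
    val updated = if (state.exists) {
      val old = state.get
      SessionInfo(
        old.numEvents + timestamps.size,
        old.startTimestampMs,
        math.max(old.endTimestampMs, timestamps.max))
    } else {
      SessionInfo(timestamps.size, timestamps.min, timestamps.max)
    }
    state.update(updated)
    // With EventTimeTimeout, the timeout fires once the watermark passes this
    // timestamp (here: 10 seconds after the last event seen in the session).
    state.setTimeoutTimestamp(updated.endTimestampMs + 10 * 1000)
    SessionUpdate(sessionId, updated.durationMs, updated.numEvents, expired = false)
  }
}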