I was trying to find a way to sessionize events based on their event
timestamps using Spark, and I found a code example in the Spark repo that
uses mapGroupsWithState to sessionize events based on processing
timestamps:

https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

To quickly test whether this sessionization works with event timestamps, I
added withWatermark("timestamp", "10 seconds") (treating the processing
time as the event timestamp) and changed ProcessingTimeTimeout to
EventTimeTimeout.
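
For context, these are the case classes from the linked example, which I
left as-is:

  case class Event(sessionId: String, timestamp: Timestamp)

  // State kept per session between triggers
  case class SessionInfo(
      numEvents: Int,
      startTimestampMs: Long,
      endTimestampMs: Long) {
    def durationMs: Long = endTimestampMs - startTimestampMs
  }

  // Row emitted to the sink for each updated or expired session
  case class SessionUpdate(
      id: String,
      durationMs: Long,
      numEvents: Int,
      expired: Boolean)

Here is my modified version of the query: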

  val lines = spark.readStream
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", value = true)
    .load()

  // Split the lines into words, treat words as sessionId of events
  val events = lines
    .withWatermark("timestamp", "10 seconds") // added
    .as[(String, Timestamp)]
    .flatMap { case (line, timestamp) =>
      line.split(" ").map(word => Event(sessionId = word, timestamp))
    }

  val sessionUpdates = events
    .groupByKey(event => event.sessionId)
    .mapGroupsWithState[SessionInfo, SessionUpdate](
      GroupStateTimeout.EventTimeTimeout) { // changed from ProcessingTimeTimeout
      ...
    }

  // Start running the query that prints the session updates to the console
  val query = sessionUpdates
    .writeStream
    .outputMode("update")
    .format("console")
    .start()

  query.awaitTermination()
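
In place of the "..." above I kept the update function from the linked
example, with the only change being state.setTimeoutDuration replaced by
state.setTimeoutTimestamp (setTimeoutDuration is only allowed with
processing-time timeout). Roughly, as a sketch:

  case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
    if (state.hasTimedOut) {
      // Watermark passed the timeout timestamp: emit a final update and drop the state
      val finalUpdate =
        SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
      state.remove()
      finalUpdate
    } else {
      // Fold the new events into the existing session, or start a new one
      val timestamps = events.map(_.timestamp.getTime).toSeq
      val updatedSession = if (state.exists) {
        val oldSession = state.get
        SessionInfo(
          oldSession.numEvents + timestamps.size,
          oldSession.startTimestampMs,
          math.max(oldSession.endTimestampMs, timestamps.max))
      } else {
        SessionInfo(timestamps.size, timestamps.min, timestamps.max)
      }
      state.update(updatedSession)
      // Time the session out 10 seconds of event time after its last seen event
      state.setTimeoutTimestamp(updatedSession.endTimestampMs, "10 seconds")
      SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
    }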

However, when I ran it, Spark threw an
org.apache.spark.sql.AnalysisException saying:

  Watermark must be specified in the query using
  '[Dataset/DataFrame].withWatermark()' for using event-time timeout in a
  [map|flatMap]GroupsWithState. Event-time timeout not supported without
  watermark.

That message seems wrong, or at least confusing, because the watermarked
'timestamp' column clearly shows up in the physical plan printed right
below the exception message:

        ...
        +- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
           +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),...,[value#2, timestamp#3]
Did I miss something, or do something wrong?

Thanks in advance!


