Try defining the watermark on the right column immediately before calling
`groupByKey(...).mapGroupsWithState(...)`. You are applying the watermark
and then running an opaque transformation (a user-defined flatMap that
the planner has no visibility into). This prevents the planner from
propagating the watermark tag through that operation.

Specifically, you are applying a flatMap that takes a timestamp and
splits each input into multiple records with timestamp columns. The SQL
analyzer/planner cannot determine from the opaque user-defined code
whether the generated timestamp is the same as the input timestamp
column, hence it cannot propagate the watermark information down to
mapGroupsWithState.
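
For illustration, here is a rough sketch of what that reordering might look
like, with `withWatermark` moved to after the flatMap so that it sits directly
on the Dataset being grouped. The `Event` case class matches your snippet, but
the fields of `SessionInfo` and `SessionUpdate`, the state-update logic, the
object name, and the host/port values are placeholders I made up (your actual
function body was elided), so treat it as a starting point rather than a
tested fix.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(sessionId: String, timestamp: Timestamp)
// Hypothetical state and output types; the originals were not shown in the thread.
case class SessionInfo(numEvents: Int)
case class SessionUpdate(id: String, numEvents: Int, expired: Boolean)

object EventTimeSessionizationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("EventTimeSessionizationSketch").getOrCreate()
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost") // assumed host
      .option("port", 9999)        // assumed port
      .option("includeTimestamp", value = true)
      .load()

    val events = lines
      .as[(String, Timestamp)]
      .flatMap { case (line, timestamp) =>
        line.split(" ").map(word => Event(sessionId = word, timestamp))
      }
      // Watermark defined AFTER the opaque flatMap, on the column that
      // mapGroupsWithState will actually see.
      .withWatermark("timestamp", "10 seconds")

    val sessionUpdates = events
      .groupByKey(_.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout) {
        (id: String, batch: Iterator[Event], state: GroupState[SessionInfo]) =>
          if (state.hasTimedOut) {
            // The watermark passed the timeout timestamp: emit a final update.
            val last = SessionUpdate(id, state.get.numEvents, expired = true)
            state.remove()
            last
          } else {
            // Placeholder logic: count events and push the timeout forward.
            val seen = batch.toSeq
            val count = state.getOption.map(_.numEvents).getOrElse(0) + seen.size
            state.update(SessionInfo(count))
            // The timeout timestamp must be later than the current watermark.
            state.setTimeoutTimestamp(seen.map(_.timestamp.getTime).max, "10 seconds")
            SessionUpdate(id, count, expired = false)
          }
      }

    val query = sessionUpdates.writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

The only change that matters for the AnalysisException is the position of
`withWatermark`; everything inside the state function is just filler so the
sketch is complete.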


Hope this helps.

On Fri, Jun 8, 2018 at 7:50 AM, frankdede <frank.huanggua...@gmail.com>
wrote:

> I was trying to find a way to resessionize features in different events
> based on the event timestamps using Spark, and I found a code example that
> uses mapGroupsWithState to resessionize events using processing timestamps
> in their repo.
>
> https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
>
> To quickly test if this sessionization thing works with event timestamps, I
> added withWatermark("timestamp", "10 seconds") (treating processing time as
> the event timestamp) and changed ProcessingTimeTimeout to EventTimeTimeout.
>
>   val lines = spark.readStream
>   .format("socket")
>   .option("host", host)
>   .option("port", port)
>   .option("includeTimestamp", value = true)
>   .load()
>
>  // Split the lines into words, treat words as sessionId of events
>  val events = lines
>   .withWatermark("timestamp", "10 seconds") // added
>   .as[(String, Timestamp)]
>   .flatMap { case (line, timestamp) =>
>     line.split(" ").map(word => Event(sessionId = word, timestamp))
>   }
>
>  val sessionUpdates = events
>   .groupByKey(event => event.sessionId)
>   .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout) {
>    ...
>   }
>
>   // Start running the query that prints the session updates to the console
>  val query = sessionUpdates
>   .writeStream
>   .outputMode("update")
>   .format("console")
>   .start()
>
>  query.awaitTermination()
>
> However, when I ran it, Spark threw org.apache.spark.sql.AnalysisException
> and said that "Watermark must be specified in the query using
> '[Dataset/DataFrame].withWatermark()' for using event-time timeout in a
> [map|flatMap]GroupsWithState. Event-time timeout not supported without
> watermark". This is confusing and does not seem to be true, because the
> 'timestamp' column is clearly in the physical plan following that exception
> message:
>
>         ...
>         +- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
>            +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),..., [value#2, timestamp#3]
>
> Did I miss something or did something wrong?
>
> Thanks in advance!
