There are lots of examples on 'Stateful Structured Streaming' in 'The
Definitive Guide' book BUT all of them read JSON from a 'path'. That's
working for me.

Now I need to read from Kafka.

I Googled but I couldn't find any example. I am struggling to Map the
'Value' of the Kafka message to my JSON. Any help would be appreciated.
Here's what I am trying:

val query = withEventTime
      .as[R00tJsonObject]
      .withWatermark("event_time", "5 minutes")
      .groupByKey(row => (row.report.id, row.report.time.toString,
row.report.cId))
      
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", *"myTopic"*)
      .option("checkpointLocation", "/Users/username/checkpointLocation")
      .outputMode("update")
      .start().awaitTermination


cannot resolve 'arrivalTime' given input columns: [value, event_time];

Reply via email to