There are lots of examples of stateful Structured Streaming in 'The Definitive Guide' book, but all of them read JSON from a 'path'. That works for me.
Now I need to read from Kafka instead. I searched but couldn't find an example, and I am struggling to map the 'value' of the Kafka message to my JSON. Any help would be appreciated. Here's what I am trying:

    val query = withEventTime
      .as[R00tJsonObject]
      .withWatermark("event_time", "5 minutes")
      .groupByKey(row => (row.report.id, row.report.time.toString, row.report.cId))
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "myTopic")
      .option("checkpointLocation", "/Users/username/checkpointLocation")
      .outputMode("update")
      .start()
      .awaitTermination()

It fails with this error:

    cannot resolve 'arrivalTime' given input columns: [value, event_time];
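
The error suggests that `.as[R00tJsonObject]` is being applied to a DataFrame that still holds the raw Kafka `value` column, so Spark cannot find the case class fields by name. A minimal sketch of one way to parse the value first, using `from_json` with a schema derived from the case class, might look like the following. The case class fields, the input topic name `inputTopic`, and the object name are assumptions made for illustration, since the real definitions are not shown; the `spark-sql-kafka-0-10` package also needs to be on the classpath.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical stand-ins: the real case classes are not shown in the question.
// Only report.id, report.time, report.cId and arrivalTime are hinted at there.
case class Report(id: String, time: Timestamp, cId: String)
case class R00tJsonObject(arrivalTime: Timestamp, report: Report)

object KafkaJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-json-sketch").getOrCreate()
    import spark.implicits._

    // Derive the JSON schema from the case class instead of writing it by hand.
    val schema = Encoders.product[R00tJsonObject].schema

    val parsed = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "inputTopic") // assumed input topic name
      .load()
      // Kafka delivers `value` as binary: cast to string, then parse as JSON.
      .select(from_json(col("value").cast("string"), schema).as("json"))
      // Flatten the parsed struct so column names match the case class fields.
      .select("json.*")
      // Assumption: the event-time column is derived from arrivalTime.
      .withColumn("event_time", col("arrivalTime"))
      .as[R00tJsonObject]

    // `parsed` can now take the place of `withEventTime.as[R00tJsonObject]`.
    parsed.printSchema()
  }
}
```

With the fields resolved this way, the `withWatermark`, `groupByKey`, and `mapGroupsWithState` steps from the question should be able to follow unchanged. Note that the Kafka sink expects a `value` column of string or binary type, so the output of `mapGroupsWithState` will probably need to be serialized (for example with `to_json`) before `writeStream`.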