Hello, I want to calculate the most recent event time from a stream.
Something like this:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." column syntax (assumes a SparkSession named `spark`)

// Epoch-seconds copy of the event time, so max() returns a plain number
val timestamped = records.withColumn("ts_long", unix_timestamp($"eventTime"))

val lastReport = timestamped
  .withWatermark("eventTime", "4 hours") // drop state for windows more than 4 hours behind the latest event time
  .groupBy(col("eventTime"), window(col("eventTime"), "10 minutes", "5 minutes")) // 10-minute windows sliding every 5 minutes
  .max("ts_long")
  .writeStream
  .foreach(new LastReportUpdater(stationId))
  .start()
```

During normal execution, I expect to receive a few events per minute, at most.

So now for the problem: during system initialization, I batch-load a longer history of data (stretching back months). Because the volume is higher during initialization, records arrive with a lot of time skew. I'm saving the result to a database and want to keep it updated in real time during streaming operation.

Do I write two flavors of the query, one as a static Dataset for initialization and another for real time? Or is my logic incorrect?

Thanks,
Jason
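On the concern about keeping the database value correct while skewed history is loaded: a minimal sketch, in plain Scala and independent of Spark, of one way to make the stored last-report time tolerant of out-of-order records is to let the write only ever move the value forward. `LastReportStore`, `update`, and `get` are hypothetical stand-ins for the database table behind `LastReportUpdater`, not part of any Spark API. Against a real database the same idea would be a conditional update that only overwrites a row when the incoming timestamp is newer than the stored one.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the database table that a writer such as
// LastReportUpdater would update: one entry per stationId holding the latest
// event time seen so far (epoch seconds, as produced by unix_timestamp).
final class LastReportStore {
  private val latest = mutable.Map.empty[String, Long]

  // Forward-only upsert: an older, out-of-order timestamp (e.g. from a skewed
  // backfill batch) leaves the stored value unchanged. Returns the stored value.
  def update(stationId: String, tsLong: Long): Long = {
    val current = latest.getOrElse(stationId, Long.MinValue)
    val next = math.max(current, tsLong)
    latest.update(stationId, next)
    next
  }

  def get(stationId: String): Option[Long] = latest.get(stationId)
}

object LastReportStoreDemo {
  def main(args: Array[String]): Unit = {
    val store = new LastReportStore
    // Out-of-order arrivals, as might happen during the historical load.
    Seq(1000L, 3000L, 2000L, 1500L).foreach(ts => store.update("station-1", ts))
    println(store.get("station-1")) // Some(3000)
  }
}
```

Because the update is idempotent and order-independent, the same write path could in principle serve both the initial batch load and the real-time stream.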