Karthik created SPARK-24382:
-------------------------------

             Summary: Spark Structured Streaming aggregation on old timestamp data
                 Key: SPARK-24382
                 URL: https://issues.apache.org/jira/browse/SPARK-24382
             Project: Spark
          Issue Type: Question
          Components: Structured Streaming
    Affects Versions: 2.2.0
            Reporter: Karthik
I am trying to use Structured Streaming to count the records in every 10-second window for the following incoming Kafka data:
{code:java}
{ "ts2" : "2018/05/01 00:02:50.041", "serviceGroupId" : "123", "userId" : "avv-0", "stream" : "", "lastUserActivity" : "00:02:50", "lastUserActivityCount" : "0" }
{ "ts2" : "2018/05/01 00:09:02.079", "serviceGroupId" : "123", "userId" : "avv-0", "stream" : "", "lastUserActivity" : "00:09:02", "lastUserActivityCount" : "0" }
{ "ts2" : "2018/05/01 00:09:02.086", "serviceGroupId" : "123", "userId" : "avv-2", "stream" : "", "lastUserActivity" : "00:09:02", "lastUserActivityCount" : "0" }
{code}
with the following logic:
{code:java}
// withWatermark and window both require ts2 to be a timestamp column
val sdvTuneInsAgg1 = df
  .withWatermark("ts2", "10 seconds")         // accept data up to 10 s late
  .groupBy(window(col("ts2"), "10 seconds"))  // 10-second tumbling windows
  .agg(count("*") as "count")
  .as[CountMetric1]

val query1 = sdvTuneInsAgg1.writeStream
  .format("console")  // note: foreach(...) below replaces this console sink
  .foreach(writer)    // output mode is not set, so it defaults to append
  .start()
{code}
However, I do not see any records reaching the writer. The only anomaly is that the current date is 2018/05/24, while the records I am processing (ts2) carry older dates. Will the aggregation/count work in this scenario?
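To make the windowing concrete, here is a minimal plain-Scala sketch (no Spark dependency; an illustration, not Spark's implementation) that buckets the three sample ts2 values into 10-second tumbling windows and computes the watermark as the maximum event time seen minus the 10-second delay. It assumes the timestamps are UTC and treats all three records as one batch, ignoring trigger boundaries; the helper names (toMillis, windowStart, finalized, show) are hypothetical. It only ever looks at the event times, never at the current date.

{code:java}
import java.time.{Duration, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helpers; timestamps are assumed to be UTC for this sketch.
val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS")

def toMillis(ts: String): Long =
  LocalDateTime.parse(ts, fmt).toInstant(ZoneOffset.UTC).toEpochMilli

def show(ms: Long): String =
  LocalDateTime.ofEpochSecond(ms / 1000, ((ms % 1000) * 1000000).toInt, ZoneOffset.UTC).format(fmt)

val windowMs = Duration.ofSeconds(10).toMillis // mirrors window(col("ts2"), "10 seconds")
val delayMs  = Duration.ofSeconds(10).toMillis // mirrors withWatermark("ts2", "10 seconds")

// Tumbling window start: event time rounded down to a multiple of the window length.
def windowStart(eventMs: Long): Long = eventMs - Math.floorMod(eventMs, windowMs)

val events: Seq[Long] = Seq(
  "2018/05/01 00:02:50.041",
  "2018/05/01 00:09:02.079",
  "2018/05/01 00:09:02.086"
).map(toMillis)

// Count per window, analogous to groupBy(window(...)).agg(count("*")).
val counts: Map[Long, Int] = events.groupBy(windowStart).map { case (w, es) => w -> es.size }

// Watermark = maximum event time seen minus the delay threshold (wall clock is never used).
val watermark: Long = events.max - delayMs

// Append mode emits a window only once the watermark has reached the window's end.
def finalized(start: Long): Boolean = start + windowMs <= watermark

counts.toSeq.sortBy(_._1).foreach { case (w, n) =>
  println(s"[${show(w)}, ${show(w + windowMs)}) count=$n finalized=${finalized(w)}")
}
// prints:
// [2018/05/01 00:02:50.000, 2018/05/01 00:03:00.000) count=1 finalized=true
// [2018/05/01 00:09:00.000, 2018/05/01 00:09:10.000) count=2 finalized=false
{code}

In this model the watermark ends up at 00:08:52.086, so only the [00:02:50, 00:03:00) window is complete; the [00:09:00, 00:09:10) window would stay open until later records push the watermark past 00:09:10.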