Karthik created SPARK-24382:
-------------------------------

             Summary: Spark Structured Streaming aggregation on old timestamp data
                 Key: SPARK-24382
                 URL: https://issues.apache.org/jira/browse/SPARK-24382
             Project: Spark
          Issue Type: Question
          Components: Structured Streaming
    Affects Versions: 2.2.0
            Reporter: Karthik


I am trying to aggregate the count of records every 10 seconds using Structured Streaming for the following incoming Kafka data:
{code:java}
{
  "ts2": "2018/05/01 00:02:50.041",
  "serviceGroupId": "123",
  "userId": "avv-0",
  "stream": "",
  "lastUserActivity": "00:02:50",
  "lastUserActivityCount": "0"
}
{
  "ts2": "2018/05/01 00:09:02.079",
  "serviceGroupId": "123",
  "userId": "avv-0",
  "stream": "",
  "lastUserActivity": "00:09:02",
  "lastUserActivityCount": "0"
}
{
  "ts2": "2018/05/01 00:09:02.086",
  "serviceGroupId": "123",
  "userId": "avv-2",
  "stream": "",
  "lastUserActivity": "00:09:02",
  "lastUserActivityCount": "0"
}
{code}
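For context, ts2 arrives in the Kafka payload as a string, but withWatermark requires a TimestampType column. Below is a minimal sketch of the parsing step assumed to produce the df used in the query further down; the broker address, topic name, and selected columns are illustrative placeholders, not taken from this report.
{code:java}
// Hypothetical parsing step assumed to produce `df` (not from the report).
// Requires the spark-sql-kafka-0-10 package and an existing SparkSession `spark`.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("ts2", StringType)
  .add("serviceGroupId", StringType)
  .add("userId", StringType)
  .add("stream", StringType)
  .add("lastUserActivity", StringType)
  .add("lastUserActivityCount", StringType)

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "events")                       // placeholder topic
  .load()
  .select(from_json(col("value").cast("string"), schema).as("j"))
  .select(
    // Parse the string into a proper timestamp (to_timestamp is Spark 2.2+)
    to_timestamp(col("j.ts2"), "yyyy/MM/dd HH:mm:ss.SSS").as("ts2"),
    col("j.userId").as("userId"))
{code}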
I aggregate with the following logic:
{code:java}
val sdvTuneInsAgg1 = df
  .withWatermark("ts2", "10 seconds")
  .groupBy(window(col("ts2"), "10 seconds"))
  .agg(count("*") as "count")
  .as[CountMetric1]

val query1 = sdvTuneInsAgg1.writeStream
  .format("console")
  .foreach(writer)
  .start()
{code}
I do not see any records inside the writer. The only anomaly is that the current date is 2018/05/24, while the records I am processing have old ts2 dates. Will aggregation / count work in this scenario?
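
For reference, the watermark in Structured Streaming is computed from the maximum event time observed in the data, not from the wall clock, so old ts2 dates by themselves should not prevent aggregation. Below is a sketch of the same writer annotated with those semantics; the outputMode("append") line is an addition spelling out the default, and note that .foreach(writer) replaces the earlier .format("console"), so only one sink applies.
{code:java}
// Annotated version of the writer above; comments reflect documented
// Structured Streaming semantics. The outputMode call is an addition.
val query1 = sdvTuneInsAgg1.writeStream
  .outputMode("append") // default mode: a window's count is emitted only once
                        // the watermark (max ts2 seen minus 10 seconds) passes
                        // the window's end, which requires later-timestamped
                        // data to keep arriving and advance the watermark
  .foreach(writer)      // .foreach(...) overrides .format("console"), so only
                        // the ForeachWriter sink actually runs
  .start()
{code}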


