[ https://issues.apache.org/jira/browse/SPARK-29426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16948379#comment-16948379 ]
jingshanglu edited comment on SPARK-29426 at 10/10/19 11:45 AM:
----------------------------------------------------------------

My Kafka messages look like this:
{code:java}
// code placeholder
[kafka@HC-25-28-36 ~]$ kafka-console-producer.sh --broker-list 172.25.28.38:9092,172.25.28.37:9092,172.25.28.36:9092 --topic test0
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-04 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{code}
The output looks like this:
{code:java}
// code placeholder
Batch: 0
-------------------------------------------
+------+---+------+---+-----+
|window|sql|client| ip|count|
+------+---+------+---+-----+
+------+---+------+---+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    1|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    1|
+--------------------+------------------+------------+---------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+---+------+---+-----+
|window|sql|client| ip|count|
+------+---+------+---+-----+
+------+---+------+---+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:25...|select * from user|192.168.54.6|172.0.0.1|    1|
|[2019-03-05 12:30...|select * from user|192.168.54.6|172.0.0.1|    1|
+--------------------+------------------+------------+---------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+---+------+---+-----+
|window|sql|client| ip|count|
+------+---+------+---+-----+
+------+---+------+---+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    2|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    2|
+--------------------+------------------+------------+---------+-----+
{code}
The watermark is behind the event time (2019-03-04 12:23:22), but this event
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
is still aggregated.


was (Author: jingshang):
My Kafka messages look like this:
{code:java}
// code placeholder
[kafka@HC-25-28-36 ~]$ kafka-console-producer.sh --broker-list 172.25.28.38:9092,172.25.28.37:9092,172.25.28.36:9092 --topic test0
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-04 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{code}
The output looks like this:
{code:java}
// code placeholder
Batch: 5
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    2|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    2|
+--------------------+------------------+------------+---------+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    3|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    3|
+--------------------+------------------+------------+---------+-----+

-------------------------------------------
Batch: 7
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-04 12:20...|select * from user|192.168.54.6|172.0.0.1|    1|
|[2019-03-04 12:15...|select * from user|192.168.54.6|172.0.0.1|    1|
+--------------------+------------------+------------+---------+-----+
{code}
The watermark is behind the event time (2019-03-04 12:23:22), but this event
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
is still aggregated.


> Watermark does not take effect
> ------------------------------
>
>                 Key: SPARK-29426
>                 URL: https://issues.apache.org/jira/browse/SPARK-29426
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.3
>            Reporter: jingshanglu
>            Priority: Major
>
> I use withWatermark and window to express windowed aggregations, but the
> watermark does not take effect.
> My code:
> {code:java}
> // code placeholder
> Dataset<Row> clientSqlIpCount = mes.withWatermark("timestamp","1 minute")
>     .groupBy(
>         functions.window(mes.col("timestamp"),"10 minutes","5 minutes"),
>         mes.col("sql"),mes.col("client"),mes.col("ip"))
>     .count();
> StreamingQuery query = clientSqlIpCount
>     .writeStream()
>     .outputMode("Update")
>     .format("console")
>     .start();
> spark.streams().awaitAnyTermination();
> {code}
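
For reference, the arithmetic behind the reporter's expectation can be sketched in plain Java. This is only a sketch under stated assumptions, not Spark's implementation: it assumes the semantics described in the Structured Streaming programming guide (the watermark is the maximum event time seen minus the delay threshold, and a window stops accepting late data once the watermark has passed the window's end), and it assumes window starts are aligned to multiples of the slide interval counted from the epoch in UTC. The class and method names (`WatermarkSketch`, `windowStarts`, `watermark`, `expectedLate`) are hypothetical helpers and are not Spark APIs. The sketch also ignores that Spark only advances the watermark between micro-batches, and it does not model the exact boundary comparison Spark uses.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {

    /** Start times of every sliding window [start, start + size) that contains eventTime. */
    public static List<LocalDateTime> windowStarts(LocalDateTime eventTime, Duration size, Duration slide) {
        long ts = eventTime.toEpochSecond(ZoneOffset.UTC);
        long slideSec = slide.getSeconds();
        long sizeSec = size.getSeconds();
        // Latest window start at or before the event (assumption: epoch-aligned slides).
        long lastStart = Math.floorDiv(ts, slideSec) * slideSec;
        List<LocalDateTime> starts = new ArrayList<>();
        // Walk backwards while the window still covers the event: start > ts - size.
        for (long start = lastStart; start > ts - sizeSec; start -= slideSec) {
            starts.add(LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC));
        }
        return starts;
    }

    /** Watermark as described in the guide: max event time seen minus the delay threshold. */
    public static LocalDateTime watermark(LocalDateTime maxEventTime, Duration delay) {
        return maxEventTime.minus(delay);
    }

    /** True if every window containing the event ended before the watermark. */
    public static boolean expectedLate(LocalDateTime eventTime, LocalDateTime watermark,
                                       Duration size, Duration slide) {
        for (LocalDateTime start : windowStarts(eventTime, size, slide)) {
            if (!start.plus(size).isBefore(watermark)) {
                return false; // at least one window is still open for this event
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Values taken from the report: 10-minute windows, 5-minute slide, 1-minute delay.
        Duration size = Duration.ofMinutes(10);
        Duration slide = Duration.ofMinutes(5);
        Duration delay = Duration.ofMinutes(1);
        LocalDateTime maxSeen = LocalDateTime.parse("2019-03-05T12:32:22");
        LocalDateTime event = LocalDateTime.parse("2019-03-05T12:22:29");

        LocalDateTime wm = watermark(maxSeen, delay);
        System.out.println("watermark     = " + wm);
        System.out.println("window starts = " + windowStarts(event, size, slide));
        System.out.println("expected late = " + expectedLate(event, wm, size, slide));
    }
}
```

With the values from the report, the watermark comes out at 2019-03-05 12:31:22, and the event at 12:22:29 falls into the windows starting at 12:20 and 12:15, which end at 12:30 and 12:25. Both ends are earlier than that watermark, which is why one would expect the later 12:22:29 records not to update the counts. The sketch only illustrates that expectation; it does not explain why the observed output differs.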