[ https://issues.apache.org/jira/browse/SPARK-29426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16948379#comment-16948379 ]

jingshanglu edited comment on SPARK-29426 at 10/10/19 11:46 AM:
----------------------------------------------------------------

My Kafka messages look like this:
{code:java}
[kafka@HC-25-28-36 ~]$ kafka-console-producer.sh --broker-list 172.25.28.38:9092,172.25.28.37:9092,172.25.28.36:9092 --topic test0
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{code}
The output looks like this:
{code:java}
Batch: 0
-------------------------------------------
+------+---+------+---+-----+
|window|sql|client| ip|count|
+------+---+------+---+-----+
+------+---+------+---+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    1|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    1|
+--------------------+------------------+------------+---------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+---+------+---+-----+
|window|sql|client| ip|count|
+------+---+------+---+-----+
+------+---+------+---+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:25...|select * from user|192.168.54.6|172.0.0.1|    1|
|[2019-03-05 12:30...|select * from user|192.168.54.6|172.0.0.1|    1|
+--------------------+------------------+------------+---------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+---+------+---+-----+
|window|sql|client| ip|count|
+------+---+------+---+-----+
+------+---+------+---+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    2|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    2|
+--------------------+------------------+------------+---------+-----+
{code}
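Why each event shows up in two rows: window(timestamp, "10 minutes", "5 minutes") assigns an event to every 10-minute window, starting on a 5-minute boundary, that contains it. Below is a minimal plain-Java sketch of that assignment (no Spark needed). The class and method names are hypothetical, and the sketch assumes window starts are aligned to multiples of the slide since the epoch in UTC; for a whole-hour zone offset that gives the same 5-minute boundaries as local time.

{code:java}
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

class SlidingWindows {
    /**
     * Start times of every window [start, start + sizeSec) that contains t.
     * Starts are assumed to be aligned to multiples of slideSec since the epoch (UTC).
     * The latest start comes first.
     */
    static List<LocalDateTime> windowStarts(LocalDateTime t, long sizeSec, long slideSec) {
        long ts = t.toEpochSecond(ZoneOffset.UTC);
        long lastStart = ts - Math.floorMod(ts, slideSec); // latest aligned start <= t
        List<LocalDateTime> starts = new ArrayList<>();
        // Walk back one slide at a time while the window still covers t (end > t).
        for (long s = lastStart; s + sizeSec > ts; s -= slideSec) {
            starts.add(LocalDateTime.ofEpochSecond(s, 0, ZoneOffset.UTC));
        }
        return starts;
    }

    public static void main(String[] args) {
        // Event times of the three messages, in arrival order
        List<LocalDateTime> events = List.of(
                LocalDateTime.of(2019, 3, 5, 12, 22, 22),
                LocalDateTime.of(2019, 3, 5, 12, 32, 22),
                LocalDateTime.of(2019, 3, 5, 12, 22, 29));
        for (LocalDateTime t : events) {
            // "10 minutes" window length, "5 minutes" slide
            System.out.println(t + " -> " + windowStarts(t, 600, 300));
        }
    }
}
{code}

For the three messages this yields window starts 12:20 and 12:15, then 12:30 and 12:25, then 12:20 and 12:15 again, which matches the window column in Batch 1, 3 and 5 above.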
The watermark is behind the event time (2019-03-04 12:23:22), but this event:

{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}

is still aggregated.
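The expectation can be checked by hand. The Structured Streaming guide defines the watermark as the maximum event time seen so far minus the threshold passed to withWatermark, so by that definition the watermark after the 12:32:22 event would be 2019-03-05 12:31:22. The late 12:22:29 event falls into windows ending at 12:25 and 12:30, both at or before that value. The plain-Java sketch below does only this arithmetic. The names are hypothetical, and the rule "a window is closed once the watermark reaches its end" is an assumption of the sketch, not Spark's code. Note also that the same guide says the guarantee is strict in only one direction: data later than the threshold is not guaranteed to be dropped and may or may not get aggregated.

{code:java}
import java.time.Duration;
import java.time.LocalDateTime;

class WatermarkCheck {
    static final Duration DELAY = Duration.ofMinutes(1); // withWatermark("timestamp", "1 minute")
    static final Duration SIZE = Duration.ofMinutes(10); // window length "10 minutes"

    /** Watermark = latest event time seen so far minus the allowed delay. */
    static LocalDateTime watermark(LocalDateTime maxEventTime) {
        return maxEventTime.minus(DELAY);
    }

    /** Assumption of this sketch: a window is closed once the watermark is at or past its end. */
    static boolean isClosed(LocalDateTime windowStart, LocalDateTime watermark) {
        return !windowStart.plus(SIZE).isAfter(watermark);
    }

    public static void main(String[] args) {
        // Latest event time after the second message (12:32:22)
        LocalDateTime wm = watermark(LocalDateTime.of(2019, 3, 5, 12, 32, 22));
        System.out.println("watermark: " + wm);
        // The two windows that would receive the late 12:22:29 event
        for (int startMinute : new int[] {15, 20}) {
            LocalDateTime start = LocalDateTime.of(2019, 3, 5, 12, startMinute);
            System.out.println(start + " closed: " + isClosed(start, wm));
        }
    }
}
{code}

Running it prints a watermark of 2019-03-05T12:31:22 and reports both the 12:15 and the 12:20 window as closed.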


was (Author: jingshang):
My Kafka messages look like this:
{code:java}
[kafka@HC-25-28-36 ~]$ kafka-console-producer.sh --broker-list 172.25.28.38:9092,172.25.28.37:9092,172.25.28.36:9092 --topic test0
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:32:22","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{"sql":"select * from user","timestamp":"2019-03-04 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}
{code}
The output looks like this:
{code:java}
Batch: 0
-------------------------------------------
+------+---+------+---+-----+
|window|sql|client| ip|count|
+------+---+------+---+-----+
+------+---+------+---+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    1|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    1|
+--------------------+------------------+------------+---------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+---+------+---+-----+
|window|sql|client| ip|count|
+------+---+------+---+-----+
+------+---+------+---+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:25...|select * from user|192.168.54.6|172.0.0.1|    1|
|[2019-03-05 12:30...|select * from user|192.168.54.6|172.0.0.1|    1|
+--------------------+------------------+------------+---------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+---+------+---+-----+
|window|sql|client| ip|count|
+------+---+------+---+-----+
+------+---+------+---+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+--------------------+------------------+------------+---------+-----+
|              window|               sql|      client|       ip|count|
+--------------------+------------------+------------+---------+-----+
|[2019-03-05 12:20...|select * from user|192.168.54.6|172.0.0.1|    2|
|[2019-03-05 12:15...|select * from user|192.168.54.6|172.0.0.1|    2|
+--------------------+------------------+------------+---------+-----+
{code}
The watermark is behind the event time (2019-03-04 12:23:22), but this event:

{"sql":"select * from user","timestamp":"2019-03-05 12:22:29","ip":"172.0.0.1","client":"192.168.54.6","port":3306}

is still aggregated.

> Watermark does not take effect
> ------------------------------
>
>                 Key: SPARK-29426
>                 URL: https://issues.apache.org/jira/browse/SPARK-29426
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.3
>            Reporter: jingshanglu
>            Priority: Major
>
> I use withWatermark and window to express windowed aggregations, but the watermark does not take effect.
> My code:
> {code:java}
> // Watermark: tolerate events up to 1 minute behind the latest event time seen
> Dataset<Row> clientSqlIpCount = mes.withWatermark("timestamp","1 minute")
>         .groupBy(
>                 // 10-minute windows that slide every 5 minutes
>                 functions.window(mes.col("timestamp"),"10 minutes","5 minutes"),
>                 mes.col("sql"),mes.col("client"),mes.col("ip"))
>         .count();
> StreamingQuery query = clientSqlIpCount
>                 .writeStream()
>                 .outputMode("Update")
>                 .format("console")
>                 .start();
> spark.streams().awaitAnyTermination();
> {code}
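For comparison with the console output in the comment above, here is a plain-Java model of what the query above would emit in Update mode if late rows were always dropped. It is a sketch under stated assumptions, not Spark's implementation: each micro-batch uses a watermark computed only from earlier batches, a row is dropped when the window it would update ends at or before that watermark, windows are aligned to the epoch in UTC, and the empty batches from the log are left out. The class and method names are hypothetical.

{code:java}
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class UpdateModeModel {
    // Seconds: window "10 minutes", slide "5 minutes", watermark delay "1 minute"
    static final long SIZE = 600, SLIDE = 300, DELAY = 60;

    /** Returns, per batch, the window rows (start -> running count) that the batch updated. */
    static List<Map<LocalDateTime, Long>> run(List<List<LocalDateTime>> batches) {
        Map<Long, Long> state = new TreeMap<>(); // window start (epoch seconds) -> count
        long maxEventTime = Long.MIN_VALUE;      // no events seen yet
        List<Map<LocalDateTime, Long>> output = new ArrayList<>();
        for (List<LocalDateTime> batch : batches) {
            // Assumed: the watermark for this batch comes only from earlier batches.
            long watermark = maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - DELAY;
            // Evict windows whose end the watermark has reached.
            state.keySet().removeIf(start -> start + SIZE <= watermark);
            Map<LocalDateTime, Long> updated = new TreeMap<>();
            for (LocalDateTime t : batch) {
                long ts = t.toEpochSecond(ZoneOffset.UTC);
                // Every aligned window [s, s + SIZE) that contains ts
                for (long s = ts - Math.floorMod(ts, SLIDE); s + SIZE > ts; s -= SLIDE) {
                    if (s + SIZE <= watermark) {
                        continue; // window already closed: drop the late row
                    }
                    long count = state.merge(s, 1L, Long::sum);
                    updated.put(LocalDateTime.ofEpochSecond(s, 0, ZoneOffset.UTC), count);
                }
                maxEventTime = Math.max(maxEventTime, ts);
            }
            output.add(updated);
        }
        return output;
    }

    public static void main(String[] args) {
        List<List<LocalDateTime>> batches = List.of(
                List.of(LocalDateTime.of(2019, 3, 5, 12, 22, 22)),
                List.of(LocalDateTime.of(2019, 3, 5, 12, 32, 22)),
                List.of(LocalDateTime.of(2019, 3, 5, 12, 22, 29)));
        List<Map<LocalDateTime, Long>> out = run(batches);
        for (int i = 0; i < out.size(); i++) {
            System.out.println("batch " + i + ": " + out.get(i));
        }
    }
}
{code}

Under these assumptions the third batch (the 12:22:29 event) updates no rows, whereas the reported Batch 5 shows count 2 for the 12:15 and 12:20 windows.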



--
This message was sent by Atlassian Jira
(v8.3.4#803005)