[ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

padavan updated SPARK-43001:
----------------------------
    Description: 
The problem is simple: when you use a *tumbling window* with {*}append mode{*}, the window is closed +only when the next message arrives+ ({_}watermark logic{_}).

In the current implementation, if the incoming streaming data *stops*, the *last* window is *never closed* and its data is lost.
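
To make the emission rule concrete, here is a minimal sketch of the append-mode condition (my own illustration of the watermark arithmetic, not Spark internals), assuming the 10-second window and 1-second watermark delay used in the reproduction below:

{code:python}
from datetime import datetime, timedelta

# Append mode emits a window [start, end) only once the watermark passes `end`,
# and the watermark only advances when a newer event arrives:
#   watermark = max(event time seen so far) - delayThreshold
delay_threshold = timedelta(seconds=1)        # withWatermark("dt", "1 seconds")
window_end = datetime(2023, 4, 1, 12, 0, 10)  # window [12:00:00, 12:00:10)

def can_emit(max_event_time_seen):
    watermark = max_event_time_seen - delay_threshold
    return watermark >= window_end

# Last event falls inside the window: the watermark stays behind window_end.
print(can_emit(datetime(2023, 4, 1, 12, 0, 5)))   # False -> window not emitted

# Only a later event (dt >= 12:00:11) pushes the watermark past window_end.
print(can_emit(datetime(2023, 4, 1, 12, 0, 11)))  # True  -> window emitted

# If that later event never arrives, can_emit stays False forever,
# which is exactly the reported problem.
{code}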

 

Business situation:

Everything works correctly until new messages stop arriving. If the next message only comes 5 hours later, the client receives the result of the last window after 5 hours instead of after the expected ~10-second window delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation should be improved: Spark should provide an internal mechanism to close such windows automatically.

 

*What we propose:*

Add a third parameter to {{DataFrame.withWatermark}}: ({_}eventTime{_}, {_}delayThreshold{_}, {_}*maxDelayClose*{_}). On every trigger, Spark would then execute something like:

 
{code:java}
// Pseudocode for the proposed rule: force-emit a window once it has been
// open longer than maxDelayClose, even if the watermark has not advanced.
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
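
A hedged sketch of how this might look from PySpark, reusing the query from the reproduction below ({{maxDelayClose}} is the proposed third argument, it does not exist in current Spark releases, and {{events}} stands for any streaming DataFrame with a "dt" event-time column):

{code:python}
from pyspark.sql.functions import window, sum

# Hypothetical API: the third withWatermark argument ("30 seconds" = maxDelayClose)
# is the proposal, not an existing parameter.
sel = (events
       .withWatermark("dt", "1 seconds", "30 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price")))
# Intended behaviour: if no new event arrives within 30 seconds after a
# window's upper bound, that window is emitted anyway in append mode.
{code}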
 

I assume this can be implemented in about a day. We did not expect that our customers could miss notifications (the company operates in the medical field).

!image-2023-04-01-10-57-28-866.png|width=109,height=91!

 

Simple code reproducing the problem:

 
{code:python}
from pyspark.sql.functions import col, from_json, sum, window

# KAFKA_BROKER, KAFKA_TOPIC and json_schema are defined elsewhere.
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .select(from_json(col("value").cast("string"), json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )

console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append") \
    .start()
{code}
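
With this query running, produce a few messages to the topic and then stop the producer: the aggregate for the final 10-second window is never printed to the console, because nothing ever advances the watermark past its upper bound.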
 

 

> Spark last window doesn't flush in append mode
> -----------------------------------------------
>
>                 Key: SPARK-43001
>                 URL: https://issues.apache.org/jira/browse/SPARK-43001
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 3.3.2
>            Reporter: padavan
>            Priority: Critical
>         Attachments: image-2023-04-01-10-57-28-866.png
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>

