[ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-43001:
---------------------------------
    Priority: Major  (was: Critical)

> Spark last window dont flush in append mode
> -------------------------------------------
>
>                 Key: SPARK-43001
>                 URL: https://issues.apache.org/jira/browse/SPARK-43001
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Structured Streaming
>    Affects Versions: 3.3.2
>            Reporter: padavan
>            Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: when you use a *tumbling window* with *append mode*, 
> a window is closed +only when the next message arrives+ (_watermark logic_). 
> In the current implementation, if the incoming stream *stops*, the *last* 
> window will *never close*, and we lose the last window's data.
>  
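
The watermark behaviour described above can be illustrated with a small plain-Python simulation. This is a simplified sketch, not Spark's implementation: it assumes the watermark is the maximum event time seen so far minus the delay threshold, and that a window is emitted once its end is at or below the watermark. Late-data handling and micro-batch timing are ignored. The names `run`, `WINDOW` and `DELAY` are made up for this illustration.

```python
from collections import defaultdict

WINDOW = 10  # tumbling window length in seconds (matches the report)
DELAY = 1    # watermark delay threshold in seconds (matches the report)


def run(events):
    """Feed (event_time, price) pairs in order.

    Returns (emitted, still_open): the windows emitted in append mode and
    the windows that are still waiting for the watermark to pass them.
    """
    open_windows = defaultdict(float)  # window start -> running sum of price
    emitted = []
    max_event_time = None
    for event_time, price in events:
        start = event_time - event_time % WINDOW
        open_windows[start] += price
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        # Assumed watermark rule: newest event time minus the delay.
        watermark = max_event_time - DELAY
        # sorted() copies the keys, so popping inside the loop is safe.
        for s in sorted(open_windows):
            if s + WINDOW <= watermark:
                emitted.append((s, s + WINDOW, open_windows.pop(s)))
    return emitted, dict(open_windows)
```

With events at 1 s, 4 s and 12 s, the window [0, 10) is emitted only when the 12 s event arrives, and the window [10, 20) stays open until some later event moves the watermark past 20 s. If no further event arrives, nothing in this loop ever emits it, which is the situation the report describes.
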
> Business situation:
> The job works correctly until new messages stop arriving. If the next 
> message arrives 5 hours later, the client receives the last window's result 
> after 5 hours instead of after the 10-second window delay.
> !https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!
> The current implementation needs to be improved: Spark's internal 
> mechanisms should close windows automatically.
>  
> *What we propose:*
> Add a third parameter: 
> {{DataFrame.withWatermark}}(_eventTime_, _delayThreshold_, 
> *_maxDelayClose_*). The trigger would then execute:
> {code:java}
> if(now - window.upper_bound > maxDelayClose){
>      window.close().flush();
> }
> {code}
> I assume this can be done in a day. We did not expect that our customers 
> would fail to receive their notifications (the company is in the medical 
> field).
>  
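
The proposed check can be sketched in plain Python as follows. This is only an illustration of the idea in the pseudocode: `maxDelayClose` is the reporter's proposed parameter and does not exist in Spark, and `flush_idle_windows`, `open_windows` and `now` are hypothetical names. The sketch also assumes that the trigger's clock (`now`) and the windows' event-time upper bounds are directly comparable, both expressed in seconds.

```python
def flush_idle_windows(open_windows, now, max_delay_close):
    """Close every window whose upper bound is too far behind `now`.

    open_windows maps a window's upper bound (seconds) to its aggregate.
    Flushed windows are removed from open_windows and returned in order.
    """
    flushed = []
    # sorted() copies the keys, so popping inside the loop is safe.
    for upper_bound in sorted(open_windows):
        if now - upper_bound > max_delay_close:
            flushed.append((upper_bound, open_windows.pop(upper_bound)))
    return flushed
```

Called on every trigger, a function like this would emit the last window after `max_delay_close` seconds even when no new message arrives to advance the watermark.
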
> Simple code that reproduces the problem:
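> {code:python}
> from pyspark.sql.functions import col, from_json, window
> from pyspark.sql.functions import sum as sum_  # avoid shadowing builtin sum
>
> # spark, KAFKA_BROKER, KAFKA_TOPIC and json_schema are defined elsewhere.
> kafka_stream_df = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", KAFKA_BROKER) \
>     .option("subscribe", KAFKA_TOPIC) \
>     .option("includeHeaders", "true") \
>     .load()
>
> # Parse the JSON payload, then sum price per 10-second tumbling window.
> sel = (kafka_stream_df
>        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>        .select(from_json(col("value").cast("string"), json_schema)
>                .alias("data"))
>        .select("data.*")
>        .withWatermark("dt", "1 seconds")
>        .groupBy(window("dt", "10 seconds"))
>        .agg(sum_("price"))
>       )
>  
> # Append mode emits a window only after the watermark passes its end.
> console = sel \
>     .writeStream \
>     .trigger(processingTime='10 seconds') \
>     .format("console") \
>     .outputMode("append") \
>     .start()
> {code}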
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
