[ https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17709924#comment-17709924 ]

Jungtaek Lim commented on SPARK-43001:
--------------------------------------

This is a known issue. The currently available workaround is to use update 
mode and handle the upsert in user logic (either MERGE INTO in a foreachBatch 
sink, or custom logic), but I understand that update mode does not work for 
every sink.
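
A minimal sketch of that workaround, assuming a Delta Lake sink (the table 
name {{target_agg}} and the merge key are illustrative, and {{sel}} is the 
aggregation from the reproduction below):

{code:python}
from delta.tables import DeltaTable

# Upsert each micro-batch of updated window aggregates into the sink table.
# Sketch only: table name and merge condition are placeholders.
def upsert_to_delta(micro_batch_df, batch_id):
    target = DeltaTable.forName(micro_batch_df.sparkSession, "target_agg")
    (target.alias("t")
        .merge(micro_batch_df.alias("s"), "t.window = s.window")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

query = (sel.writeStream
    .outputMode("update")            # emit updated windows every batch
    .foreachBatch(upsert_to_delta)
    .start())
{code}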

I think what SS lacks today is the ability to advance the watermark despite 
idleness. The current implementation requires running a microbatch because we 
store the watermark value in the WAL; that said, it could be a no-data batch 
that only advances the watermark. How far SS should advance the watermark once 
the specified idleness elapses is another thing to think hard about. We use 
event time, which is out of sync with the wall clock, so when idleness happens 
we have to either 1) advance by a specified duration from the previous 
watermark value, or 2) sync with the wall clock (e.g. wall clock - delay 
threshold).
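
To make the two options concrete, here is a purely illustrative sketch (not 
Spark internals; all names are made up):

{code:python}
from datetime import datetime, timedelta

# Option 1: advance by a fixed duration relative to the previous watermark.
def advance_by_fixed_duration(prev_watermark: datetime,
                              advance: timedelta) -> datetime:
    return prev_watermark + advance

# Option 2: derive the new watermark from the wall clock minus the delay
# threshold configured via withWatermark.
def advance_from_wall_clock(now: datetime,
                            delay_threshold: timedelta) -> datetime:
    return now - delay_threshold
{code}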

I'm not saying it's impossible - I'd just like to say it may not be very 
trivial to fix it nicely.

> Spark last window doesn't flush in append mode
> -----------------------------------------------
>
>                 Key: SPARK-43001
>                 URL: https://issues.apache.org/jira/browse/SPARK-43001
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 3.3.2
>            Reporter: padavan
>            Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is very simple: when you use a *TUMBLING* *window* with {*}append 
> mode{*}, the window is closed +only when the next message arrives+ 
> ({_}watermark logic{_}). 
> In the current implementation, if incoming streaming data *stops*, the 
> *last* window will *NEVER close* and we LOSE the last window's data.
>  
> Business situation:
> Everything works correctly, then new messages stop arriving. The next message 
> comes 5 hours later, so the client gets the result after 5 hours instead of 
> after the window's 10-second delay.
> !https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!
> The current implementation needs to be improved: Spark's internal mechanisms 
> should be able to close windows automatically.
>  
> *What we propose:*
> Add a third parameter: {{DataFrame.withWatermark}}({_}eventTime{_}, 
> {_}delayThreshold{_}, {_}*maxDelayClose*{_}). The trigger would then execute
> {code:java}
> // pseudocode: force-close a window once it has stayed open longer than maxDelayClose
> if (now - window.upper_bound > maxDelayClose) {
>     window.close().flush();
> }
> {code}
> I assume it can be done in a day. We did not expect that our customers could 
> fail to receive their notifications (the company is in the medical field).
>  
> Simple code reproducing the problem:
> {code:java}
> from pyspark.sql.functions import from_json, col, window, sum
>
> kafka_stream_df = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", KAFKA_BROKER) \
>     .option("subscribe", KAFKA_TOPIC) \
>     .option("includeHeaders", "true") \
>     .load()
>
> sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>        .select(from_json(col("value").cast("string"), json_schema).alias("data"))
>        .select("data.*")
>        .withWatermark("dt", "1 seconds")
>        .groupBy(window("dt", "10 seconds"))
>        .agg(sum("price"))
>       )
>
> console = sel \
>     .writeStream \
>     .trigger(processingTime='10 seconds') \
>     .format("console") \
>     .outputMode("append") \
>     .start()
> {code}
>  
>  


