padavan created SPARK-43001:
-------------------------------

             Summary: Spark last window doesn't flush in append mode
                 Key: SPARK-43001
                 URL: https://issues.apache.org/jira/browse/SPARK-43001
             Project: Spark
          Issue Type: Bug
          Components: PySpark, Spark Core
    Affects Versions: 3.3.2
            Reporter: padavan


The problem is very simple: when you use a *tumbling window* with *append 
mode*, a window is closed only when a later message arrives 
(_plus the watermark delay_).

In the current implementation, if incoming streaming data *stops*, the 
*last* window will *NEVER close* and we *LOSE* the last window's data.
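
To make the mechanics concrete, here is a minimal pure-Python sketch of the 
watermark arithmetic (the variable names are illustrative only, not Spark 
internals):
{code:python}
# Illustrative sketch of event-time watermark arithmetic (not Spark internals).
window_end = 10          # upper bound of the tumbling window [0, 10)
delay_threshold = 1      # from withWatermark("dt", "1 seconds")

max_event_time_seen = 9  # event time of the last message that ever arrives
watermark = max_event_time_seen - delay_threshold  # = 8

# Append mode emits a window only once the watermark passes its end.
# With no further events the watermark is stuck at 8 < 10 forever,
# so the window [0, 10) is never flushed.
print(watermark >= window_end)  # False -> window never emitted
{code}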

 

Business situation:

Everything worked correctly until new messages stopped arriving. The next 
message came 5 hours later, so the client received the last window's result 
after 5 hours instead of after the window's ~10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png!

The current implementation needs to be improved: Spark's internal mechanisms 
should be able to close such windows automatically.

 

*What we propose:*

Add a third parameter: {{DataFrame.withWatermark}}(_eventTime_, 
_delayThreshold_, *maxDelayClose*). On each trigger, Spark would then 
execute:

 
{code:java}
// Proposed internal check, run on each trigger: force-close and emit any
// window whose upper bound is older than maxDelayClose of wall-clock time.
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
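
For clarity, a call using the proposed third parameter might look like this 
(*maxDelayClose* does not exist in Spark today; this is only a sketch of the 
proposed API, reusing the _dt_ / _price_ columns from the repro below):
{code:python}
# Sketch of the PROPOSED (non-existent) third withWatermark parameter.
# After 30 seconds of wall-clock silence, the last open window would be
# closed and flushed even though no new event has arrived.
sel = (df
       .withWatermark("dt", "1 seconds", "30 seconds")  # maxDelayClose = 30s
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price")))
{code}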
 

I assume this can be done in about a day. We did not expect that our customers 
could fail to receive their notifications (the company is in the medical 
field).

!image-2023-04-01-10-38-43-326.png|width=159,height=101!

 

 

Simple code reproducing the problem:

{code:python}
from pyspark.sql.functions import col, from_json, window, sum

kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

# 10-second tumbling window with a 1-second watermark on event time "dt".
sel = (kafka_stream_df
       .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .select(from_json(col("value").cast("string"), json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )

# Append mode: a window's row is emitted only after the watermark passes
# the window's end, which requires a later event to arrive.
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append") \
    .start()
{code}
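
Until something like *maxDelayClose* exists, one possible workaround (a sketch 
only; it assumes the producer side can emit synthetic heartbeat records and 
that the query filters them out before aggregation) is to keep the watermark 
advancing with periodic heartbeats, e.g. via the kafka-python package:
{code:python}
# Hypothetical workaround: periodic heartbeat records keep the event-time
# watermark advancing so append mode can flush the last real window.
import json
import time
from datetime import datetime, timezone

from kafka import KafkaProducer  # kafka-python package

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

while True:
    # A fresh event time pushes the watermark past older window bounds.
    producer.send(KAFKA_TOPIC, {
        "dt": datetime.now(timezone.utc).isoformat(),
        "price": 0.0,
        "heartbeat": True,  # assumed flag so the query can filter these out
    })
    time.sleep(10)
{code}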
 

 


