[ https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17709932#comment-17709932 ]
Vindhya G commented on SPARK-43001:
-----------------------------------

Agree. I think there should also be some way for users to provide a wall-clock threshold: if a window stays open past that mark, stop waiting on event time, fall back to the wall clock, and flush the pending data. What do you suggest, [~kabhwan]?

> Spark last window doesn't flush in append mode
> ----------------------------------------------
>
>                 Key: SPARK-43001
>                 URL: https://issues.apache.org/jira/browse/SPARK-43001
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 3.3.2
>            Reporter: padavan
>            Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: with a *tumbling window* in *append mode*, a window is closed +only when the next message arrives+ (+watermark logic+).
> In the current implementation, if incoming streaming data *stops*, the *last* window *never closes* and its data is *lost*.
>
> Business situation: the job worked correctly, then new messages stopped arriving and the next message came 5 hours later, so the client received the notification after 5 hours instead of after the window's 10-second delay.
> !https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!
> The current implementation needs to be improved: Spark's internal mechanisms should close windows automatically.
>
> *What we propose:* add a third parameter, {{DataFrame.withWatermark(eventTime, delayThreshold, *maxDelayClose*)}}, and have the trigger execute:
> {code:java}
> // pseudocode
> if (now - window.upper_bound > maxDelayClose) {
>     window.close().flush();
> }
> {code}
> I assume it can be done in a day. We did not expect that our customers could miss their notifications (the company is in the medical field).
>
> Simple code that reproduces the problem:
> {code:python}
> from pyspark.sql.functions import col, from_json, window
> from pyspark.sql.functions import sum as sum_  # avoid shadowing the builtin
>
> # json_schema: the schema of the JSON payload (defined elsewhere)
>
> kafka_stream_df = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", KAFKA_BROKER) \
>     .option("subscribe", KAFKA_TOPIC) \
>     .option("includeHeaders", "true") \
>     .load()
>
> sel = (kafka_stream_df
>     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>     .select(from_json(col("value").cast("string"), json_schema).alias("data"))
>     .select("data.*")
>     .withWatermark("dt", "1 seconds")
>     .groupBy(window("dt", "10 seconds"))
>     .agg(sum_("price"))
> )
>
> console = sel \
>     .writeStream \
>     .trigger(processingTime="10 seconds") \
>     .format("console") \
>     .outputMode("append") \
>     .start()
> {code}
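For reference, the wall-clock fallback discussed above can be approximated today with arbitrary stateful processing instead of a watermark. A minimal sketch, assuming Spark 3.4+ (for {{applyInPandasWithState}}), that {{parsed}} stands for the pre-aggregation stream from the repro (the result of {{.select("data.*")}}), and a 30-second stand-in for the proposed maxDelayClose:

{code:python}
# Sketch only, not the proposed API: approximates maxDelayClose with a
# wall-clock (processing-time) timeout. Requires Spark 3.4+; `parsed` is
# an assumption standing in for the repro's pre-aggregation stream.
from typing import Iterator, Tuple

import pandas as pd
from pyspark.sql.functions import col, floor
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout
from pyspark.sql.types import (DoubleType, StructField, StructType,
                               TimestampType)

output_schema = StructType([
    StructField("window_start", TimestampType()),
    StructField("total_price", DoubleType()),
])
state_schema = StructType([StructField("total_price", DoubleType())])

def close_idle_window(key: Tuple,
                      pdfs: Iterator[pd.DataFrame],
                      state: GroupState) -> Iterator[pd.DataFrame]:
    if state.hasTimedOut:
        # No new data for this window within the wall-clock threshold:
        # flush the pending aggregate and drop the state.
        (total,) = state.get
        state.remove()
        yield pd.DataFrame({"window_start": [key[0]], "total_price": [total]})
    else:
        total = state.get[0] if state.exists else 0.0
        for pdf in pdfs:
            total += float(pdf["price"].sum())
        state.update((total,))
        # Wall-clock analogue of the proposed maxDelayClose parameter.
        state.setTimeoutDuration("30 seconds")

# Bucket events into 10-second tumbling windows by truncating the timestamp,
# then let the processing-time timeout flush idle buckets.
bucketed = parsed.withColumn(
    "window_start",
    (floor(col("dt").cast("long") / 10) * 10).cast("timestamp"))

flushed = bucketed.groupBy("window_start").applyInPandasWithState(
    close_idle_window,
    outputStructType=output_schema,
    stateStructType=state_schema,
    outputMode="append",
    timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
)
{code}

{{flushed}} can then be written with the same console sink as the repro. With a processingTime trigger, empty micro-batches still fire the timeout, so the last window is flushed even when no further events arrive, unlike the watermark, which only advances when new data comes in. The trade-off is that each window is emitted only after 30 seconds of inactivity rather than when the watermark passes its end.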
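A lighter-weight workaround on 3.3.x is to keep the watermark moving yourself by publishing periodic heartbeat events to the topic. A sketch, assuming the kafka-python package and the repro's broker, topic, and dt/price field names:

{code:python}
# Sketch only: keeps the event-time watermark advancing by publishing a
# synthetic heartbeat record every few seconds. KAFKA_BROKER, KAFKA_TOPIC
# and the dt/price field names are assumptions matching the repro above;
# filter heartbeats out downstream if zero-sum windows are unwanted.
import json
import time
from datetime import datetime, timezone

from kafka import KafkaProducer

KAFKA_BROKER = "localhost:9092"  # assumption: same broker as the repro
KAFKA_TOPIC = "prices"           # assumption: same topic as the repro

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

while True:
    # price 0.0 marks the record as a heartbeat; it advances the watermark
    # without changing any existing window's sum.
    producer.send(KAFKA_TOPIC, {
        "dt": datetime.now(timezone.utc).isoformat(),
        "price": 0.0,
    })
    producer.flush()
    time.sleep(5)  # well under the 10-second window size
{code}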