padavan created SPARK-43001:
-------------------------------
Summary: Spark last window dont flush in append mode
Key: SPARK-43001
URL: https://issues.apache.org/jira/browse/SPARK-43001
Project: Spark
Issue Type: Bug
Components: PySpark, Spark Core
Affects Versions: 3.3.2
Reporter: padavan
The problem is simple: when you use a *tumbling* *window* in {*}append mode{*}, a window is closed only when the next message arrives ({_}watermark logic{_}: the watermark advances only when new events are received). In the current implementation, if incoming streaming data *stops*, the *last* window will *NEVER close*, and we LOSE the data of that last window.

Business situation: the pipeline works correctly, then new messages stop arriving and the next message comes 5 hours later. The client receives the result after 5 hours instead of after the 10-second window delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png!

The current implementation needs to be improved: Spark's internal mechanisms should close windows automatically.

*What we propose:* add a third parameter to {{DataFrame.withWatermark}}({_}eventTime{_}, {_}delayThreshold{_}, *maxDelayClose*). The trigger would then execute:

{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}

I assume it can be done in a day. We did not expect that our customers would not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-38-43-326.png|width=159,height=101!
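To make the behaviour concrete, here is a simplified, self-contained Python model (not Spark code) of when an append-mode tumbling-window query emits a window. It assumes the watermark is just the maximum event time seen minus the delay threshold, and that a window is emitted once the watermark reaches its end; it ignores micro-batch timing. The {{now}} and {{max_delay_close}} arguments are hypothetical and only illustrate the proposed *maxDelayClose* rule; they do not exist in Spark.

{code:python}
from typing import Iterable, List, Optional

WINDOW_SIZE = 10      # seconds, like window("dt", "10 seconds")
DELAY_THRESHOLD = 1   # seconds, like withWatermark("dt", "1 seconds")


def window_start(event_time: int) -> int:
    """Start of the tumbling window that contains event_time."""
    return event_time - event_time % WINDOW_SIZE


def emitted_windows(event_times: Iterable[int],
                    now: Optional[int] = None,
                    max_delay_close: Optional[int] = None) -> List[int]:
    """Start times of the windows an append-mode query would have emitted.

    Current rule (simplified): emit a window once the watermark reaches its end.
    Proposed rule (hypothetical max_delay_close): also emit it once wall-clock
    time `now` is more than max_delay_close seconds past the window end.
    """
    events = list(event_times)
    # The watermark only moves when new events arrive: max event time - delay.
    watermark = max(events) - DELAY_THRESHOLD if events else None
    result = []
    for start in sorted({window_start(t) for t in events}):
        end = start + WINDOW_SIZE
        closed_by_watermark = watermark is not None and watermark >= end
        closed_by_timeout = (max_delay_close is not None and now is not None
                             and now - end > max_delay_close)
        if closed_by_watermark or closed_by_timeout:
            result.append(start)
    return result


events = [1, 5, 12, 17]  # event times in seconds; then the stream goes quiet
print(emitted_windows(events))                              # [0]
print(emitted_windows(events, now=5 * 3600))                # [0]: last window still open
print(emitted_windows(events, now=31, max_delay_close=10))  # [0, 10]
{code}

With the last event at 17 s the watermark stays at 16 s, so the window starting at 10 s (ending at 20 s) is never emitted, however much wall-clock time passes; only the proposed timeout rule releases it.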
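Simple code that reproduces the problem:

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing the built-in sum
from pyspark.sql.types import DoubleType, StructField, StructType, TimestampType

spark = SparkSession.builder.appName("SPARK-43001-repro").getOrCreate()

# Assumed placeholders: the broker, topic and message schema are not given in the report.
KAFKA_BROKER = "localhost:9092"
KAFKA_TOPIC = "prices"
json_schema = StructType([
    StructField("dt", TimestampType()),
    StructField("price", DoubleType()),
])

kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .select(from_json(col("value").cast("string"), json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")     # 1-second watermark delay on event time
       .groupBy(window("dt", "10 seconds"))  # 10-second tumbling windows
       .agg(spark_sum("price"))
       )

# Append mode emits a window only after the watermark passes its end.
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append") \
    .start()

console.awaitTermination()
{code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)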