[ https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
padavan updated SPARK-43001:
----------------------------
    Attachment: (was: image-2023-04-01-10-57-28-866.png)

> Spark last window doesn't flush in append mode
> ----------------------------------------------
>
>                 Key: SPARK-43001
>                 URL: https://issues.apache.org/jira/browse/SPARK-43001
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 3.3.2
>            Reporter: padavan
>            Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: with a *tumbling window* in *append mode*, a window is closed +only when the next message arrives+ ({_}watermark logic{_}).
> In the current implementation, if incoming streaming data *stops*, the *last* window will *never close* and we lose the last window's data.
>
> Business situation:
> The job was working correctly, then new messages stopped arriving; the next message came 5 hours later, so the client received the result after 5 hours instead of the window's 10-second delay.
> !https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!
> The current implementation should be improved: Spark's internal mechanisms should be able to close windows automatically.
>
> *What we propose:*
> Add a third parameter: {{DataFrame.withWatermark}}({_}eventTime{_}, {_}delayThreshold{_}, {_}*maxDelayClose*{_}). The trigger would then execute:
> {code:java}
> if (now - window.upper_bound > maxDelayClose) {
>     window.close().flush();
> }
> {code}
> I assume this can be done in a day. It was unexpected for us that our customers could not receive their notifications (the company operates in the medical field).
> !image-2023-04-01-10-57-28-866.png|width=109,height=91!
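[Editor's note] The two behaviours described above can be sketched in plain Python. This is a simplified model for illustration only: `WindowState`, `advance_watermark`, `flush_windows`, and `max_delay_close` are all hypothetical names, not part of Spark's API, and Spark's real state-store logic differs. The sketch shows that under current append-mode semantics a window is emitted only when the watermark (driven by later event times) passes the window end, so with no further input the last window is never flushed, whereas the proposed wall-clock check would close it anyway.

```python
from dataclasses import dataclass


@dataclass
class WindowState:
    start: float          # window start (event time, seconds)
    end: float            # window end (event time, seconds)
    emitted: bool = False


def advance_watermark(event_times, delay_threshold):
    """Current behaviour: the watermark moves only when events arrive."""
    return max(event_times) - delay_threshold if event_times else float("-inf")


def flush_windows(windows, watermark, now=None, max_delay_close=None):
    """Emit windows whose end is behind the watermark. With the proposed
    max_delay_close, also emit windows whose end is more than
    max_delay_close seconds behind wall-clock time `now`, even though no
    new event has advanced the watermark."""
    out = []
    for w in windows:
        expired_by_watermark = watermark >= w.end
        expired_by_clock = (
            max_delay_close is not None
            and now is not None
            and now - w.end > max_delay_close
        )
        if not w.emitted and (expired_by_watermark or expired_by_clock):
            w.emitted = True
            out.append(w)
    return out


# One event at t=105 falls into the 10-second window [100, 110).
events = [105.0]
wm = advance_watermark(events, delay_threshold=1.0)   # watermark = 104.0
last = WindowState(100.0, 110.0)

# Current semantics: no later event ever arrives, so the window never flushes.
assert flush_windows([last], wm) == []

# Proposed semantics: hours later, wall-clock time alone closes it.
assert flush_windows([last], wm, now=110.0 + 5 * 3600, max_delay_close=30.0) == [last]
```

The key design point is that `expired_by_clock` depends on processing time, not event time, which is exactly what the report asks for: a bound on how long a finalized window can wait for a successor event before being flushed.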
>
> Simple code reproducing the problem:
> {code:java}
> kafka_stream_df = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", KAFKA_BROKER) \
>     .option("subscribe", KAFKA_TOPIC) \
>     .option("includeHeaders", "true") \
>     .load()
>
> sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>     .select(from_json(col("value").cast("string"), json_schema).alias("data"))
>     .select("data.*")
>     .withWatermark("dt", "1 seconds")
>     .groupBy(window("dt", "10 seconds"))
>     .agg(sum("price"))
> )
>
> console = sel \
>     .writeStream \
>     .trigger(processingTime='10 seconds') \
>     .format("console") \
>     .outputMode("append") \
>     .start()
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)