[ https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17709924#comment-17709924 ]
Jungtaek Lim commented on SPARK-43001:
--------------------------------------

This is a known issue. The currently available option is to use update mode and handle the upsert in user logic (either with MERGE INTO in a foreachBatch sink or with custom logic; see the sketch at the end of this message), but I understand update mode does not work for every sink.

I think what SS lacks now is advancing the watermark despite idleness. The current implementation requires running a microbatch, as we store the watermark value in the WAL log. That said, it could be a no-data batch that only advances the watermark.

How far SS should advance the watermark after a specified idleness is another thing to think hard about. We use event time, which is out of sync with the wall clock, so when idleness happens we have to either 1) advance by a specified duration from the previous watermark value, or 2) sync with the wall clock (e.g. wall clock - delay threshold). I'm not saying it's impossible - I'd just like to say it may not be trivial to fix nicely.

> Spark last window doesn't flush in append mode
> ----------------------------------------------
>
>                 Key: SPARK-43001
>                 URL: https://issues.apache.org/jira/browse/SPARK-43001
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 3.3.2
>            Reporter: padavan
>            Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is very simple: when you use a *tumbling window* with *append mode*, the window is closed +only when the next message arrives+ (watermark logic).
> In the current implementation, if incoming streaming data *stops*, the *last* window will *never close* and we lose the last window's data.
>
> Business situation:
> Everything worked correctly until new messages stopped arriving; the next message came 5 hours later, so the client received the notification after 5 hours instead of the window's 10-second delay.
> !https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!
> The current implementation needs to be improved: Spark should include an internal mechanism to close windows automatically.
>
> *What we propose:*
> Add a third parameter: {{DataFrame.withWatermark}}(_eventTime_, _delayThreshold_, *maxDelayClose*). The trigger would then execute:
> {code:java}
> if (now - window.upper_bound > maxDelayClose) {
>     window.close().flush();
> }
> {code}
> I assume it can be done in a day. We did not expect that our customers could miss the notifications (the company is in the medical field).
>
> Simple code reproducing the problem:
> {code:python}
> from pyspark.sql.functions import col, from_json, sum, window
>
> # KAFKA_BROKER, KAFKA_TOPIC and json_schema are defined elsewhere.
> kafka_stream_df = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", KAFKA_BROKER) \
>     .option("subscribe", KAFKA_TOPIC) \
>     .option("includeHeaders", "true") \
>     .load()
>
> sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>     .select(from_json(col("value").cast("string"), json_schema).alias("data"))
>     .select("data.*")
>     .withWatermark("dt", "1 seconds")
>     .groupBy(window("dt", "10 seconds"))
>     .agg(sum("price"))
> )
>
> console = sel \
>     .writeStream \
>     .trigger(processingTime='10 seconds') \
>     .format("console") \
>     .outputMode("append") \
>     .start()
> {code}
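A minimal sketch of the update-mode workaround mentioned in the comment above, using foreachBatch with MERGE INTO. It assumes a Delta Lake (or other MERGE-capable) sink table named agg_sink with columns (window_start, window_end, total_price); the table name, column names, and the upsert_to_sink helper are illustrative, not from the original report.

{code:python}
from pyspark.sql.functions import col

def upsert_to_sink(batch_df, batch_id):
    # Flatten the window struct so MERGE can join on a plain column; the
    # aggregate column is named "sum(price)" by the query above, so it needs
    # backticks to be referenced.
    updates = batch_df.select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("`sum(price)`").alias("total_price"),
    )
    updates.createOrReplaceTempView("updates")
    # Upsert: later micro-batches overwrite earlier partial results for the
    # same window, so the sink converges even though update mode emits
    # intermediate rows. Assumes agg_sink already exists.
    updates.sparkSession.sql("""
        MERGE INTO agg_sink AS t
        USING updates AS s
        ON t.window_start = s.window_start
        WHEN MATCHED THEN UPDATE SET t.total_price = s.total_price
        WHEN NOT MATCHED THEN INSERT *
    """)

query = (sel  # the aggregated streaming DataFrame from the report above
    .writeStream
    .foreachBatch(upsert_to_sink)
    .outputMode("update")
    .trigger(processingTime="10 seconds")
    .start())
{code}

Because update mode re-emits a window's row whenever its aggregate changes, the last window's running value reaches the sink without waiting for a later event to advance the watermark; the trade-off is that downstream readers see intermediate values rather than a single final row per window.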