[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-525708001

cc @tdas @zsxwing @jose-torres - to see whether committers in this area are interested in this topic or not.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-525703325

Maybe I need to go through both Beam and Flink to understand the details and discuss at a detailed level. And @echauchot, thanks for asking, but I won't be at ApacheCon - I might consider attending if ApacheCon is ever held in East Asia (I'm in South Korea). You may also want to consider that I'm just one of the contributors to the Spark project, and without long-term support (a shepherd) from the community (committers/PMC members) I couldn't put effort into such a huge feature. (So if you have a chance to meet some PMC members of Apache Spark in person, that would be a better opportunity for you.) Moreover, the necessary effort seems to be beyond what I could spend in my own time, so I might also need to persuade my employer.
[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-525646638

> Beam does not trigger output unless the watermark passes the end of window + allowed lateness. There is no triggering between end of window and allowed lateness. Close and output happen at the same time.

Ah OK, I see. That's a bit different from what I read in a book about Flink, so I assume there are some differences between Beam and Flink... (BTW I also read "Streaming Systems", though it mostly explains theory and doesn't have much detail on Beam.)

> Ah I thought we were talking about watermark. For choosing the event timestamp, Beam uses a TimestampCombiner whose default policy is to set the resulting timestamp to the end of the window for the new record.

That seems to only explain the case where a window is applied. How does it work in other cases? Does it keep the original event timestamp as-is? For a windowed stream-stream join it also makes sense, but there are also non-windowed stream-stream joins, and then the output should have only one event time whereas there are two inputs.
[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-525225611

> Still there is something I do not understand. With the previous Spark DStream framework, multiple aggregations were supported. What has changed in Spark's watermark behavior that makes it unsupported now with Structured Streaming?

Sorry, I'm not aware of DStream's behavior - I more or less started from Structured Streaming and didn't pay much attention to DStream. But as there's no notion of event time or watermark in the DStream docs, I'd rather avoid DStream for event-time processing. You'd most likely be doing processing-time operations, limited by the batch duration.
[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-525217328

> Regarding output mode, most Beam runners (Spark for example) support discarding output, in which elements from different windows are independent and previous states are dropped.

I'm not sure I understand correctly. The point of Append mode is that the output for a specific key (the key doesn't need to be windowed, but it should include an "event time" column) is emitted only once in any case, regardless of allowed lateness - there is no "upsert" case. If Beam doesn't close the window when the watermark passes it (but the watermark hasn't yet passed allowed lateness) and instead triggers the window and emits its output so far (so output could be emitted multiple times), that's not compatible with Spark's Append mode.

A stream-stream join has to decide which "event time" to take even if we change the way event time is stored, as there are two rows being joined. How does Beam decide the "event time" of a new record produced from two records? With column-based event time (current Spark), it would be hard to choose the "min" or "max" of the event times, as which column to pick as the event time must be determined in the query-planning phase.
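To make the join question concrete, here is a toy illustration (plain Python, not Spark or Beam code; the function name and policy names are made up) of the per-row decision a joined output forces - exactly the decision that column-based event time cannot express, since Spark must fix one input column as the event time at planning time:

```python
# Toy illustration (not Spark or Beam code) of the decision described above:
# a stream-stream join produces one output row from two input rows, each
# carrying its own event time, and the engine must pick a single event time
# for the output row. Policy names here are hypothetical.

def combine_event_time(left_ts: int, right_ts: int, policy: str = "min") -> int:
    """Pick the event time of a joined row from its two inputs.

    "min" treats the joined row as no newer than its oldest input, the
    conservative choice for downstream watermark reasoning; "max" keeps the
    row as fresh as possible but risks it being dropped as late downstream.
    """
    if policy == "min":
        return min(left_ts, right_ts)
    if policy == "max":
        return max(left_ts, right_ts)
    raise ValueError(f"unknown policy: {policy}")
```

Note the catch: "min"/"max" here is computed per row at runtime, whereas with column-based event time the engine has to pick one input's column during query planning, so neither policy is expressible there.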
[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-524686985

I revisited and thought about this briefly, and felt that the watermark and output modes Spark provides differ from other frameworks'. Append mode is tricky if you are familiar with other frameworks: in Append mode, Spark tries to ensure there's only one output per key, with the "delay threshold" taken into consideration as well. AFAIK, Flink emits another output if a late-but-allowed tuple arrives after the watermark and updates the previous output, so handling "upsert" is necessary. (Not sure about Beam, but I guess Flink follows the Beam model, so I would expect similar behavior.) In Spark, "upsert" is still undefined for DSv2, and hence UPDATE mode will be disabled for Spark 3. (#23859)

Suppose there's a stateful operator OP1 in batch B2, and a watermark is defined before OP1 with the delay threshold set to 1 hour. The range of outputs OP1 can emit in B2 is:

`WM(OP1B1) - delay threshold` <= outputs < `WM(OP1B2) - delay threshold`

i.e. the outputs which were not evicted (emitted) in the previous batch but match the eviction (emission) condition for this batch. If we have OP2 downstream of OP1, it will receive outputs in that range, and to avoid dropping any intermediate outputs, either 1) OP2 should inherit WM(OP1B1) as WM(OP2B2) and also have an equal or bigger delay threshold, or 2) OP2 should define WM(OP2B2) as `WM(OP1B1) - delay threshold`.

I think Spark needs to make some changes before introducing advanced features. The main issue with Spark Structured Streaming is that it is "flexible" about the watermark - flexible enough to let end users mess up their query easily. I assume other frameworks have a special field for "event time" and prevent modifying it, but in Spark it's just the same as any other column and open for modification. If the event time is modified, it's no longer in line with the watermark and the result becomes nondeterministic. The same goes for `withWatermark`: end users can call `withWatermark` between OP1 and OP2, and then everything is up to the end users - what would WM(OP2) be? - and Spark can't help there. Similarly, which is the event time column for a stream-stream joined output where an event time column is defined per input? I don't see a clear definition of this.

I'd be in favor of letting the streaming engine manage event time and watermark once the event time value is defined, and restricting end users from modifying event time (one-time assignment). To achieve this, each row should have a meta-column for "event time", and once it's defined, further updates should be done only by Spark - each stateful operator needs to decide the event time of its output according to its watermark. (e.g. for windowed aggregation, "window.start" should be used as "event time" and shouldn't be changed.) That's the only way Spark could ensure event time and watermark stay in sync across multiple stateful operations.
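The emission-range arithmetic above can be put into a toy model (plain Python, not Spark code; the function and variable names are made up for illustration):

```python
# Toy model of the append-mode emission range described above (not Spark code).
# A window ending at time t is evicted/emitted once the watermark passes
# t + delay threshold, so in batch B2 the operator emits exactly the windows
# that were not yet emitted in B1 but now satisfy the condition:
#     WM(OP1B1) - delay <= window end < WM(OP1B2) - delay

def emission_range(wm_prev_batch: int, wm_this_batch: int, delay: int):
    """Half-open interval [lo, hi) of window end times emitted this batch."""
    return (wm_prev_batch - delay, wm_this_batch - delay)

# Example: 1-hour delay threshold, watermark advanced from 10:00 to 10:05
# (expressed as seconds since midnight) between batches B1 and B2.
lo, hi = emission_range(36000, 36300, 3600)
# Windows ending below lo were already emitted in B1; windows ending at or
# above hi are still pending; only those in [lo, hi) may be emitted in B2.
```

This is also why a downstream OP2 must anchor its watermark at `WM(OP1B1) - delay threshold` (option 2 above): anything at or above that bound may still arrive from OP1 in this batch.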
[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-501466445

I've left a comment in the doc. Sorry, I shouldn't have left the comment here and caused confusion. Let's talk in the doc. Btw, I guess this patch doesn't address the doc yet, so you may want to mark this patch as `WIP`.
[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-501180447

I just found a case I had somehow missed - the watermark for the (N+1)th-level stage shouldn't refer to the "input" watermark of the Nth-level stage. It should refer to the "output" watermark of the Nth-level stage. Option 2 doesn't address input/output watermarks, so that should be considered as well. (I just added a comment on the design doc.) Once input and output watermarks for each stateful operator are properly adopted, I think it would work.
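The input/output watermark distinction above can be sketched like this (plain Python, not Spark code; names are made up, and the min-over-upstreams rule for the multi-upstream case is my assumption, not something stated in this thread):

```python
# Toy sketch of input vs. output watermarks per stage (not Spark code).
# A stage's output watermark trails its input watermark by that stage's
# delay threshold, and a downstream stage must derive its input watermark
# from the upstream *output* watermark, not the upstream input watermark.

def output_watermark(input_wm: int, delay_threshold: int) -> int:
    """Output watermark of a stage, given its input watermark."""
    return input_wm - delay_threshold

def downstream_input_watermark(upstream_output_wms: list[int]) -> int:
    """With multiple upstream stages, take the minimum of their output
    watermarks so no upstream's still-pending rows are treated as late."""
    return min(upstream_output_wms)
```

Referring to the upstream input watermark instead would overstate progress by exactly the upstream's delay threshold, which is the missed case described above.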
[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-496409067

I'm trying hard to think of cases where this proposal could bring a correctness issue, and I found nothing. Please correct me if I'm mistaken here.

First of all, Spark doesn't support late firing - Spark includes the `delay` in the watermark instead of handling them separately, so the latest possible firing for a specific window is when the watermark passes the window. In other words, once the watermark passes a specific window, the window is evicted (and possibly fired) and never emitted in further batches. (If you want to point it out: yes, "complete mode" breaks this. But "complete mode" just ignores the watermark and only does upserts, so it's irrelevant to the watermark in any way, and not suitable for stateful queries. I don't think "complete mode" is worth discussing in terms of being "logically correct" - we should just leave it out.)

Second, if I'm not missing anything, all the approaches we are considering as options ensure that the watermark for the (N+1)th-level stage cannot be higher than the watermark for the Nth-level stage (even if there are multiple output stages at level N).

Taken together, these two conditions ensure that the (N+1)th-level stage never misses the outputs from the Nth-level stage. This should apply to all modes except complete mode. Let's apply this to update mode: you can get early-firing outputs from Nth-level stages in the (N+1)th-level stage (and retraction is needed to handle these outputs correctly), but the watermark for the (N+1)th-level stage will not be advanced anyway if we ensure condition 2 above, so it doesn't break the watermark for the (N+1)th-level stage.

What do you think? Does my explanation sound convincing?