[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode

2019-08-28 Thread GitBox
HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple 
aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-525708001
 
 
   cc. @tdas @zsxwing @jose-torres to see whether committers in this area are 
interested in this topic or not.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode

2019-08-28 Thread GitBox
HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple 
aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-525703325
 
 
   Maybe I need to go through both Beam and Flink to understand the details and 
discuss them at a more detailed level.
   
   And @echauchot thanks for asking, but I won't be at ApacheCon. I might 
consider attending eventually if ApacheCon is held in East Asia (I'm in South 
Korea).
   
   You may also want to consider that I'm just one of the contributors in the 
Spark project, and without long-term support (a shepherd) from the community 
(committers/PMC members), I couldn't put in the effort for such a huge feature. 
(So if you have a chance to meet some PMC members of Apache Spark in person, 
that would be a better chance for you.) Moreover, the necessary effort seems to 
be beyond what I could spend of my own time, so persuading my employer might 
also be needed.





[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode

2019-08-28 Thread GitBox
HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple 
aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-525646638
 
 
   > Beam does not trigger output unless the watermark pass the end of window + 
allowed lateness. There is no triggering between end of window and allowed 
lateness. Close and output is at the same time.
   
   Ah OK, I see. That's a bit different from what I read in a book on Flink, so 
I assume there are some differences between Beam and Flink... (BTW I also read 
"Streaming Systems", though it mostly explains theory and doesn't go into much 
detail on Beam.)
   
   > Ah I thought we were talking about watermark. For choosing the event 
timestamp, Beam uses a TimestampCombiner which default policy is to set the 
resulting timestamp to the end of the window for new record.
   
   That seems to only explain the case where a window is applied. How does it 
work in other cases? Does it keep the original event timestamp as-is? For a 
windowed stream-stream join it also makes sense, but there are non-windowed 
stream-stream joins as well, and then the output should have only one event 
time whereas there are two inputs.





[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode

2019-08-27 Thread GitBox
HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple 
aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-525225611
 
 
   > Still there is something I do not understand. With previous Spark Dstream 
framework, multiple-aggregations were supported. What has changed in Spark 
watermark behavior that makes it not supported now with Structured Streaming ?
   
   Sorry, I'm not aware of DStream's behavior; I kind of started from 
Structured Streaming and didn't pay much attention to DStream. But as there's 
no notion of event time or watermark in the DStream docs, I'd rather avoid 
using DStream for event-time processing. You'd likely be doing processing-time 
processing, limited by the batch duration.





[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode

2019-08-27 Thread GitBox
HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple 
aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-525217328
 
 
   > Regarding output mode, most of Beam runners (spark for ex) support 
discarding output in which element from different windows are independent and 
previous states are dropped.
   
   I'm not sure I understand it correctly. The point of Append mode is that the 
output for a specific key (the key doesn't have to be windowed, but it should 
include an "event time" column) is provided only once in any case, regardless 
of allowed lateness; there is no "upsert" case. If Beam doesn't close the 
window when the watermark passes it (but the watermark hasn't yet passed 
allowed lateness), and instead triggers the window and emits its output so far 
(so output could be emitted multiple times), that's not compatible with Spark's 
Append mode.
   
   A stream-stream join still has to decide which "event time" to take even if 
we change the way event time is stored, as two rows are being joined. How does 
Beam decide the "event time" for the new record produced from two records? 
With column-based event time (current Spark), it would be hard to choose the 
"min" or "max" of the event times, as which column to pick as the event time 
has to be determined at query-planning time.
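   For illustration, the decision could be sketched like this (plain Python; 
`joined_event_time` and the policy names are my own assumptions, not Spark or 
Beam APIs):
   
   ```python
   # Hypothetical sketch: assigning one event time to a stream-stream joined
   # row produced from two input rows, each carrying its own event time.
   def joined_event_time(left_ts: int, right_ts: int, policy: str = "min") -> int:
       """Pick the event time for a row produced by joining two inputs."""
       if policy == "min":
           # Conservative: the output never jumps ahead of either input,
           # so it can't appear "late" relative to the watermark.
           return min(left_ts, right_ts)
       if policy == "max":
           # The output effectively waits for the later side.
           return max(left_ts, right_ts)
       raise ValueError(f"unknown policy: {policy}")
   ```
   
   With a meta-column (rather than a user-visible column), the engine could 
apply such a policy without the query plan having to pick one input column.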





[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode

2019-08-25 Thread GitBox
HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple 
aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-524686985
 
 
   I revisited and thought about this briefly, and I feel the watermark and 
output modes Spark provides are different from those of other frameworks.
   
   Append mode is tricky if you are familiar with other frameworks. In Append 
mode, Spark tries to ensure there's only one output for each key, with the 
"delay threshold" taken into consideration as well. AFAIK, Flink emits another 
output if a late-but-allowed tuple arrives after the watermark and updates the 
output, hence dealing with "upsert" is necessary. (Not sure about Beam, but I 
guess Flink follows the Beam model, so I'd expect similar behavior.) In Spark, 
"upsert" is not yet defined for DSv2, and hence UPDATE mode will be disabled 
for Spark 3. (#23859)
   
   Suppose there's a stateful operator OP1 in batch B2, and the watermark is 
defined before OP1 with the delay threshold set to 1hr. The range of outputs 
OP1 can emit in B2 is the following:
   
   `WM(OP1B1) - delay threshold` <= outputs < `WM(OP1B2) - delay threshold`
   
   as it denotes the outputs which were not evicted (emitted) in the previous 
batch but match the eviction (emission) condition for this batch.
   
   If we have OP2 with OP1 as its upstream, it will retrieve the outputs above, 
and to not drop any intermediate outputs, either 1) OP2 should inherit 
WM(OP1B1) as WM(OP2B2) and also have an equal or bigger delay threshold, or 
2) OP2 should define WM(OP2B2) as `WM(OP1B1) - delay threshold`.
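   The range above is just arithmetic; a minimal sketch (plain Python, with 
illustrative names and watermark values in minutes, nothing here is a Spark 
API):
   
   ```python
   # Hypothetical sketch of the half-open range of event times a stateful
   # operator may emit in the current batch: rows not yet evicted in the
   # previous batch but matching the eviction condition now.
   def emittable_range(wm_prev_batch: int, wm_this_batch: int, delay: int):
       lower = wm_prev_batch - delay  # inclusive lower bound
       upper = wm_this_batch - delay  # exclusive upper bound
       return lower, upper
   ```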
   
   I think Spark needs to make some changes before introducing advanced 
features. 
   
   I think the main issue with Spark Structured Streaming is that it's 
"flexible" about the watermark: flexible enough to let end users mess up their 
query easily. I assume other frameworks have a special field for "event time" 
and prevent modifying it, but in Spark it's just the same as any other column 
and open for modification. If the event time is modified, it's no longer in 
line with the watermark and the result would be nondeterministic. The same goes 
for `withWatermark`: end users can call `withWatermark` between OP1 and OP2, 
and then everything is up to them (what would WM(OP2) be?) and Spark can't 
help there.
   
   Similarly, which is the event time column for a stream-stream joined output, 
where an event time column is defined per input? I don't see a clear definition 
of this.
   
   I'd be in favor of letting the streaming engine manage event time and 
watermark once the event time value is defined, and restricting end users from 
modifying the event time (a one-time assignment). To achieve this, each row 
should have a meta-column for "event time", and once it's defined, further 
updates should be done only from the Spark side: each stateful operator decides 
the event time of its output according to its watermark. (E.g. for windowed 
aggregation, "window.start" should be used as the "event time" and shouldn't be 
changed.) That's the only way Spark could ensure event time and watermark stay 
in sync across multiple stateful operations.
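   A toy model of that idea (plain Python, not the Spark API; `Row`, 
`set_event_time`, and `windowed_agg_output` are illustrative assumptions):
   
   ```python
   # Hypothetical sketch: event time as an engine-managed meta-column that
   # end users can assign only once; subsequent updates come from operators.
   from dataclasses import dataclass
   from typing import Optional
   
   @dataclass
   class Row:
       data: dict
       event_time: Optional[int] = None  # meta-column, one-time assignment
   
   def set_event_time(row: Row, ts: int) -> Row:
       # User-facing assignment: allowed only while undefined.
       if row.event_time is not None:
           raise ValueError("event time already defined; user update disallowed")
       row.event_time = ts
       return row
   
   def windowed_agg_output(window_start: int, window_end: int, agg_value) -> Row:
       # The engine (not the user) decides the event time of the aggregate
       # output; here window.start, per the suggestion above.
       out = Row(data={"value": agg_value})
       out.event_time = window_start
       return out
   ```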





[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode

2019-06-12 Thread GitBox
HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple 
aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-501466445
 
 
   I've left a comment in the doc. Sorry, I shouldn't have left comments here 
and caused confusion; let's talk in the doc. BTW, I guess this patch doesn't 
address the doc yet, so you may want to mark it as `WIP`.





[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode

2019-06-12 Thread GitBox
HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple 
aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-501180447
 
 
   I just found a case I somehow missed: the watermark for a stage at level 
(N+1) shouldn't refer to the "input" watermark of the stage at level N. It 
should refer to the "output" watermark of the stage at level N. Option 2 
doesn't address the input/output watermark, so that should be considered as 
well. (I just added a comment on the design doc.)
   
   Once input and output watermarks are properly adopted for each stateful 
operator, I think it would work.
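   The input/output distinction can be sketched as follows (plain Python, 
illustrative names only, not a Spark API):
   
   ```python
   # Hypothetical sketch: a stateful operator's output watermark lags its
   # input watermark by the delay threshold, and a downstream stage must
   # derive its watermark from upstream *output* watermarks, not input ones.
   def output_watermark(input_watermark: int, delay_threshold: int) -> int:
       # The operator can only guarantee it has emitted everything with
       # event time earlier than input_watermark - delay_threshold.
       return input_watermark - delay_threshold
   
   def downstream_input_watermark(upstream_output_watermarks) -> int:
       # A stage at level (N+1) must follow the slowest upstream output
       # watermark so it never misses an upstream output.
       return min(upstream_output_watermarks)
   ```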





[GitHub] [spark] HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode

2019-05-28 Thread GitBox
HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple 
aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-496409067
 
 
   I'm trying hard to think of cases where this proposal could bring a 
correctness issue, and I found none. Please correct me if I'm mistaken here.
   
   First of all, Spark doesn't support late firing: Spark includes the `delay` 
in the watermark instead of handling them separately, so the latest possible 
firing for a specific window is when the watermark passes the window. In other 
words, once the watermark passes a specific window, the window should be 
evicted (and possibly fired) and never be sent in further batches.
   
   (You might point out that "complete mode" breaks this. But "complete mode" 
just ignores the watermark and only does upsert, so it's irrelevant to the 
watermark in any way, and not suitable for a stateful query. I don't think 
"complete mode" is worth discussing in terms of logical correctness; we should 
just leave it out.)
   
   Second, if I'm not missing anything, all the approaches we are considering 
as options ensure that the watermark for a stage at level (N+1) cannot be 
higher than the watermark for a stage at level N (even if there are multiple 
output stages at level N).
   
   Taken together, these two conditions ensure that a stage at level (N+1) 
never misses the outputs from stages at level N. This should apply to all 
modes except complete mode.
   
   Let's apply this to update mode. You can get early-firing outputs from 
level-N stages in a level-(N+1) stage (and retraction is needed to handle these 
outputs correctly), but the watermark for the (N+1) stage will not be advanced 
anyway if we ensure condition 2 above, so it doesn't break the watermark for 
the (N+1) stage.
   
   What do you think? Does my explanation sound convincing?

