I would recommend keeping watermarks and end-of-stream separate. Representing end-of-stream as a watermark is lossy: a maximum-value watermark could mean either that we hit the max watermark on a stream or that we're in a bounded stream with an end-of-stream marker, and a consumer cannot tell which. Also keep in mind that watermarks will eventually be user-overridable, so a user could effectively emit an end-of-stream control message on an unbounded stream.
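To make the ambiguity concrete, here is a minimal sketch in Python (not Samza code). All names in it (`Watermark`, `EndOfStream`, `MAX_WATERMARK`, `should_shutdown`, `should_shutdown_overloaded`) are hypothetical and exist only for illustration; the field names loosely follow the taskName and streamId fields mentioned in the thread below.

```python
from dataclasses import dataclass
from typing import Union

# Hypothetical sentinel: the largest timestamp a watermark could carry.
MAX_WATERMARK = 2**63 - 1


@dataclass(frozen=True)
class Watermark:
    """Event-time progress reported by one producer task."""
    task_name: str
    timestamp: int


@dataclass(frozen=True)
class EndOfStream:
    """The producer task has completely processed a bounded stream."""
    task_name: str
    stream_id: str


ControlMessage = Union[Watermark, EndOfStream]


def should_shutdown(msg: ControlMessage) -> bool:
    """Separate types: only an explicit EndOfStream triggers shutdown."""
    return isinstance(msg, EndOfStream)


def should_shutdown_overloaded(timestamp: int) -> bool:
    """Overloaded encoding: shutdown is inferred from the watermark value."""
    return timestamp == MAX_WATERMARK
```

With separate types, a watermark that happens to carry the maximum timestamp (for example one overridden by a user on an unbounded stream) does not shut anything down, whereas the overloaded encoding cannot distinguish it from a real end-of-stream.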
On Wed, May 31, 2017 at 8:12 PM, Yi Pan <nickpa...@gmail.com> wrote:
> Hi, Xinyu,
>
> Thanks for the update. So I have two suggestions:
> - It seems to me that EndOfStream can be implemented as a special type of Watermark as well: a) we can use MAX_INT as the watermark value to indicate end-of-stream; b) the streamId is simply the key to the Map<String, Long> in the source ingestion task. When the source ingestion task has received enough EoS messages, it simply emits an EoS with its own taskName to the intermediate stream as a watermark, and the watermark propagation rule will work. The only thing the tasks do differently on EoS is shut down the current tasks, while non-EoS watermarks do not trigger shutdown. However, that will allow us to simplify the types of messages and data structures to pass through. And the reasoning for reconciliation in the downstream tasks is pretty simple: a) # of watermarks == # of upstream tasks (i.e. producers); b) the propagation rule for the watermark message is the same.
> - Based on our discussion yesterday, I think that we also need a detailed description in the design of the failure recovery scenario, especially to answer these questions: a) in failure recovery, how are the checkpoint of offsets in the input streams and the watermark checkpoint recovered in the current task? b) what's the correlation between the input offsets and watermarks in the checkpoint in the current task? c) what's the implication of re-emitted watermarks from the current task for the downstream tasks?
>
> And for Beam's watermark algorithm that Chris pointed out, I think that OldestWork(stage) would correspond to the watermark timestamp of messages/state that we buffer in the current task and for which we have not yet generated the output or committed the state change.
> That would be needed if we implement the exactly-once algorithm Backett is working on, since the algorithm will require buffering messages/state that are not committed yet. For now, if we are processing each incoming message immediately and only checkpointing the messages that have been processed completely, I think that we can ignore it.
>
> Just my few points.
>
> Thanks!
>
> -Yi
>
> On Tue, May 30, 2017 at 5:47 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
>
>> @Chris: thanks a lot for providing the definitions. The first equation is exactly what I want to say about the watermark reconciliation. I haven't got to the second equation yet. Will probably think it through once I get there.
>>
>> @Yi: I updated the SEP-6 based on your feedback. Some replies to your questions are below:
>>
>> >> the proposal is for all types of control messages, not just for end-of-stream, right? Better to define the scope and lay out the common requirements of control message delivery.
>>
>> Right, the proposal is to support general control messages. I added more content to the problem description about watermarks and also listed supporting watermark propagation as one of the goals.
>>
>> >> in step-3, how does the consumer of intermediate streams know how many EOS messages should be received? And we should make it clear that it should be one EOS per producer and that the downstream consumer counts the number of unique EOS messages from all upstream producers.
>>
>> The control message itself currently contains the count of upstream producers (tasks). Chris suggested earlier that if each processor has a global view of each job model, then we can remove the count field. Right now we are using this field to keep track of the total count. I also updated the description of this part.
>>
>> >> In comparison table, “checkpoint the control messages received” ==> is it referring to the partially accumulated upstream EOS messages?
>>
>> Yes, that's correct. We will checkpoint all the upstream tasks that have reached end-of-stream for a streamId.
>>
>> >> Please make a clear definition on “Watermark” and “EndOfStream”. Why are they different? Are they both control messages that require the same delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)? If yes, should we make the “watermark” vs “EndOfStream” a sub-category in control message?
>>
>> They are different: a watermark contains a timestamp from the producer task, while an EndOfStream message indicates that the producer task has completely processed a stream. Both are control messages that require the same delivery pattern. I updated the SEP to make it clearer that they are sub-categories of control message.
>>
>> >> As for the serde for intermediate stream, I assume that we will need an envelope serde that is avro to wrap the user message and control message in? So, user-defined serde now only applies to the “UserMessage”? And what’s the message key in the message format?
>>
>> The serde wrapper for the message is customized: the first byte indicates the message type, and the following byte array is the actual message. For user messages, we will apply the user-provided serde. For control messages, we will use JSON. The key is the same. We do not need a customized serde since we can infer the serde from the message.
>>
>> >> A big question regarding the watermark propagation: “When Samza receives watermark messages, it will emit a watermark with the earliest event time across all the stream partitions. No emission if the earliest event time doesn’t change.” Does the watermark propagation require synchronization/coordination between all producers at the source? Say, if the task taking one input source emits watermark at 1min interval and the task taking another input source emits watermark at 5min interval, how does the downstream consumer reconcile the watermarks?
>>
>> Watermark propagation does not require synchronization. Chris's equations describe accurately how the calculations work. Please take a look.
>>
>> >> In the checkpoint message format, it seems that it is only designed for watermark messages? Any streamId info that EoS is carrying over?
>>
>> Sorry, I forgot to add the EOS checkpoint there. I updated the SEP for it. Now the EOS checkpoint has the streamId.
>>
>> Thanks,
>> Xinyu
>>
>> On Tue, May 30, 2017 at 11:03 AM, Chris Pettitt <cpett...@linkedin.com.invalid> wrote:
>>
>> > FWIW, there is a Beam presentation that has a very crisp set of rules around watermarks. From memory it boils down to something like:
>> >
>> > InputWatermark(stage) = min { OutputWatermark(stage') for stage' in Upstream(stage) }
>> > OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }
>> >
>> > OldestWork(stage) is the oldest message that has been received by the stage but not yet processed.
>> >
>> > - Chris
>> >
>> > On Tue, May 30, 2017 at 1:39 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> >
>> > > Hi, Xinyu,
>> > >
>> > > Thanks for the proposal. I took a quick pass and had the following questions/comments:
>> > >
>> > > - message shuffling ==> data shuffling???
>> > >
>> > > - the proposal is for all types of control messages, not just for end-of-stream, right? Better to define the scope and lay out the common requirements of control message delivery.
>> > >
>> > > - dropped option should go to “Rejected alternatives”
>> > >
>> > > - “Samza finds out the following intermediate streams that all the inputs have been end-of-stream” what does it mean? The task consuming the input stream(s) reconciles all EoS from all input stream partitions and then propagates EoS messages to all partitions in intermediate streams? This is not super clear to me.
>> > >
>> > > - in step-3, how does the consumer of intermediate streams know how many EOS messages should be received? And we should make it clear that it should be one EOS per producer and that the downstream consumer counts the number of unique EOS messages from all upstream producers.
>> > >
>> > > - In comparison table, “checkpoint the control messages received” ==> is it referring to the partially accumulated upstream EOS messages?
>> > >
>> > > - Please make a clear definition on “Watermark” and “EndOfStream”. Why are they different? Are they both control messages that require the same delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)? If yes, should we make the “watermark” vs “EndOfStream” a sub-category in control message?
>> > >
>> > > - As for the serde for intermediate stream, I assume that we will need an envelope serde that is avro to wrap the user message and control message in? So, user-defined serde now only applies to the “UserMessage”? And what’s the message key in the message format?
>> > >
>> > > - A big question regarding the watermark propagation: “When Samza receives watermark messages, it will emit a watermark with the earliest event time across all the stream partitions. No emission if the earliest event time doesn’t change.” Does the watermark propagation require synchronization/coordination between all producers at the source? Say, if the task taking one input source emits watermark at 1min interval and the task taking another input source emits watermark at 5min interval, how does the downstream consumer reconcile the watermarks?
>> > >
>> > > - In the checkpoint message format, it seems that it is only designed for watermark messages? Any streamId info that EoS is carrying over?
>> > >
>> > >
>> > > Thanks a lot!
>> > >
>> > >
>> > > -Yi
>> > >
>> > > On Tue, May 30, 2017 at 9:46 AM, xinyu liu <xinyuliu...@gmail.com> wrote:
>> > >
>> > > > Makes sense. I noticed that too and I dropped the ControlMessage type in my code. I also moved taskName, taskCount to the parent ControlMessage class. Just updated the SEP-6. Please take a look again.
>> > > >
>> > > > Thanks,
>> > > > Xinyu
>> > > >
>> > > > On Tue, May 30, 2017 at 9:12 AM, Chris Pettitt <cpett...@linkedin.com.invalid> wrote:
>> > > >
>> > > > > MessageType and ControlMessage.Type look redundant. You could either use "ControlMessage" as the type in MessageType or drop ControlMessage.Type.
>> > > > >
>> > > > > On Fri, May 26, 2017 at 5:14 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
>> > > > >
>> > > > > > Thanks a lot for the comments. I updated the SEP with more details and clarification. Please let me know if you have further questions.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Xinyu
>> > > > > >
>> > > > > > On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari <pmaheshw...@linkedin.com.invalid> wrote:
>> > > > > >
>> > > > > > > Hi Xinyu,
>> > > > > > >
>> > > > > > > Thanks for the proposal. Some requests for clarifications. Let's update the SEP directly instead of replying here.
>> > > > > > >
>> > > > > > > E.g., in "For any following intermediate stream whose input streams are all end-of-stream, it will be marked as pending EOS" - Should clarify that (IIUC) something is injecting EOS messages in all intermediate stream partitions once it receives EOS from all input stream partitions it's consuming. Should also clarify what is that something.
>> > > > > > > Same for "declare end of stream once all the EOS messages have been received." - What does this declaration involve and who is doing this?
>> > > > > > >
>> > > > > > > In pro for approach 2: Not clear what this means - "The watermark can conclude the input messages before this watermark have been complete."
>> > > > > > >
>> > > > > > > For the cons of approach 2: "Complicated failure scenario of the second job. It needs to checkpoint all the watermark messages received, so when it recovered from failure, it can still count." - How is this related to EOS? How is this related to the checkpoint watermark section?
>> > > > > > > Also, what is the "more messages required to write.. " referring to?
>> > > > > > >
>> > > > > > > "Samza needs to reconcile based on the task counts." - Please explain what reconciliation means, why it needs to happen, and why we need to track the producer task and total task count in the watermark message to do this.
>> > > > > > >
>> > > > > > > Checkpoint watermarks section is also unclear. What problem are we trying to solve here?
>> > > > > > >
>> > > > > > > Should also move the message format and the watermark message interface sections to the bottom, since they depend on details in the event time and checkpoint watermark sections.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Prateek
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, May 24, 2017 at 11:30 AM, xinyu liu <xinyuliu...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > I created SEP-6 for SAMZA-1260 <https://issues.apache.org/jira/browse/SAMZA-1260>: Support Watermark Across Intermediate Streams for Batch Processing. The link to the SEP is here:
>> > > > > > > >
>> > > > > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing
>> > > > > > > >
>> > > > > > > > Please review and comments are welcome!
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Xinyu
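For reference, the two watermark rules quoted from memory in the thread (InputWatermark and OutputWatermark) can be sketched as below. This is a rough Python illustration, not Samza or Beam code. The function names, the use of `None` for "no pending work", and the `reconcile` helper that waits for every upstream task are assumptions made for the example.

```python
from typing import Dict, Iterable, Optional


def input_watermark(upstream_outputs: Iterable[int]) -> int:
    """InputWatermark(stage) = min over OutputWatermark(stage') of upstream stages.

    Assumes at least one upstream stage.
    """
    return min(upstream_outputs)


def output_watermark(input_wm: int, oldest_work: Optional[int]) -> int:
    """OutputWatermark(stage) = min(InputWatermark(stage), OldestWork(stage)).

    oldest_work is None (an assumption of this sketch) when the stage holds
    no received-but-unprocessed messages.
    """
    return input_wm if oldest_work is None else min(input_wm, oldest_work)


def reconcile(latest_by_task: Dict[str, int], expected_tasks: int) -> Optional[int]:
    """Reconcile the latest watermark seen from each upstream task.

    Returns None until every upstream task has reported at least once, which
    mirrors counting watermarks against the number of upstream producers.
    """
    if len(latest_by_task) < expected_tasks:
        return None
    return input_watermark(latest_by_task.values())
```

Under these rules the upstream tasks need no coordination: if one task last reported 300 and another last reported 60, the consumer's input watermark is 60 regardless of how often each task emits.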