I would recommend keeping watermarks and end-of-stream separate.
Representing end-of-stream as a watermark is lossy: does a max-value
watermark mean that the stream has reached the max watermark, or that
we're in a bounded stream that has hit its end-of-stream marker? Also
keep in mind that watermarks will eventually be user-overridable, so a
user could effectively emit an end-of-stream control message on an
unbounded stream.
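
To make the distinction concrete, here is a minimal sketch (in Python for
brevity) of a consumer that tracks the two control messages separately. All
names here (Watermark, EndOfStream, StreamState, MAX_WATERMARK) are
hypothetical and are not the SEP-6 classes; the per-task bookkeeping is only
an assumption about how the reconciliation might look.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set, Union

# Hypothetical sentinel: the largest timestamp a watermark could carry.
MAX_WATERMARK = 2**63 - 1


@dataclass(frozen=True)
class Watermark:
    task_name: str   # upstream producer task that emitted the message
    timestamp: int   # event time; any value is legal, including MAX_WATERMARK


@dataclass(frozen=True)
class EndOfStream:
    task_name: str   # upstream producer task that finished a bounded input


ControlMessage = Union[Watermark, EndOfStream]


class StreamState:
    """Per-stream bookkeeping of control messages from upstream tasks."""

    def __init__(self, upstream_task_count: int) -> None:
        self.upstream_task_count = upstream_task_count
        self.watermarks: Dict[str, int] = {}   # latest watermark per task
        self.finished: Set[str] = set()        # tasks that sent EndOfStream

    def on_control_message(self, msg: ControlMessage) -> None:
        if isinstance(msg, Watermark):
            self.watermarks[msg.task_name] = msg.timestamp
        elif isinstance(msg, EndOfStream):
            self.finished.add(msg.task_name)

    def input_watermark(self) -> Optional[int]:
        # Minimum over all upstream tasks, once every task has reported.
        if len(self.watermarks) < self.upstream_task_count:
            return None
        return min(self.watermarks.values())

    def is_end_of_stream(self) -> bool:
        # Only explicit EndOfStream messages count, never a large watermark.
        return len(self.finished) == self.upstream_task_count
```

With this shape, a user who overrides the watermark to the max value on an
unbounded stream advances event time but does not shut anything down;
shutdown happens only once every upstream task has sent an explicit
EndOfStream.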

On Wed, May 31, 2017 at 8:12 PM, Yi Pan <nickpa...@gmail.com> wrote:
> Hi, Xinyu,
>
> Thanks for the update. So I have two suggestions:
> - It seems to me that EndOfStream can be implemented as a special type of
> Watermark as well: a) we can use MAX_INT as the watermark value to indicate
> end-of-stream; b) the streamIds are simply the keys of the Map<String,
> Long> in the source ingestion task. When the source ingestion task has
> received enough EoS messages, it simply emits an EoS with its own taskName
> to the intermediate stream as a watermark, and the watermark propagation
> rule will work. The only thing the tasks do differently on EoS is shut down
> the current tasks, while non-EoS watermarks do not trigger shutdown.
> However, that will allow us to simplify the types of messages and data
> structures to pass through. And the reasoning for reconciliation in the
> downstream tasks is pretty simple: a) # of watermarks == # of upstream
> tasks (i.e. producers); b) the propagation rule for the watermark message
> is the same.
> - Based on our discussion yesterday, I think that we also need a detailed
> description in the design of the failure recovery scenario, especially to
> answer these questions: a) in failure recovery, how are the checkpoint of
> offsets in the input streams and the watermark checkpoint recovered in the
> current task? b) what's the correlation between the input offsets and
> watermarks in the checkpoint in the current task? c) what's the
> implication of re-emitted watermarks from the current task for the
> downstream tasks?
>
> And for Beam's watermark algorithm that Chris pointed out, I think that
> OldestWork(stage) would correspond to the watermark timestamp of some
> messages/state that we buffer in the current task and for which we have
> not yet generated the output or committed the state change. That would be
> needed if we implement the exactly-once algorithm Backett is working on,
> since the algorithm will require buffering messages/state that are not
> committed yet. For now, if we are processing each incoming message
> immediately and only checkpointing the messages that have been processed
> completely, I think that we can ignore it.
>
> Just my few points.
>
> Thanks!
>
> -Yi
>
> On Tue, May 30, 2017 at 5:47 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
>
>> @Chris: thanks a lot for providing the definitions. The first equation is
>> exactly what I want to say about the watermark reconciliation. I haven't
>> got to the second equation yet. Will probably think it through once I get
>> there.
>>
>> @Yi: I updated the SEP-6 based on your feedback. Some replies to your
>> questions are below:
>>
>> >> the proposal is for all types of control messages, not just for
>> end-of-stream, right? Better to define the scope and lay out the common
>> requirements of control message delivery.
>>
>> Right, the proposal is to support the general control messages. I added
>> more content in the problem description about watermarks and also listed
>> supporting watermark propagation as one of the goals.
>>
>> >> in step-3, how does the consumer of intermediate streams know how many
>> EOS messages should be received? And we should make it clear that it should
>> be one EOS per producer and that the downstream consumer counts the number
>> of unique EOS from all upstream producers.
>>
>> The control message itself currently contains the count of upstream
>> producers (tasks). Chris suggested earlier that if each processor has a
>> global view of each job model, then we can remove the count field. Right
>> now we are using this field to keep track of the total count. I also
>> updated the description of this part.
>>
>> >> In comparison table, “checkpoint the control messages received” ==> is
>> it referring to the partially accumulated upstream EOS messages?
>>
>> Yes, that's correct. We will checkpoint all the upstream tasks that have
>> reached end-of-stream for a streamId.
>>
>> >> Please give a clear definition of “Watermark” and “EndOfStream”. Why are
>> they different? Are they both control messages that require the same
>> delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)?
>> If yes, should we make “watermark” vs “EndOfStream” a sub-category of
>> control message?
>>
>> They are different: a watermark contains a timestamp from the producer
>> task, while an EndOfStream message indicates that the producer task has
>> completely processed a stream. Both are control messages that require the
>> same delivery pattern. I updated the SEP to make it clearer that they are
>> sub-categories of control message.
>>
>> >> As for the serde for intermediate stream, I assume that we will need an
>> envelope serde that is avro to wrap the user message and control message
>> in? So, user-defined serde now only applies to the “UserMessage”? And
>> what’s the message key in the message format?
>>
>> The serde wrapper for the message is customized: the first byte indicates
>> the message type, and the following byte array is the actual message. For
>> user messages, we will apply the user-provided serde. For control messages,
>> we will use JSON. The key is the same. We do not need a customized serde
>> since we can infer the serde from the message.
>>
>> >> A big question regarding the watermark propagation: “When Samza
>> receives watermark messages, it will emit a watermark with the earliest
>> event time across all the stream partitions. No emission if the earliest
>> event time doesn’t change.” Does the watermark propagation require
>> synchronization/coordination between all producers at the source? Say, if
>> the task taking one input source emits watermarks at a 1min interval and
>> the task taking another input source emits watermarks at a 5min interval,
>> how does the downstream consumer reconcile the watermarks?
>>
>> Watermark propagation does not require synchronization. Chris's equations
>> are very accurate about how the calculations work. Please take a look.
>>
>> >> In the checkpoint message format, it seems that it is only designed for
>> watermark messages? Any streamId info that EoS is carrying over?
>>
>> Sorry, I forgot to add the EoS checkpoint there. I updated the SEP for it.
>> Now the EoS checkpoint has the streamId.
>>
>> Thanks,
>> Xinyu
>>
>> On Tue, May 30, 2017 at 11:03 AM, Chris Pettitt <
>> cpett...@linkedin.com.invalid> wrote:
>>
>> > FWIW, there is a Beam presentation that has a very crisp set of rules
>> > around watermarks. From memory it boils down to something like:
>> >
>> > InputWatermark(stage) = min { OutputWatermark(stage') for stage' in
>> > Upstream(stage) }
>> > OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }
>> >
>> > OldestWork(stage) is the oldest message that has been received by the
>> > stage but not yet processed.
>> >
>> > - Chris
>> >
>> > On Tue, May 30, 2017 at 1:39 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> >
>> > > Hi, Xinyu,
>> > >
>> > > Thanks for the proposal. I took a quick pass and had the following
>> > > questions/comments:
>> > >
>> > > - message shuffling ==> data shuffling???
>> > >
>> > > - the proposal is for all types of control messages, not just for
>> > > end-of-stream, right? Better to define the scope and lay out the common
>> > > requirements of control message delivery.
>> > >
>> > > - dropped option should go to “Rejected alternatives”
>> > >
>> > > - “Samza finds out the following intermediate streams that all the
>> > > inputs have been end-of-stream” what does it mean? Does the task
>> > > consuming the input stream(s) reconcile all EoS from all input stream
>> > > partitions and then propagate EoS messages to all partitions in
>> > > intermediate streams? This is not super clear to me.
>> > >
>> > > - in step-3, how does the consumer of intermediate streams know how
>> > > many EOS messages should be received? And we should make it clear that
>> > > it should be one EOS per producer and that the downstream consumer
>> > > counts the number of unique EOS from all upstream producers.
>> > >
>> > > - In comparison table, “checkpoint the control messages received” ==>
>> > > is it referring to the partially accumulated upstream EOS messages?
>> > >
>> > > - Please give a clear definition of “Watermark” and “EndOfStream”. Why
>> > > are they different? Are they both control messages that require the
>> > > same delivery pattern (i.e. broadcast to downstream, reconcile at the
>> > > consumer)? If yes, should we make “watermark” vs “EndOfStream” a
>> > > sub-category of control message?
>> > >
>> > > - As for the serde for intermediate stream, I assume that we will need
>> > > an envelope serde that is avro to wrap the user message and control
>> > > message in? So, user-defined serde now only applies to the
>> > > “UserMessage”? And what’s the message key in the message format?
>> > >
>> > > - A big question regarding the watermark propagation: “When Samza
>> > > receives watermark messages, it will emit a watermark with the earliest
>> > > event time across all the stream partitions. No emission if the
>> > > earliest event time doesn’t change.” Does the watermark propagation
>> > > require synchronization/coordination between all producers at the
>> > > source? Say, if the task taking one input source emits watermarks at a
>> > > 1min interval and the task taking another input source emits
>> > > watermarks at a 5min interval, how does the downstream consumer
>> > > reconcile the watermarks?
>> > >
>> > > - In the checkpoint message format, it seems that it is only designed
>> > > for watermark messages? Any streamId info that EoS is carrying over?
>> > >
>> > >
>> > > Thanks a lot!
>> > >
>> > >
>> > > -Yi
>> > >
>> > > On Tue, May 30, 2017 at 9:46 AM, xinyu liu <xinyuliu...@gmail.com> wrote:
>> > >
>> > > > Makes sense. I noticed that too and I dropped the ControlMessage type
>> > > > in my code. I also moved taskName, taskCount to the parent
>> > > > ControlMessage class. Just updated the SEP-6. Please take a look again.
>> > > >
>> > > > Thanks,
>> > > > Xinyu
>> > > >
>> > > > On Tue, May 30, 2017 at 9:12 AM, Chris Pettitt <
>> > > > cpett...@linkedin.com.invalid> wrote:
>> > > >
>> > > > > MessageType and ControlMessage.Type look redundant. You could
>> > > > > either use "ControlMessage" as the type in MessageType or drop
>> > > > > ControlMessage.Type.
>> > > > >
>> > > > > On Fri, May 26, 2017 at 5:14 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
>> > > > >
>> > > > > > Thanks a lot for the comments. I updated the SEP with more
>> > > > > > details and clarification. Please let me know if you have further
>> > > > > > questions.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Xinyu
>> > > > > >
>> > > > > > On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari <
>> > > > > > pmaheshw...@linkedin.com.invalid> wrote:
>> > > > > >
>> > > > > > > Hi Xinyu,
>> > > > > > >
>> > > > > > > Thanks for the proposal. Some requests for clarifications.
>> > > > > > > Let's update the SEP directly instead of replying here.
>> > > > > > >
>> > > > > > > E.g., in "For any following intermediate stream whose input
>> > > > > > > streams are all end-of-stream, it will be marked as pending
>> > > > > > > EOS" - Should clarify that (IIUC) something is injecting EOS
>> > > > > > > messages in all intermediate stream partitions once it receives
>> > > > > > > EOS from all input stream partitions it's consuming. Should
>> > > > > > > also clarify what that something is.
>> > > > > > > Same for "declare end of stream once all the EOS messages have
>> > > > > > > been received." - What does this declaration involve and who is
>> > > > > > > doing this?
>> > > > > > >
>> > > > > > > In pro for approach 2: Not clear what this means - "The
>> > > > > > > watermark can conclude the input messages before this watermark
>> > > > > > > have been complete."
>> > > > > > >
>> > > > > > > For the cons of approach 2: "Complicated failure scenario of
>> > > > > > > the second job. It needs to checkpoint all the watermark
>> > > > > > > messages received, so when it recovered from failure, it can
>> > > > > > > still count." - How is this related to EOS? How is this related
>> > > > > > > to the checkpoint watermark section?
>> > > > > > > Also, what is the "more messages required to write.. "
>> > > > > > > referring to?
>> > > > > > >
>> > > > > > > "Samza needs to reconcile based on the task counts." - Please
>> > > > > > > explain what reconciliation means, why it needs to happen, and
>> > > > > > > why we need to track the producer task and total task count in
>> > > > > > > the watermark message to do this.
>> > > > > > >
>> > > > > > > Checkpoint watermarks section is also unclear. What problem are
>> > > > > > > we trying to solve here?
>> > > > > > >
>> > > > > > > Should also move the message format and the watermark message
>> > > > > > > interface sections to the bottom, since they depend on details
>> > > > > > > in the event time and checkpoint watermark sections.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Prateek
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, May 24, 2017 at 11:30 AM, xinyu liu <xinyuliu...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > I created SEP-6 for SAMZA-1260
>> > > > > > > > <https://issues.apache.org/jira/browse/SAMZA-1260>: Support
>> > > > > > > > Watermark Across Intermediate Streams for Batch Processing. The
>> > > > > > > > link to the SEP is here:
>> > > > > > > >
>> > > > > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing
>> > > > > > > >
>> > > > > > > > Please review and comments are welcome!
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Xinyu
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
