Hi, Xinyu,

Thanks for the update. So I have two suggestions:
- It seems to me that EndOfStream can be implemented as a special type of
Watermark as well. a) we can use MAX_INT in the watermark value to indicate
the end-of-stream; b) the streamId are simply the key to the Map<String,
Long> in the source ingestion task. When the source ingestion task received
enough count of EoS, it simply emits an EoS with its own taskName to the
intermediate stream as a watermark and the watermark propagation rule will
work. The only different thing the tasks will do in EoS is shutdown the
current tasks, while non EoS watermarks does not trigger shutdown. However,
that will allow us to simplify the type of messages and data structure to
pass through. And the reasoning in reconsilation in the downstream tasks
are pretty simple: a) # of watermarks == # of upstream tasks (i.e.
producers) b) propagation rule for the watermark message is the same;
- Based on our discussion yesterday, I think that we also need a detailed
description in the design to talk about the failure recovery scenario,
especially to answer the questions: a) in failure recovery, how the
checkpoint of offsets in the input streams and the watermark checkpoint
recovered in the current task? b) What's the correlation between the input
offsets and watermarks in the checkpoint in the current task?; c) what's
the implication of re-emitted watermarks from the current task to the
downstream tasks?

And for Beam's watermark algorithm that Chris pointed out, I think that
OldestWork(stage) would be corresponding to the watermark timestamp of some
messages/state that we buffer in the current task and have not generate the
output or commit the state change yet. That would be needed if we implement
the exact-once algorithm Backett is working on since the algorithm will
require buffering message/state that are not committed yet. For now, if we
are processing each incoming message immediately and only checkpoint the
messages being processed completely, I think that we can ignore it.

Just my few points.

Thanks!

-Yi

On Tue, May 30, 2017 at 5:47 PM, xinyu liu <xinyuliu...@gmail.com> wrote:

> @Chris: thanks a lot for providing the definitions. The first equation is
> exactly what I want to say about the watermark reconciliation. I haven't
> got to the second equation yet. Will probably think it through once I get
> there.
>
> @Yi: I updated the SEP-6 based on your feedback. Some replies to your
> questions are below:
>
> >> the proposal is for all types of control messages, not just for
> end-of-stream, right? Better to define the scope and layout the comment
> requirements of control message delivery.
>
> Right, the proposal is to support the general control messages. I added
> more content in the problem description about watermarks and also listed
> supporting watermark propagation as once of the goals.
>
> >> in step-3, how does the consumer of intermediate streams know how many
> EOS messages should be received? And we should make it clear that it should
> be EOS / producer and the count of the downstream consumer is counting on
> the number of unique EOS from all producers from the upstream.
>
> The control message itself currently contains the count of upstream
> producers (tasks). Chris suggested earlier if each processor has a global
> view of the each job models, then we can remove the count field. Right now
> we are using this field to keep track of the total count. I also updated
> the description of this part.
>
> >> In comparison table, “checkpoint the control messages received” ==> is
> it
> referring to the partially accumulated upstream EOS messages?
>
> Yes, that's correct. We will check point all the upstream tasks that has
> reached end-of-stream for a streamId.
>
> >> Please make a clear definition on “Watermark” and “EndOfStream”. Why are
> they different? Are they both control messages that requires the same
> delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)?
> If yes, should we make the “watermark” vs “EndOfStream” a sub-category in
> control message?
> They are different: watermark contains a timestamp from the producer task,
> while EndOfStream message indicates the producer task has completely
> processed a stream. They both are control messages which require same
> delivery pattern. I updated the SEP to make it clearer they are
> sub-category of control message.
>
> >> As for the serde for intermediate stream, I assume that we will need an
> envelope serde that is avro to wrap the user message and control message
> in? So, user-defined serde now only applies to the “UserMessage”? And
> what’s the message key in the message format?
> The serde wrapper for the message is customized: the first byte indicates
> the message type, and the following byte array is the actual message. For
> user message, we will apply user provided serde. For control message, we
> will use JSON. The key is the same. We do not need customized serde since
> we can infer the serde from message.
>
> >> A big question regarding to the watermark propagation: “When Samza
> receives watermark messages, it will emit a watermark with the earliest
> event time across all the stream partitions. No emission if the earliest
> event time doesn’t change.” Does the watermark propagation requires
> synchronization/coordination between all producers at the source? Say, if
> the task taking one input source emits watermark at 1min interval and the
> task taking another input source emits watermark at 5min interval, how does
> the downstream consumer reconcile the watermarks?
>
> Watermark propagation does not require synchronization. Chris's equations
> are very accurate about how the calculations work. Please take a look.
>
> >> In the checkpoint message format, it seems that it is only design for
> watermark messages? Any streamId info that EoS is carrying over?
>
> Sorry, I forgot to add the Eos checkpoint there. I updated the SEP for it.
> Now the EOS checkpoint has the streamId.
>
> Thanks,
> Xinyu
>
> On Tue, May 30, 2017 at 11:03 AM, Chris Pettitt <
> cpett...@linkedin.com.invalid> wrote:
>
> > FWIW, there is a Beam presentation that has a very crisp set of rules
> > around watermarks. From memory it boils down to something like:
> >
> > InputWatermark(stage) = min { OutputWatermark(stage') for stage' in
> > Upstream(stage) }
> > OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }
> >
> > OldestWork(stage) is the oldest message that has been received by the
> stage
> > but not yet processed.
> >
> > - Chris
> >
> > On Tue, May 30, 2017 at 1:39 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >
> > > Hi, Xinyu,
> > >
> > > Thanks for the proposal. I took a quick pass and had the following
> > > questions/comments:
> > >
> > > - message shuffling ==> data shuffling???
> > >
> > > - the proposal is for all types of control messages, not just for
> > > end-of-stream, right? Better to define the scope and layout the comment
> > > requirements of control message delivery.
> > >
> > > - dropped option should go to “Rejected alternatives”
> > >
> > > - “Samza finds out the following intermediate streams that all the
> inputs
> > > have been end-of-stream” what does it mean? The task consuming the
> input
> > > stream(s) reconcile all EoS from all input stream partitions and then
> > > propagate EoS messages to all partitions in intermediate streams? This
> is
> > > not super clear to me.
> > >
> > > - in step-3, how does the consumer of intermediate streams know how
> many
> > > EOS messages should be received? And we should make it clear that it
> > should
> > > be EOS / producer and the count of the downstream consumer is counting
> on
> > > the number of unique EOS from all producers from the upstream.
> > >
> > > - In comparison table, “checkpoint the control messages received” ==>
> is
> > it
> > > referring to the partially accumulated upstream EOS messages?
> > >
> > > - Please make a clear definition on “Watermark” and “EndOfStream”. Why
> > are
> > > they different? Are they both control messages that requires the same
> > > delivery pattern (i.e. broadcast to downstream, reconcile at the
> > consumer)?
> > > If yes, should we make the “watermark” vs “EndOfStream” a sub-category
> in
> > > control message?
> > >
> > > - As for the serde for intermediate stream, I assume that we will need
> an
> > > envelope serde that is avro to wrap the user message and control
> message
> > > in? So, user-defined serde now only applies to the “UserMessage”? And
> > > what’s the message key in the message format?
> > >
> > > - A big question regarding to the watermark propagation: “When Samza
> > > receives watermark messages, it will emit a watermark with the earliest
> > > event time across all the stream partitions. No emission if the
> earliest
> > > event time doesn’t change.” Does the watermark propagation requires
> > > synchronization/coordination between all producers at the source? Say,
> if
> > > the task taking one input source emits watermark at 1min interval and
> the
> > > task taking another input source emits watermark at 5min interval, how
> > does
> > > the downstream consumer reconcile the watermarks?
> > >
> > > - In the checkpoint message format, it seems that it is only design for
> > > watermark messages? Any streamId info that EoS is carrying over?
> > >
> > >
> > > Thanks a lot!
> > >
> > >
> > > -Yi
> > >
> > > On Tue, May 30, 2017 at 9:46 AM, xinyu liu <xinyuliu...@gmail.com>
> > wrote:
> > >
> > > > Makes sense. I noticed that too and I dropped the ControlMessage type
> > in
> > > my
> > > > code. I also moved taskName, taskCount to the parent ControlMessage
> > > class.
> > > > Just updated the SEP-6. Please take a look again.
> > > >
> > > > Thanks,
> > > > Xinyu
> > > >
> > > > On Tue, May 30, 2017 at 9:12 AM, Chris Pettitt <
> > > > cpett...@linkedin.com.invalid> wrote:
> > > >
> > > > > MessageType and ControlMessage.Type look redundant. You could
> either
> > > use
> > > > > "ControlMessage" as the type in MessageType or drop
> > > ControlMessage.Type.
> > > > >
> > > > > On Fri, May 26, 2017 at 5:14 PM, xinyu liu <xinyuliu...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Thanks a lot for the comments. I updated the SEP with more
> details
> > > and
> > > > > > clarification. Please let me know if you have further questions.
> > > > > >
> > > > > > Thanks,
> > > > > > Xinyu
> > > > > >
> > > > > > On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari <
> > > > > > pmaheshw...@linkedin.com.invalid> wrote:
> > > > > >
> > > > > > > Hi Xinyu,
> > > > > > >
> > > > > > > Thanks for the proposal. Some requests for clarifications.
> Let's
> > > > update
> > > > > > the
> > > > > > > SEP directly instead of replying here.
> > > > > > >
> > > > > > > E.g., in "For any following intermediate stream whose input
> > streams
> > > > are
> > > > > > all
> > > > > > > end-of-stream, it will be marked as pending EOS" - Should
> clarify
> > > > that
> > > > > > > (IIUC) something is injecting EOS messages in all intermediate
> > > stream
> > > > > > > partitions once it receives EOS from all input stream
> partitions
> > > it's
> > > > > > > consuming. Should also clarify what is that something.
> > > > > > > Same for "declare end of stream once all the EOS messages have
> > been
> > > > > > > received." - What does this declaration involve and who is
> doing
> > > > this?
> > > > > > >
> > > > > > > In pro for approach 2: Not clear what this means - "The
> watermark
> > > can
> > > > > > > conclude the input messages before this watermark have been
> > > > complete."
> > > > > > >
> > > > > > > For the cons of approach 2: "Complicated failure scenario of
> the
> > > > second
> > > > > > > job. It needs to checkpoint all the watermark messages
> received,
> > so
> > > > > when
> > > > > > it
> > > > > > > recovered from failure, it can still count." - How is this
> > related
> > > to
> > > > > > EOS?
> > > > > > > How is this related to the checkpoint watermark section?
> > > > > > > Also, what is the "more messages required to write.. "
> referring
> > > to?
> > > > > > >
> > > > > > > "Samza needs to reconcile based on the task counts." - Please
> > > explain
> > > > > > what
> > > > > > > reconciliation means, why it needs to happen, and why we need
> to
> > > > track
> > > > > > the
> > > > > > > producer task and total task count in the watermark message to
> do
> > > > this.
> > > > > > >
> > > > > > > Checkpoint watermarks section is also unclear. What problem are
> > we
> > > > > trying
> > > > > > > to solve here?
> > > > > > >
> > > > > > > Should also move the message format and the watermark message
> > > > interface
> > > > > > > sections to the bottom, since they depend on details in the
> event
> > > > time
> > > > > > and
> > > > > > > checkpoint watermark sections.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Prateek
> > > > > > >
> > > > > > >
> > > > > > > On Wed, May 24, 2017 at 11:30 AM, xinyu liu <
> > xinyuliu...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I created SEP-6 for SAMZA-1260
> > > > > > > > <https://issues.apache.org/jira/browse/SAMZA-1260>: Support
> > > > > Watermark
> > > > > > > > Across Intermediate Streams for Batch Processing. The link to
> > the
> > > > SEP
> > > > > > is
> > > > > > > > here:
> > > > > > > >
> > > > > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-
> > > > > > > > 6+Support+Watermark+Across+Intermediate+Streams+for+
> > > > Batch+Processing
> > > > > > > >
> > > > > > > > Please review and comments are welcome!
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Xinyu
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to