Comments inline

On Thu, Feb 11, 2016 at 4:13 PM, Timothy Farkas <[email protected]> wrote:

> Hey Pramod,
>
> I agree if APEX-339 is in place then it would work without redeploying
> containers for operators that are Stateless, or a subset of Stateful
> operators.
>
> Addressing your previous questions.
>
> - The StatsListener can be used to see how far behind operators are. You
> could determine what window the operator is on, the number of tuples it has
> processed so far, or how long it takes to complete a window.
>

What if tuples are of different sizes and the number of tuples processed
doesn't reflect how far ahead or behind a downstream partition is? How is the
information from the StatsListener made available to the upstream partition
codecs?
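For reference, here is a rough sketch of the kind of StatsListener that could
track per-partition progress. The accessor names (getOperatorId,
getCurrentWindowId) and the Response field are assumptions about the API and
should be verified against the engine version; how the collected numbers would
reach the upstream codecs is exactly the open question above.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.datatorrent.api.StatsListener;

// Illustrative only: record how far each physical partition has progressed so
// that some other component (e.g. a stream codec) could bias future tuples
// away from lagging partitions.
public class LagTrackingStatsListener implements StatsListener
{
  // last reported window id per physical partition (keyed by operator id)
  private final Map<Integer, Long> lastWindow = new ConcurrentHashMap<Integer, Long>();

  // spread between the fastest and slowest partition; a real implementation
  // would have to publish this somewhere the upstream codec can read it
  private volatile long windowIdSpread;

  @Override
  public Response processStats(BatchedOperatorStats stats)
  {
    lastWindow.put(stats.getOperatorId(), stats.getCurrentWindowId());

    long newest = Long.MIN_VALUE;
    long oldest = Long.MAX_VALUE;
    for (long windowId : lastWindow.values()) {
      newest = Math.max(newest, windowId);
      oldest = Math.min(oldest, windowId);
    }
    // raw window ids are only a rough indicator of lag, but a large spread
    // between partitions suggests skew
    windowIdSpread = lastWindow.isEmpty() ? 0 : newest - oldest;

    Response response = new Response();
    response.repartitionRequired = false; // observe only, no repartition
    return response;
  }

  public long getWindowIdSpread()
  {
    return windowIdSpread;
  }
}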


>
> - Some examples of Stateful operators that require repartitioning of state
> are the following:
>       - Deduper
>            In this case, after updating the stream codec, the operator may
>            allow a previously seen value to pass because the partition didn't
>            receive that value with the previous stream codec.
>       - A key value store that holds aggregations for each key.
>            In this case multiple partitions would hold partial aggregations
>            for a key when each is expected to hold the complete aggregation.
>

Agreed for the deduper. For the second case, a unifier is a better approach
so that you are not affected by key skew in general.
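To make the deduper case concrete, here is a toy, Apex-independent illustration
of why changing the codec's routing without moving state can let duplicates
through; both routing functions are made up for the example.

// Toy illustration (no Apex dependency): the partition that "owns" a key is
// derived from the stream codec's partition value. If the codec changes,
// ownership can move, so a deduper's seen-set must move with it or duplicates
// can slip through on the new owner.
public class CodecChangeExample
{
  static int oldCodecPartition(String key, int numPartitions)
  {
    // e.g. plain hashCode-based routing
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }

  static int newCodecPartition(String key, int numPartitions)
  {
    // e.g. a load-aware codec that shifts part of the key space
    return ((key.hashCode() & Integer.MAX_VALUE) + 1) % numPartitions;
  }

  public static void main(String[] args)
  {
    String key = "user-42";
    int n = 4;
    System.out.println("before codec change -> partition " + oldCodecPartition(key, n));
    System.out.println("after  codec change -> partition " + newCodecPartition(key, n));
    // When these differ, the new owner has never seen "user-42", so a deduper
    // there would wrongly let a repeated tuple pass unless its state is moved.
  }
}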


>
> Tim
>
> On Thu, Feb 11, 2016 at 4:04 PM, Pramod Immaneni <[email protected]>
> wrote:
>
> > Additionally it can be treated as a non-idempotent stream for recovery.
> > Look at APEXCORE-339. In cases where the downstream partitions require
> > some key-based partitioning, what you are suggesting would be a good
> > approach, but it will require more complex logic in the StreamCodec to do
> > both key-based and load-based partitioning.
> >
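As an illustration of that extra complexity, here is a sketch of the decision
such a codec's getPartition() might make: key-based routing by default, with a
load-based override when the preferred target is lagging. How the set of
lagging partitions is learned and how return values map onto physical
partitions through the key/mask mechanism are deliberately left open.

import java.util.Set;

// Sketch only: combines key affinity with a load-based fallback. The fallback
// deliberately walks to the next non-lagging partition rather than the
// globally least-loaded one, so the amount of key movement stays bounded.
public class KeyAndLoadRouting
{
  static int choosePartition(Object key, int numPartitions, Set<Integer> laggingPartitions)
  {
    int preferred = (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    if (!laggingPartitions.contains(preferred)) {
      return preferred; // normal key-based routing keeps key affinity
    }
    // Load-based override: this breaks strict key affinity, which is why the
    // state-migration discussion elsewhere in the thread matters for stateful
    // downstream operators.
    for (int i = 1; i < numPartitions; i++) {
      int candidate = (preferred + i) % numPartitions;
      if (!laggingPartitions.contains(candidate)) {
        return candidate;
      }
    }
    return preferred; // everyone is lagging, keep the original target
  }
}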
> > On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni <[email protected]>
> > wrote:
> >
> > > How would you know how far behind partitions are without interacting
> > > with the BufferServer, like you were mentioning in the earlier email?
> > > Secondly, why would changing where the data is sent mandate
> > > re-partitioning if the downstream partitions can handle data with
> > > different keys?
> > >
> > > On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas <[email protected]>
> > > wrote:
> > >
> > >> Hey Pramod,
> > >>
> > >> I think in general, and for recovery, the existing partitioning
> > >> machinery can be reused to update the StreamCodec. The reason is that
> > >> if the operator is Stateful and changes are made to the StreamCodec,
> > >> the state of the partitions will also have to be repartitioned.
> > >> In this case the number of partitions will remain the same; just the
> > >> state of the partitions is reshuffled. The implementation of this state
> > >> reshuffling in a fault-tolerant way is already handled by the Dynamic
> > >> Partitioning logic, so it could be extended to update the StreamCodec
> > >> as well.
> > >>
> > >> If the operator is Stateless, it may be possible to do this without
> > >> redeploying any containers. But with the way I am envisioning it, I
> > >> think there would be a lot of difficult-to-handle corner cases for
> > >> recovery.
> > >>
> > >> Tim
> > >>
> > >> On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni <[email protected]>
> > >> wrote:
> > >>
> > >> > Comment inline.
> > >> >
> > >> > On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas <[email protected]>
> > >> > wrote:
> > >> >
> > >> > > +1 for the idea.
> > >> > >
> > >> > > Gaurav, this could be done idempotently in the same way that
> > >> > > dynamic repartitioning is done idempotently. All the partitions are
> > >> > > rolled back to a common checkpoint and the new StreamCodec is
> > >> > > applied starting from that point. The statistics that the
> > >> > > StreamCodec is given are the statistics for the windows computed
> > >> > > before the common checkpoint to which the partitions are rolled
> > >> > > back.
> > >> > >
> > >> > > In fact I think this feature could be added easily by avoiding the
> > >> > > buffer server entirely and by allowing the Partitioner to redefine
> > >> > > the StreamCodec for the operator when definePartitions is called.
> > >> > >
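A purely hypothetical sketch of that last idea follows; the codec-setting call
does not exist today (it is the proposed extension), and the definePartitions
signature should be checked against the engine version in use.

import java.util.Collection;
import java.util.Map;

import com.datatorrent.api.Partitioner;

// Hypothetical sketch only: Partition exposes no way to assign a stream codec,
// so the commented-out setStreamCodec() call (and the names used in it) stand
// for the proposed extension, not an existing API. The point is that
// definePartitions already runs inside the fault-tolerant dynamic partitioning
// flow, so swapping codecs there would inherit that machinery.
public class CodecSwappingPartitioner<T> implements Partitioner<T>
{
  @Override
  public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions,
      PartitioningContext context)
  {
    for (Partition<T> partition : partitions) {
      // Keep the same number of partitions; only the routing would change.
      // partition.setStreamCodec(inputPort, new LoadAwareStreamCodec()); // proposed API
    }
    return partitions;
  }

  @Override
  public void partitioned(Map<Integer, Partition<T>> partitions)
  {
    // nothing to do in this sketch
  }
}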
> > >> >
> > >> > Are you saying this in the context of recovery or in general?
> > >> >
> > >> >
> > >> > >
> > >> > > Thanks,
> > >> > > Tim
> > >> > >
> > >> > > On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre <[email protected]>
> > >> > > wrote:
> > >> > >
> > >> > > > Gaurav,
> > >> > > > It would not be idempotent per partition, but it will be across
> > >> > > > all partitions combined. In this case the user would have
> > >> > > > explicitly asked for such a pattern.
> > >> > > >
> > >> > > > Thks,
> > >> > > > Amol
> > >> > > >
> > >> > > >
> > >> > > > On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta <[email protected]>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Pramod,
> > >> > > > >
> > >> > > > > How would it work with recovery? There could be cases where a
> > >> > > > > tuple went to P1 and post recovery it could go to P2.
> > >> > > > >
> > >> > > > > Gaurav
> > >> > > > >
> > >> > > > > On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni <[email protected]>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Hi,
> > >> > > > > >
> > >> > > > > > There are scenarios where the downstream partitions of an
> > >> > > > > > upstream operator are not performing uniformly, resulting in
> > >> > > > > > overall sub-optimal performance dictated by the slowest
> > >> > > > > > partitions. The reasons could be data related, such as some
> > >> > > > > > partitions receiving more data to process than the others, or
> > >> > > > > > environment related, such as some partitions running slower
> > >> > > > > > than others because they are on heavily loaded nodes.
> > >> > > > > >
> > >> > > > > > A solution based on currently available functionality in the
> > >> > > > > > engine would be to write a StreamCodec implementation that
> > >> > > > > > distributes data among the partitions such that each partition
> > >> > > > > > receives a similar amount of data to process. We should
> > >> > > > > > consider adding StreamCodecs like these to the library, but
> > >> > > > > > they do not solve the problem when it is environment related.
> > >> > > > > >
> > >> > > > > > For that, a better and more comprehensive approach would be
> > >> > > > > > to look at how data is being consumed by the downstream
> > >> > > > > > partitions from the BufferServer and use that information to
> > >> > > > > > make decisions on how to send future data. If some partitions
> > >> > > > > > are behind others in consuming data, then data can be directed
> > >> > > > > > to the other partitions. One way to do this would be to relay
> > >> > > > > > this type of statistical and positional information from the
> > >> > > > > > BufferServer to the upstream publishers. The publishers can
> > >> > > > > > use this information in ways such as making it available to
> > >> > > > > > StreamCodecs to affect the destination of future data.
> > >> > > > > >
> > >> > > > > > What do you think?
> > >> > > > > >
> > >> > > > > > Thanks
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
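To make the buffer-server feedback idea quoted above a bit more tangible, here
is a purely hypothetical sketch; none of these types exist in the engine, they
only show the shape of the per-subscriber information that would need to flow
back to the publisher and how a load-aware codec could use it.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Purely hypothetical sketch: the idea is that the buffer server (or the
// stream/publisher layer) periodically reports, per downstream
// subscriber/partition, how far behind it is, and an interested stream codec
// on the publisher side uses that to steer future tuples.
public class ConsumptionFeedback
{
  // one reading per downstream partition, as the buffer server might relay it
  public static class SubscriberPosition
  {
    public final int partitionId;
    public final long lastConsumedWindowId; // how far the subscriber has read
    public final long pendingBytes;         // backlog still buffered for it

    public SubscriberPosition(int partitionId, long lastConsumedWindowId, long pendingBytes)
    {
      this.partitionId = partitionId;
      this.lastConsumedWindowId = lastConsumedWindowId;
      this.pendingBytes = pendingBytes;
    }
  }

  // latest reading per partition, readable by a load-aware stream codec
  private final Map<Integer, SubscriberPosition> latest =
      new ConcurrentHashMap<Integer, SubscriberPosition>();

  // called by the (hypothetical) relay path from the buffer server
  public void update(SubscriberPosition position)
  {
    latest.put(position.partitionId, position);
  }

  // pick the partition with the smallest backlog, for load-only routing
  public int leastLoadedPartition(int defaultPartition)
  {
    long smallestBacklog = Long.MAX_VALUE;
    int choice = defaultPartition;
    for (SubscriberPosition p : latest.values()) {
      if (p.pendingBytes < smallestBacklog) {
        smallestBacklog = p.pendingBytes;
        choice = p.partitionId;
      }
    }
    return choice;
  }
}

Whether such a relay would ride on the existing buffer-server protocol or on a
side channel is one of the design decisions this proposal would have to settle.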
