Hey Pramod,
I agree that if APEXCORE-339 is in place, then it would work without redeploying
containers for operators that are Stateless, or for a subset of Stateful
operators.
To address your previous questions:
- The StatsListener can be used to see how far behind operators are. You
could determine which window the operator is on, the number of tuples
it has processed so far, or how long it takes to complete a window.
- Some examples of Stateful operators that require repartitioning of state
are the following:
  - Deduper
    In this case, after the stream codec is updated, the operator may
    allow a previously seen value to pass, because the partition that now
    receives the value did not receive it under the previous stream codec.
  - A key value store that holds aggregations for each key.
    In this case multiple partitions would hold partial aggregations
    for a key, when each is expected to hold the complete aggregation.
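A minimal, self-contained Java sketch of the Deduper case, assuming simple hash-modulo routing. `DedupPartition`, `DedupDemo` and the two routing functions are hypothetical stand-ins for a partitioned Deduper and its StreamCodec, not Apex classes. It shows an already-seen value slipping through once the routing changes while each partition's state stays where it was.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.ToIntFunction;

// Hypothetical stand-in for one Deduper partition: it only remembers the
// values that were routed to it, not values seen by other partitions.
class DedupPartition {
  private final Set<String> seen = new HashSet<>();

  // Returns true if the value is new here (and would be emitted).
  boolean offer(String value) {
    return seen.add(value);
  }
}

class DedupDemo {
  // Routes each value with the given function (standing in for the
  // StreamCodec's partition choice) and returns the values emitted as unique.
  static List<String> run(DedupPartition[] partitions, List<String> input,
                          ToIntFunction<String> route) {
    List<String> emitted = new ArrayList<>();
    for (String value : input) {
      if (partitions[route.applyAsInt(value)].offer(value)) {
        emitted.add(value);
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    DedupPartition[] partitions = { new DedupPartition(), new DedupPartition() };
    ToIntFunction<String> oldCodec = v -> Math.floorMod(v.hashCode(), 2);
    // The hypothetical "new codec" simply swaps the two partitions.
    ToIntFunction<String> newCodec = v -> 1 - Math.floorMod(v.hashCode(), 2);

    System.out.println(run(partitions, List.of("a", "b"), oldCodec)); // [a, b]
    // "a" was already seen, but by the other partition, so it passes again.
    System.out.println(run(partitions, List.of("a"), newCodec));      // [a]
  }
}
```

The same routing change would split a key's running aggregate across two partitions in the key value store case.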
Tim
On Thu, Feb 11, 2016 at 4:04 PM, Pramod Immaneni wrote:
> Additionally, it can be treated as a non-idempotent stream for recovery.
> Look at APEXCORE-339. In cases where the downstream partitions require some
> key-based partitioning, what you are suggesting would be a good approach,
> but it will require more complex logic in the StreamCodec to do both
> key-based and load-based partitioning.
>
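One way a StreamCodec could combine key-based and load-based partitioning is the "two choices" idea used in partial key grouping (a swapped-in technique, not something proposed in this thread): each key hashes to two candidate partitions and each tuple goes to whichever candidate currently has less load. The Java sketch below is hypothetical: `TwoChoicePartitioner`, the second hash, and the use of locally counted tuples as the load signal are all assumptions, not Apex API.

```java
import java.util.Arrays;

// Hypothetical partition chooser combining key affinity with load balancing
// ("two choices" / partial key grouping). Not an Apex API.
class TwoChoicePartitioner {
  private final long[] load; // tuples this publisher has sent to each partition

  TwoChoicePartitioner(int partitions) {
    if (partitions < 2) {
      throw new IllegalArgumentException("need at least 2 partitions");
    }
    this.load = new long[partitions];
  }

  // First candidate: plain hash of the key.
  private int first(Object key) {
    return Math.floorMod(key.hashCode(), load.length);
  }

  // Second candidate: a remixed hash, shifted so it never equals the first.
  private int second(Object key) {
    int h = key.hashCode() * 0x9E3779B9; // multiplicative remix (an assumption)
    int c = Math.floorMod(h ^ (h >>> 16), load.length - 1);
    return c >= first(key) ? c + 1 : c;
  }

  // Sends the tuple to the less loaded of the key's two candidates.
  int choose(Object key) {
    int a = first(key);
    int b = second(key);
    int target = load[a] <= load[b] ? a : b;
    load[target]++;
    return target;
  }

  long[] loadSnapshot() {
    return Arrays.copyOf(load, load.length);
  }
}
```

The trade-off is that a key can now land on two partitions, so per-key state downstream would have to be merged (for example, partial aggregates combined later).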
> On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni wrote:
>
> > How would you know how far behind the partitions are without interacting
> > with the BufferServer, as you mentioned in the earlier email? Secondly, why
> > would changing where the data is sent mandate repartitioning if the
> > downstream partitions can handle data with different keys?
> >
> > On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas wrote:
> >
> >> Hey Pramod,
> >>
> >> I think that, both in general and for recovery, the existing partitioning
> >> machinery can be reused to update the StreamCodec.
> >> The reason is that if the operator is Stateful and changes are made to
> >> the StreamCodec, the state of the partitions will also have to be
> >> repartitioned. In this case the number of partitions remains the same;
> >> only the state of the partitions is reshuffled. Reshuffling this state in
> >> a fault-tolerant way is already handled by the Dynamic Partitioning
> >> logic, so it could be extended to update the StreamCodec as well.
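A rough sketch of the state reshuffling described above, with the partition count unchanged: each key's state is moved to the partition that the new routing function will send that key to. This is not Apex's dynamic partitioning code; `StateReshuffler`, the key-to-running-sum state, and merging by addition are assumptions, and merging only makes sense for associative, commutative aggregations.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

// Hypothetical sketch: re-home keyed state when the routing function changes
// while the number of partitions stays the same.
class StateReshuffler {
  // Each input map is one partition's checkpointed state (key -> running sum).
  // Partial sums for the same key found in several partitions are added
  // together, which assumes the aggregation is associative and commutative.
  static List<Map<String, Long>> reshuffle(List<Map<String, Long>> oldState,
                                           ToIntFunction<String> newRoute) {
    int n = oldState.size();
    List<Map<String, Long>> newState = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      newState.add(new HashMap<>());
    }
    for (Map<String, Long> partition : oldState) {
      for (Map.Entry<String, Long> e : partition.entrySet()) {
        int target = newRoute.applyAsInt(e.getKey());
        newState.get(target).merge(e.getKey(), e.getValue(), Long::sum);
      }
    }
    return newState;
  }
}
```

For example, if partition 0 holds `{a=2, b=3}` and partition 1 holds `{a=5}`, routing `a` to partition 1 afterwards leaves partition 1 with `{a=7}` and partition 0 with `{b=3}`.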
> >>
> >> If the operator is Stateless, it may be possible to do this without
> >> redeploying any containers. However, the way I envision it, there would
> >> be many difficult-to-handle corner cases for recovery.
> >>
> >> Tim
> >>
> >> On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni wrote:
> >>
> >> > Comment inline.
> >> >
> >> > On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas wrote:
> >> >
> >> > > +1 for the idea.
> >> > >
> >> > > Gaurav, this could be done idempotently in the same way that dynamic
> >> > > repartitioning is done idempotently. All the partitions are rolled
> >> > > back to a common checkpoint, and the new StreamCodec is applied from
> >> > > that point on. The statistics given to the StreamCodec are the
> >> > > statistics for the windows computed before the common checkpoint that
> >> > > the partitions are rolled back to.
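A minimal sketch of the two decisions in that scheme: picking the common checkpoint and restricting the statistics the new StreamCodec may see. It assumes every partition checkpoints on the same window ids and keeps its older checkpoints, and it treats the checkpoint window itself as visible; `CodecSwitchPlan` and its inputs are hypothetical, not Apex classes.

```java
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch (not Apex code) of the rollback point and the
// statistics a newly installed StreamCodec would be allowed to use.
class CodecSwitchPlan {
  final long commonCheckpoint;
  final NavigableMap<Long, Long> visibleStats; // windowId -> a per-window statistic

  private CodecSwitchPlan(long commonCheckpoint, NavigableMap<Long, Long> visibleStats) {
    this.commonCheckpoint = commonCheckpoint;
    this.visibleStats = visibleStats;
  }

  // latestCheckpoints: latest checkpointed window id of each partition.
  // windowStats: statistics keyed by window id, gathered from all partitions.
  static CodecSwitchPlan plan(List<Long> latestCheckpoints, Map<Long, Long> windowStats) {
    if (latestCheckpoints.isEmpty()) {
      throw new IllegalArgumentException("no partitions");
    }
    // If older checkpoints are retained, the newest window every partition
    // can roll back to is the minimum of their latest checkpoints.
    long common = latestCheckpoints.stream().mapToLong(Long::longValue).min().getAsLong();
    // Only statistics up to and including the common checkpoint are visible,
    // so a replay after a failure feeds the codec the same inputs (and, if the
    // codec is deterministic, yields the same routing decisions).
    NavigableMap<Long, Long> visible =
        new TreeMap<>(new TreeMap<>(windowStats).headMap(common, true));
    return new CodecSwitchPlan(common, visible);
  }
}
```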
> >> > >
> >> > > In fact, I think this feature could be added easily by avoiding the
> >> > > BufferServer entirely and allowing the Partitioner to redefine the
> >> > > StreamCodec for the operator when definePartitions is called.
> >> > >
> >> >
> >> > Are you saying this in the context of recovery or in general?
> >> >
> >> >
> >> > >
> >> > > Thanks,
> >> > > Tim
> >> > >
> >> > > On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre wrote:
> >> > >
> >> > > > Gaurav,
> >> > > > It would not be idempotent per partition, but it would be across all
> >> > > > partitions combined. In this case the user would have explicitly
> >> > > > asked for such a pattern.
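A small sketch of what "idempotent across all partitions combined" means, assuming a replayed window is routed with a different function than the original run: each partition's contents change, but the multiset of tuples over all partitions does not. `ReplayCheck` and the two routing lambdas are hypothetical illustrations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

// Hypothetical helper comparing an original run of a window with its replay.
class ReplayCheck {
  // Splits one window's tuples among n partitions using the given routing.
  static List<List<String>> route(List<String> window, int n,
                                  ToIntFunction<String> router) {
    List<List<String>> partitions = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      partitions.add(new ArrayList<>());
    }
    for (String tuple : window) {
      partitions.get(router.applyAsInt(tuple)).add(tuple);
    }
    return partitions;
  }

  // Counts each tuple across all partitions combined (a multiset).
  static Map<String, Integer> combined(List<List<String>> partitions) {
    Map<String, Integer> counts = new TreeMap<>();
    for (List<String> p : partitions) {
      for (String tuple : p) {
        counts.merge(tuple, 1, Integer::sum);
      }
    }
    return counts;
  }
}
```

Routing `a`, `b`, `c` by `hashCode() % 2` and replaying with the partitions swapped moves `a` from P1 to P0 (Gaurav's scenario), while `combined` returns the same counts for both runs.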
> >> > > >
> >> > > > Thks,
> >> > > > Amol
> >> > > >
> >> > > >
> >> > > > On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta wrote:
> >> > > >
> >> > > > > Pramod,
> >> > > > >
> >> > > > > How would it work with recovery? There could be cases where a tuple
> >> > > > > went to P1 and, after recovery, goes to P2.
> >> > > > >
> >> > > > > Gaurav
> >> > > > >
> >> > > > > On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni wrote:
> >> > > > >
> >> > > > > > Hi,
> >> > > > > >
> >> > > > > > There are scenarios where the downstream partitions of an upstream
> >> > > > > > operator do not perform uniformly, resulting in sub-optimal overall
> >> > > > > > performance dictated by the slowest partitions. The reasons could be
> >> > > > > > data-related, such as some partitions receiving more data to process
> >> > > > > > than others, or environment-related, such as some partitions running
> >> > > > > > slower than others because they are on heavily loaded nodes.
> >> > > > > >
> >> > > > > > A solution based on functionality currently available in the engine
> >> > > > > > would be to write a StreamCodec implementation that distributes data
> >> > > > > > among the partitions so that each partition receives a similar amount
> >> > > > > > of data to process. We should consider adding StreamCodecs like these
> >> > > > > > to the library, but they do not solve the problem when it is
> >> > > > > > environment-related.
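A minimal sketch of the balancing decision such a StreamCodec would need, assuming "similar amount of data" is measured in serialized bytes and that keys can be ignored (so it only suits operators without keyed state). `ByteBalancingChooser` is hypothetical, and the wiring into an actual StreamCodec is left out.

```java
// Hypothetical chooser that balances the number of bytes sent to each
// partition, ignoring keys entirely. Not an Apex API.
class ByteBalancingChooser {
  private final long[] bytesSent;

  ByteBalancingChooser(int partitions) {
    if (partitions < 1) {
      throw new IllegalArgumentException("need at least 1 partition");
    }
    bytesSent = new long[partitions];
  }

  // Picks the partition that has received the fewest bytes so far
  // (ties go to the lowest index) and records the tuple's size against it.
  int choose(byte[] serializedTuple) {
    int best = 0;
    for (int i = 1; i < bytesSent.length; i++) {
      if (bytesSent[i] < bytesSent[best]) {
        best = i;
      }
    }
    bytesSent[best] += serializedTuple.length;
    return best;
  }

  long bytesSentTo(int partition) {
    return bytesSent[partition];
  }
}
```

Because it only looks at what was sent, a partition that is slow because its node is heavily loaded still gets its full share, which is the environment-related gap described above.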
> >> > > > > >
> >> > > > > > For that, a better and more comprehensive approach would be to look
> >> > > > > > at how data is being consumed by the downstream partitions from the
> >> > > > > > BufferServer and use that information to decide how to send future
> >> > > > > > data. If some partitions are behind others in consuming data, then
> >> > > > > > data can be directed to the other partitions. One way to do this
> >> > > > > > would be to relay this type of statistical and positional information
> >> > > > > > from the BufferServer to the upstream publishers. The publishers can
> >> > > > > > then use this information, for example by making it available to
> >> > > > > > StreamCodecs to influence the destination of future data.
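A sketch of how a publisher might use relayed consumption positions, under the assumption that the BufferServer reports, per downstream partition, how many tuples that partition has consumed. `BacklogAwareChooser` and its `onConsumedPosition` callback are hypothetical; like any purely load-based routing, it ignores keys.

```java
// Hypothetical sketch: the publisher counts what it has sent to each
// partition, and the BufferServer is assumed to relay back how many tuples
// each downstream partition has consumed. Neither is Apex API.
class BacklogAwareChooser {
  private final long[] sent;
  private final long[] consumed;

  BacklogAwareChooser(int partitions) {
    if (partitions < 1) {
      throw new IllegalArgumentException("need at least 1 partition");
    }
    sent = new long[partitions];
    consumed = new long[partitions];
  }

  // Called when a (hypothetical) consumption position arrives for a partition.
  void onConsumedPosition(int partition, long consumedCount) {
    // Positions only move forward, so stale or reordered updates are ignored.
    consumed[partition] = Math.max(consumed[partition], consumedCount);
  }

  // Tuples sent to the partition that it has not consumed yet.
  long backlog(int partition) {
    return sent[partition] - consumed[partition];
  }

  // Routes the next tuple to the partition with the smallest backlog
  // (ties go to the lowest index).
  int choose() {
    int best = 0;
    for (int i = 1; i < sent.length; i++) {
      if (backlog(i) < backlog(best)) {
        best = i;
      }
    }
    sent[best]++;
    return best;
  }
}
```

Unlike counting only what was sent, this reacts to a partition that falls behind because its node is heavily loaded.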
> >> > > > > >
> >> > > > > > What do you think?
> >> > > > > >
> >> > > > > > Thanks
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>