Additionally, it can be treated as a non-idempotent stream for recovery.
Look at APEXCORE-339. In cases where the downstream partitions require some
key-based partitioning, what you are suggesting would be a good approach,
but it will require more complex logic in the StreamCodec to do both
key-based and load-based partitioning.
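
A minimal sketch of what combining the two could look like, assuming the
per-partition backlog is reported to the publisher by some means. The class
and method names are hypothetical and are not the Apex StreamCodec API; this
only illustrates the routing decision, not recovery or state handling.

```java
// Hypothetical sketch: routes by key when key affinity is required,
// otherwise by load. Not part of the Apex API.
class KeyAndLoadPartitioner {
    // Assumed input: number of tuples each downstream partition has not yet consumed.
    private final long[] backlog;

    KeyAndLoadPartitioner(int partitionCount) {
        this.backlog = new long[partitionCount];
    }

    // Record the latest reported backlog for one partition.
    void updateBacklog(int partition, long pendingTuples) {
        backlog[partition] = pendingTuples;
    }

    // Key-based: the same key always maps to the same partition.
    int partitionForKey(Object key) {
        return Math.floorMod(key.hashCode(), backlog.length);
    }

    // Load-based: pick the partition with the smallest backlog
    // (lowest index wins on ties).
    int leastLoadedPartition() {
        int best = 0;
        for (int i = 1; i < backlog.length; i++) {
            if (backlog[i] < backlog[best]) {
                best = i;
            }
        }
        return best;
    }

    // Combined decision: honor key affinity when the downstream operator
    // needs it, otherwise send the tuple wherever the backlog is smallest.
    int partitionFor(Object key, boolean keyAffinityRequired) {
        return keyAffinityRequired ? partitionForKey(key) : leastLoadedPartition();
    }
}
```

The load-based branch is what makes the stream non-idempotent per partition,
since the chosen partition depends on the backlog at the time of sending.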

On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni <[email protected]>
wrote:

> How would you know how far behind partitions are without interacting with
> BufferServer, as you mentioned in the earlier email? Secondly, why
> would changing where the data is sent mandate re-partitioning if
> the downstream partitions can handle data with different keys?
>
> On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas <[email protected]>
> wrote:
>
>> Hey Pramod,
>>
>> I think in general and for recovery the existing Partitioning machinery
>> can be reused to update the Stream Codec.
>> The reason is that if the operator is stateful and changes are made
>> to the Stream Codec, the state of the partitions will also have to be
>> repartitioned.
>> In this case the number of partitions will remain the same, just the state
>> of the partitions is reshuffled. The implementation for this state
>> reshuffling in a fault tolerant way is already handled by the Dynamic
>> Partitioning logic, so it could be extended to update the Stream Codec as
>> well.
>>
>> If the operator is stateless, it may be possible to do this without
>> redeploying any containers. But with the way I am envisioning it, I think
>> there would be a lot of difficult-to-handle corner cases for recovery.
>>
>> Tim
>>
>> On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni <[email protected]>
>> wrote:
>>
>> > Comment inline.
>> >
>> > On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas <[email protected]>
>> > wrote:
>> >
>> > > +1 for the idea.
>> > >
>> > > Gaurav, this could be done idempotently in the same way that dynamic
>> > > repartitioning is done idempotently. All the partitions are rolled
>> > > back to a common checkpoint and the new StreamCodec is applied
>> > > starting from that point. The statistics that the StreamCodec is
>> > > given are the statistics for the windows computed before the common
>> > > checkpoint that the partitions are rolled back to.
>> > >
>> > > In fact I think this feature could be added easily by avoiding buffer
>> > > server entirely and by allowing the Partitioner to redefine the
>> > > StreamCodec for the operator when define partitions is called.
>> > >
>> >
>> > Are you saying this in context of recovery or in general?
>> >
>> >
>> > >
>> > > Thanks,
>> > > Tim
>> > >
>> > > On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre <[email protected]>
>> > > wrote:
>> > >
>> > > > Gaurav,
>> > > > It would not be idempotent per partition, but will be across all
>> > > > partitions combined. In this case the user would have explicitly
>> > > > asked for such a pattern.
>> > > >
>> > > > Thks,
>> > > > Amol
>> > > >
>> > > >
>> > > > On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta <[email protected]>
>> > > > wrote:
>> > > >
>> > > > > Pramod,
>> > > > >
>> > > > > How would it work with recovery? There could be cases where a
>> > > > > tuple went to P1 and post recovery it can go to P2.
>> > > > >
>> > > > > Gaurav
>> > > > >
>> > > > > On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni <[email protected]>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi,
>> > > > > >
>> > > > > > There are scenarios where the downstream partitions of an
>> > > > > > upstream operator do not perform uniformly, resulting in
>> > > > > > overall sub-optimal performance dictated by the slowest
>> > > > > > partitions. The reasons could be data related, such as some
>> > > > > > partitions receiving more data to process than the others, or
>> > > > > > environment related, such as some partitions running slower
>> > > > > > than others because they are on heavily loaded nodes.
>> > > > > >
>> > > > > > A solution based on currently available functionality in the
>> > > > > > engine would be to write a StreamCodec implementation to
>> > > > > > distribute data among the partitions such that each partition
>> > > > > > receives a similar amount of data to process. We should
>> > > > > > consider adding StreamCodecs like these to the library, but
>> > > > > > they do not solve the problem when it is environment related.
>> > > > > >
>> > > > > > For that, a better and more comprehensive approach would be to
>> > > > > > look at how data is being consumed by the downstream
>> > > > > > partitions from the BufferServer and use that information to
>> > > > > > make decisions on how to send future data. If some partitions
>> > > > > > are behind others in consuming data, then data can be directed
>> > > > > > to the other partitions. One way to do this would be to relay
>> > > > > > this type of statistical and positional information from
>> > > > > > BufferServer to the upstream publishers. The publishers can
>> > > > > > use this information in ways such as making it available to
>> > > > > > StreamCodecs to affect the destination of future data.
>> > > > > >
>> > > > > > What do you think?
>> > > > > >
>> > > > > > Thanks
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
