Comments inline

On Thu, Feb 11, 2016 at 4:13 PM, Timothy Farkas <[email protected]> wrote:
> Hey Pramod,
>
> I agree if APEX-339 is in place then it would work without redeploying
> containers for operators that are Stateless, or a subset of Stateful
> operators.
>
> Addressing your previous questions.
>
> - The StatsListener can be used to see how far behind operators are. You
> could determine what window the operator is on, or the number of tuples
> it's processed so far, or how long it takes it to complete a window.
>

What if tuples are different sizes and the number of tuples processed doesn't
reflect how far ahead or behind a downstream partition is? How is the
information from StatsListener made available to the upstream partition
codecs?

>
> - Some examples of Stateful operators that require repartitioning of state
> are the following:
>     - Deduper
>       In this case after updating the stream codec the operator may
>       allow a previously seen value to pass because the partition didn't
>       receive that value with the previous stream codec.
>     - A key value store that holds aggregations for each key.
>       In this case multiple partitions would hold partial aggregations
>       for a key, when they are expecting to hold the complete aggregation.
>

Agreed for deduper. For the second case a unifier is a better approach so
that you are not affected by key skew in general.

>
> Tim
>
> On Thu, Feb 11, 2016 at 4:04 PM, Pramod Immaneni <[email protected]>
> wrote:
>
> > Additionally it can be treated as a non-idempotent stream for recovery.
> > Look at APEXCORE-339. In cases where the downstream partitions require
> > some key based partitioning, what you are suggesting would be a good
> > approach but it will require more complex logic in the StreamCodec to do
> > both key and load based partitioning.
> >
> > On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni <[email protected]>
> > wrote:
> >
> > > How would you know how far behind partitions are without interacting
> > > with BufferServer like you were mentioning in the earlier email?
> > > Secondly why would changing where the data is sent mandate
> > > re-partitioning if the downstream partitions can handle data with
> > > different keys?
> > >
> > > On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas <[email protected]>
> > > wrote:
> > >
> > >> Hey Pramod,
> > >>
> > >> I think in general and for recovery the existing Partitioning machinery
> > >> can be reused to update the Stream Codec.
> > >> The reason is that if the operator is Stateful and changes are made
> > >> to the Stream Codec, the state of the partitions will also have to be
> > >> repartitioned.
> > >> In this case the number of partitions will remain the same, just the
> > >> state of the partitions is reshuffled. The implementation for this state
> > >> reshuffling in a fault tolerant way is already handled by the Dynamic
> > >> Partitioning logic, so it could be extended to update the Stream Codec
> > >> as well.
> > >>
> > >> If the operator is Stateless, it may be possible to do without
> > >> redeploying any containers. But with the way I am envisioning it, I
> > >> think there would be a lot of difficult to handle corner cases for
> > >> recovery.
> > >>
> > >> Tim
> > >>
> > >> On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni <[email protected]>
> > >> wrote:
> > >>
> > >> > Comment inline.
> > >> >
> > >> > On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas <[email protected]>
> > >> > wrote:
> > >> >
> > >> > > +1 for the idea.
> > >> > >
> > >> > > Gaurav, this could be done idempotently in the same way that dynamic
> > >> > > repartitioning is done idempotently. All the partitions are rolled
> > >> > > back to a common checkpoint and the new StreamCodec is applied
> > >> > > starting then. The statistics that the Stream Codec is given are the
> > >> > > statistics for the windows computed before the common checkpoint that
> > >> > > the partitions are rolled back to.
> > >> > >
> > >> > > In fact I think this feature could be added easily by avoiding
> > >> > > buffer server entirely and by allowing the Partitioner to redefine
> > >> > > the StreamCodec for the operator when define partitions is called.
> > >> > >
> > >> >
> > >> > Are you saying this in context of recovery or in general?
> > >> >
> > >> > >
> > >> > > Thanks,
> > >> > > Tim
> > >> > >
> > >> > > On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre <[email protected]>
> > >> > > wrote:
> > >> > >
> > >> > > > Gaurav,
> > >> > > > It would not be idempotent per partition, but will be across all
> > >> > > > partitions combined. In this case the user would have explicitly
> > >> > > > asked for such a pattern.
> > >> > > >
> > >> > > > Thks,
> > >> > > > Amol
> > >> > > >
> > >> > > > On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta <[email protected]>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Pramod,
> > >> > > > >
> > >> > > > > How would it work with recovery? There could be cases where a
> > >> > > > > tuple went to P1 and post recovery it can go to P2
> > >> > > > >
> > >> > > > > Gaurav
> > >> > > > >
> > >> > > > > On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni
> > >> > > > > <[email protected]> wrote:
> > >> > > > >
> > >> > > > > > Hi,
> > >> > > > > >
> > >> > > > > > There are scenarios where the downstream partitions of an
> > >> > > > > > upstream operator are generally not performing uniformly
> > >> > > > > > resulting in an overall sub-optimal performance dictated by the
> > >> > > > > > slowest partitions.
> > >> > > > > > The reasons could be data related such as some partitions are
> > >> > > > > > receiving more data to process than the others or could be
> > >> > > > > > environment related such as some partitions are running slower
> > >> > > > > > than others because they are on heavily loaded nodes.
> > >> > > > > >
> > >> > > > > > A solution based on currently available functionality in the
> > >> > > > > > engine would be to write a StreamCodec implementation to
> > >> > > > > > distribute data among the partitions such that each partition is
> > >> > > > > > receiving a similar amount of data to process. We should consider
> > >> > > > > > adding StreamCodecs like these to the library but these however
> > >> > > > > > do not solve the problem when it is environment related.
> > >> > > > > >
> > >> > > > > > For that a better and more comprehensive approach would be to
> > >> > > > > > look at how data is being consumed by the downstream partitions
> > >> > > > > > from the BufferServer and use that information to make decisions
> > >> > > > > > on how to send future data. If some partitions are behind others
> > >> > > > > > in consuming data then data can be directed to the other
> > >> > > > > > partitions. One way to do this would be to relay this type of
> > >> > > > > > statistical and positional information from BufferServer to the
> > >> > > > > > upstream publishers. The publishers can use this information in
> > >> > > > > > ways such as making it available to StreamCodecs to affect
> > >> > > > > > destination of future data.
> > >> > > > > >
> > >> > > > > > What do you think?
> > >> > > > > >
> > >> > > > > > Thanks
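
The combined key and load based routing discussed in this thread could look
roughly like the sketch below. It is plain Java and does not use the Apex
StreamCodec interface; the class LoadAwareRouter, its methods, and the
parameters backlogBytes and maxLagBytes are hypothetical names chosen for
illustration. It also assumes that a per-partition backlog measured in bytes
is somehow made available to the sender, which is exactly the part the thread
leaves open. Bytes are used rather than tuple counts because tuples may differ
in size.

```java
// Hypothetical sketch: none of these names come from the Apex API.
final class LoadAwareRouter {
    private LoadAwareRouter() {}

    /** Key-based choice: the same key always maps to the same partition. */
    static int byKey(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    /** Load-based choice: the partition with the smallest backlog (lowest index wins ties). */
    static int leastLoaded(long[] backlogBytes) {
        int best = 0;
        for (int i = 1; i < backlogBytes.length; i++) {
            if (backlogBytes[i] < backlogBytes[best]) {
                best = i;
            }
        }
        return best;
    }

    /**
     * Combined choice: keep the key-based partition unless its backlog exceeds
     * the smallest backlog by more than maxLagBytes, in which case divert the
     * tuple to the least loaded partition.
     */
    static int choose(Object key, long[] backlogBytes, long maxLagBytes) {
        int keyed = byKey(key, backlogBytes.length);
        int idle = leastLoaded(backlogBytes);
        return backlogBytes[keyed] - backlogBytes[idle] > maxLagBytes ? idle : keyed;
    }

    public static void main(String[] args) {
        long[] backlog = {10_000L, 250_000L, 12_000L};
        // Integer.hashCode() is the int value itself, so key 1 maps to partition 1.
        System.out.println(choose(1, backlog, 50_000L)); // partition 1 lags by 240000 bytes: diverted to 0
        System.out.println(choose(2, backlog, 50_000L)); // partition 2 is within tolerance: stays on 2
    }
}
```

Note that every diverted tuple breaks key affinity, so a rule like this only
suits downstream partitions that can handle data with different keys.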

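
The deduper concern raised in the thread can be reproduced with a few lines of
plain Java. This is only an illustration under simplifying assumptions: each
partition keeps an in-memory set of keys it has seen, and a change of stream
codec is modelled as a shift added to the key before taking the modulus. The
class DedupRemapDemo and its methods are hypothetical and are not Apex
operators.

```java
// Hypothetical sketch: plain Java, not an Apex operator.
final class DedupRemapDemo {
    private DedupRemapDemo() {}

    /** Maps a key to a partition; a different shift models an updated codec. */
    static int partitionOf(int key, int partitions, int shift) {
        return Math.floorMod(key + shift, partitions);
    }

    /**
     * Sends the same key twice into per-partition dedup sets, switching the
     * mapping between the two sends, and reports whether the second copy
     * was accepted as new.
     */
    static boolean duplicatePasses(int key, int partitions, int shiftBefore, int shiftAfter) {
        java.util.List<java.util.Set<Integer>> seen = new java.util.ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            seen.add(new java.util.HashSet<>());
        }
        // First copy is recorded by the partition chosen under the old mapping.
        seen.get(partitionOf(key, partitions, shiftBefore)).add(key);
        // Set.add returns true when the key was not present, i.e. the duplicate passes.
        return seen.get(partitionOf(key, partitions, shiftAfter)).add(key);
    }

    public static void main(String[] args) {
        System.out.println(duplicatePasses(7, 3, 0, 0)); // false: same partition, duplicate dropped
        System.out.println(duplicatePasses(7, 3, 0, 1)); // true: new partition has no record of key 7
    }
}
```

Unless the per-partition state is reshuffled together with the mapping, the
second copy is treated as a first occurrence.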