Hey Vinoth, I started going over the KIP again yesterday. There are a lot of updates, and I didn't finish my feedback in one day. I'm working on it now.
Thanks,
John

On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <vchan...@confluent.io> wrote:
>
> Wondering if anyone has thoughts on these changes? I liked that the new
> metadata fetch APIs provide all the information at once with consistent naming.
>
> Any guidance on what you would like to be discussed or fleshed out more
> before we call a VOTE?
>
> On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
> <navinder_b...@yahoo.com.invalid> wrote:
> >
> > Hi,
> > We have made some edits in the KIP
> > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
> > after due deliberation on the agreed design, to support the new query design.
> > This includes the new public API to query offset/time lag information, and
> > other details related to querying standby tasks which came up while thinking
> > through the details.
> >
> > - Addition of a new config, "lag.fetch.interval.ms", to configure the
> > interval at which the time/offset lag is fetched
> > - Addition of a new class StoreLagInfo to store the periodically obtained
> > time/offset lag
> > - Addition of two new functions in KafkaStreams, List<StoreLagInfo>
> > allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName), to
> > return the lag information for an instance and a store respectively
> > - Addition of a new class KeyQueryMetadata. We need the topicPartition for
> > each key to be matched with the lag API for the topic partition. One way is
> > to add new functions and fetch the topicPartition from StreamsMetadataState,
> > but we thought having one call that fetches both StreamsMetadata and
> > topicPartition is cleaner.
> > - Renaming partitionsForHost to activePartitionsForHost in
> > StreamsMetadataState, and partitionsByHostState to activePartitionsByHostState
> > in StreamsPartitionAssignor
> > - We have also added pseudocode showing how all the changes will work
> > together to support the new querying APIs
> >
> > Please let me know if anything is pending now, before a vote can be
> > started on this.
> >
> > On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder
> > Brar <navinder_b...@yahoo.com.invalid> wrote:
> >
> > >> Since there are two soft votes for separate active/standby API
> > methods, I also change my position on that. Fine with 2 separate
> > methods. Once we remove the lag information from these APIs, returning a
> > List is less attractive, since the ordering has no special meaning now.
> > Agreed, now that we are not returning lag, I am also sold on having two
> > separate functions. We already have one which returns streamsMetadata for
> > active tasks, and now we can add another one for standbys.
> >
> > On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <
> > vchan...@confluent.io> wrote:
> >
> > +1 to Sophie's suggestion. Having both lag in terms of time and offsets is
> > good and makes for a more complete API.
> >
> > Since there are two soft votes for separate active/standby API methods, I
> > also change my position on that. Fine with 2 separate methods.
> > Once we remove the lag information from these APIs, returning a List is
> > less attractive, since the ordering has no special meaning now.
> >
> > >> lag in offsets vs time: Having both, as suggested by Sophie, would of
> > course be best. What is a little unclear to me is how in detail we are
> > going to compute both.
> > @navinder maybe the next step is to flesh out these details and surface any
> > larger changes we need to make if need be.
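To make the API additions summarized in the KIP update above concrete, here is
a rough sketch of the new pieces. Field and accessor names are illustrative
only; the KIP page has the authoritative signatures.

    // Snapshot of how far a local store partition is behind its changelog,
    // refreshed every "lag.fetch.interval.ms".
    public class StoreLagInfo {
        private final String storeName;
        private final TopicPartition topicPartition;
        private final long offsetLag;  // changelog end offset minus current position
        private final long timeLagMs;  // timestamp diff against the changelog end record
        // ... constructor and getters
    }

    // Host metadata plus the topic partition for a queried key, so a caller
    // can match the host against the lag APIs with a single lookup.
    public class KeyQueryMetadata {
        private final StreamsMetadata streamsMetadata;
        private final TopicPartition topicPartition;
        // ... constructor and getters
    }

    // New methods on KafkaStreams:
    public List<StoreLagInfo> allLagInfo();                      // whole instance
    public List<StoreLagInfo> lagInfoForStore(String storeName); // single store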
> >
> > Any other details we need to cover, before a VOTE can be called on this?
> >
> > On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >
> > > I am jumping in a little late here.
> > >
> > > Overall I agree with the proposal to push decision making on what/how to
> > > query into the query layer.
> > >
> > > For point 5 from above, I'm slightly in favor of having a new method,
> > > "standbyMetadataForKey()" or something similar.
> > > Because even if we return all tasks in one list, the user will still have
> > > to perform some filtering to separate the different tasks, so I don't feel
> > > making two calls is a burden, and IMHO it makes things more transparent for
> > > the user.
> > > If the final vote is for using an "isActive" field, I'm good with that as
> > > well.
> > >
> > > Just my 2 cents.
> > >
> > > On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
> > > <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > > > I think now we are aligned on almost all the design parts. Summarising
> > > > below what has been discussed above and what we have a general consensus on.
> > > >
> > > > - Rather than broadcasting lag across all nodes at rebalancing/with the
> > > > heartbeat, we will just return a list of all available standbys in the
> > > > system, and the user's IQ can query any of those nodes, which will return
> > > > the response along with the time/offset lag. Based on that, the user can
> > > > decide whether to return the response or call another standby.
> > > > - The current metadata query frequency will not change. It will be the
> > > > same as it is now, i.e. before each query.
> > > > - For fetching List<StreamsMetadata> in StreamsMetadataState.java and
> > > > List<QueryableStoreProvider> in StreamThreadStateStoreProvider.java (which
> > > > will return all active stores which are running/restoring and replica
> > > > stores which are running), we will add new functions and not disturb the
> > > > existing functions.
> > > > - There is no need to add a new StreamsConfig for implementing this KIP.
> > > > - We will add standbyPartitionsByHost in AssignmentInfo and
> > > > StreamsMetadataState, which would change the existing rebuildMetadata()
> > > > and setPartitionsByHostState().
> > > >
> > > > If anyone has any more concerns please feel free to add. Post this I will
> > > > be initiating a vote.
> > > > ~Navinder
> > > >
> > > > On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <
> > > > matth...@confluent.io> wrote:
> > > >
> > > > Just to close the loop @Vinoth:
> > > >
> > > > > 1. IIUC John intends to add (or we can do this in this KIP) lag information
> > > > > to AssignmentInfo, which gets sent to every participant.
> > > >
> > > > As explained by John, currently KIP-441 plans to only report the
> > > > information to the leader. But I guess, with the new proposal to not
> > > > broadcast this information, this concern is invalidated anyway.
> > > >
> > > > > 2. At least I was under the assumption that it can be called per query,
> > > > > since the API docs don't seem to suggest otherwise. Do you see any
> > > > > potential issues if we call this every query? (we should benchmark this
> > > > > nonetheless)
> > > >
> > > > I did not see a real issue if people refresh the metadata frequently,
> > > > because it would be a local call. My main point was that this would
> > > > change the current usage pattern of the API, and we would clearly need
> > > > to communicate this change. Similar to (1), this concern is invalidated
> > > > anyway.
> > > >
> > > > @John: I think it's a great idea to get rid of reporting lag, and to
> > > > push the decision-making process about "what to query" into the query
> > > > serving layer itself. This simplifies the overall design of this KIP
> > > > significantly, and actually aligns very well with the idea that Kafka
> > > > Streams (as it is a library) should only provide the basic building
> > > > block. Many of my raised questions are invalidated by this.
> > > >
> > > > Some questions are still open though:
> > > >
> > > > > 10) Do we need to distinguish between active(restoring) and standby
> > > > > tasks? Or could we treat both as the same?
> > > >
> > > > @Vinoth: about (5). I see your point about multiple calls vs a single
> > > > call. I still slightly prefer multiple calls, but it's highly subjective
> > > > and I would also be fine to add an #isActive() method. Would be good to
> > > > get feedback from others.
> > > >
> > > > For (14), i.e., lag in offsets vs time: Having both, as suggested by
> > > > Sophie, would of course be best. What is a little unclear to me is how
> > > > in detail we are going to compute both.
> > > >
> > > > -Matthias
> > > >
> > > > On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> > > > > Just to chime in on the "report lag vs timestamp difference" issue, I would
> > > > > actually advocate for both. As mentioned already, time difference is
> > > > > probably a lot easier and/or more useful to reason about in terms of
> > > > > "freshness" of the state. But in the case when all queried stores are far
> > > > > behind, lag could be used to estimate the recovery velocity. You can then
> > > > > get a (pretty rough) idea of when a store might be ready, and wait until
> > > > > around then to query again.
> > > > >
> > > > > On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > >> I think I agree with John's recent reasoning as well: instead of letting
> > > > >> the storeMetadata API return the staleness information, letting the
> > > > >> client query either active or standby, and letting the standby query
> > > > >> response include both the values + timestamp (or lag, as in diffs of
> > > > >> timestamps), would actually be more intuitive -- not only is the streams
> > > > >> client simpler; from the user's perspective they also do not need to
> > > > >> periodically refresh their staleness information from the client, but
> > > > >> only need to make decisions on the fly whenever they need to query.
> > > > >>
> > > > >> Again, the standby replica then needs to know the current active task's
> > > > >> timestamp, which can be found from the log end record's encoded timestamp;
> > > > >> today standby tasks do not read that specific record, but only refresh
> > > > >> their knowledge of the log end offset. But I think refreshing the latest
> > > > >> record timestamp is not a very bad request to add on the standby replicas.
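A minimal sketch of the bookkeeping Guozhang describes above, assuming the
standby tracks its last applied offset/timestamp and fetches the changelog
end offset and end-record timestamp (all variable names here are
illustrative, not existing Streams fields):

    // Offset lag: how many changelog records the standby still has to apply.
    // The log end offset is already refreshed by standby tasks today.
    long offsetLag = logEndOffset - lastAppliedOffset;

    // Time lag: how far behind the standby is in stream time. This needs the
    // timestamp encoded in the log end record, which standby tasks would
    // additionally have to fetch (per the note above).
    long timeLagMs = logEndRecordTimestamp - lastAppliedRecordTimestamp;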
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar <vchan...@confluent.io>
> > > > >> wrote:
> > > > >>
> > > > >>> +1 As someone implementing a query routing layer, there is already a need
> > > > >>> to have mechanisms in place to do healthchecks/failure detection for
> > > > >>> queries, while Streams rebalancing eventually kicks in in the background.
> > > > >>> So, pushing this complexity to the IQ client app keeps Streams simpler as
> > > > >>> well. IQs will potentially be issued at an order of magnitude higher
> > > > >>> frequency, and this can achieve good freshness for the lag information.
> > > > >>>
> > > > >>> I would like to add, however, that we would also need to introduce APIs in
> > > > >>> the KafkaStreams class for obtaining lag information for all stores local
> > > > >>> to that host. This is for the IQ layer to relay back with the response/its
> > > > >>> own heartbeat mechanism.
> > > > >>>
> > > > >>> On Thu, Oct 24, 2019 at 3:12 PM John Roesler <j...@confluent.io> wrote:
> > > > >>>
> > > > >>>> Hi all,
> > > > >>>>
> > > > >>>> I've been mulling about this KIP, and I think I was on the wrong track
> > > > >>>> earlier with regard to task lags. Tl;dr: I don't think we should add
> > > > >>>> lags at all to the metadata API (and also not to the AssignmentInfo
> > > > >>>> protocol message).
> > > > >>>>
> > > > >>>> Like I mentioned early on, reporting lag via
> > > > >>>> SubscriptionInfo/AssignmentInfo would only work while rebalances are
> > > > >>>> happening. Once the group stabilizes, no members would be notified of
> > > > >>>> each others' lags anymore. I had been thinking that the solution would
> > > > >>>> be the heartbeat proposal I mentioned earlier, but that proposal would
> > > > >>>> have reported the heartbeats of the members only to the leader member
> > > > >>>> (the one who makes assignments). To be useful in the context of _this_
> > > > >>>> KIP, we would also have to report the lags in the heartbeat responses
> > > > >>>> of _all_ members. This is a concern to me because now _all_ the
> > > > >>>> lags get reported to _all_ the members on _every_ heartbeat... a lot
> > > > >>>> of chatter.
> > > > >>>>
> > > > >>>> Plus, the proposal for KIP-441 is only to report the lags of each
> > > > >>>> _task_. This is the sum of the lags of all the stores in the task.
> > > > >>>> But this would be insufficient for KIP-535. For this KIP, we would
> > > > >>>> want the lag specifically of the store we want to query. So this
> > > > >>>> means we'd have to report the lags of all the stores of all the members
> > > > >>>> to every member... even more chatter!
> > > > >>>>
> > > > >>>> The final nail in the coffin to me is that IQ clients would have to
> > > > >>>> start refreshing their metadata quite frequently to stay up to date on
> > > > >>>> the lags, which adds even more overhead to the system.
> > > > >>>>
> > > > >>>> Consider a strawman alternative: we bring KIP-535 back to extending
> > > > >>>> the metadata API to tell the client the active and standby replicas
> > > > >>>> for the key in question (not including any "staleness/lag"
> > > > >>>> restriction, just returning all the replicas). Then, the client picks
> > > > >>>> a replica and sends the query. The server returns the current lag
> > > > >>>> along with the response (maybe in an HTTP header or something). Then,
> > > > >>>> the client keeps a map of its last observed lags for each replica, and
> > > > >>>> uses this information to prefer fresher replicas.
> > > > >>>>
> > > > >>>> OR, if it wants only to query the active replica, it would throw an
> > > > >>>> error on any lag response greater than zero, refresh its metadata by
> > > > >>>> re-querying the metadata API, and try again with the current active
> > > > >>>> replica.
> > > > >>>>
> > > > >>>> This way, the lag information will be super fresh for the client, and
> > > > >>>> we keep the Metadata API, Assignment/Subscription, and Heartbeat as
> > > > >>>> slim as possible.
> > > > >>>>
> > > > >>>> Side note: I do think that some time soon, we'll have to add a library
> > > > >>>> for IQ servers/clients. I think that this logic will start to get
> > > > >>>> pretty complex.
> > > > >>>>
> > > > >>>> I hope this thinking is reasonably clear!
> > > > >>>> Thanks again,
> > > > >>>> -John
> > > > >>>>
> > > > >>>> On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar <vchan...@confluent.io>
> > > > >>>> wrote:
> > > > >>>>>
> > > > >>>>> Responding to the points raised by Matthias:
> > > > >>>>>
> > > > >>>>> 1. IIUC John intends to add (or we can do this in this KIP) lag
> > > > >>>>> information to AssignmentInfo, which gets sent to every participant.
> > > > >>>>>
> > > > >>>>> 2. At least I was under the assumption that it can be called per query,
> > > > >>>>> since the API docs don't seem to suggest otherwise. Do you see any
> > > > >>>>> potential issues if we call this every query? (we should benchmark this
> > > > >>>>> nonetheless)
> > > > >>>>>
> > > > >>>>> 4. Agree. metadataForKey() implicitly would return the active host
> > > > >>>>> metadata (as it was before). We should also document this in that API's
> > > > >>>>> javadoc, given we now have other method(s) that return more host metadata.
> > > > >>>>>
> > > > >>>>> 5. While I see the point, the app/caller has to make two different API
> > > > >>>>> calls to obtain active/standby and potentially do the same set of
> > > > >>>>> operations to query the state. I personally still like a method like
> > > > >>>>> isActive() better, but don't have strong opinions.
> > > > >>>>>
> > > > >>>>> 9. If we do expose the lag information, could we just leave it up to the
> > > > >>>>> caller to decide whether it errors out or not, and not make the decision
> > > > >>>>> within Streams? i.e., we don't need a new config.
> > > > >>>>>
> > > > >>>>> 14. +1, if it's easier to do right away. We started with number of
> > > > >>>>> records, following the lead of KIP-441.
> > > > >>>>>
> > > > >>>>> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
> > > > >>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>
> > > > >>>>>> Thanks, everyone for taking a look. Some very cool ideas have flown in.
> > > > >>>>>>
> > > > >>>>>> >> There was a follow-on idea I POCed to continuously share lag
> > > > >>>>>> information in the heartbeat protocol
> > > > >>>>>> +1, that would be great; I will update the KIP assuming this work will
> > > > >>>>>> finish soon.
> > > > >>>>>>
> > > > >>>>>> >> I think that adding a new method to StreamsMetadataState and
> > > > >>>>>> deprecating the existing method is the best way to go; we just can't
> > > > >>>>>> change the return types of any existing methods.
> > > > >>>>>> +1 on this; we will add new methods for users who would be interested in
> > > > >>>>>> querying back a list of possible options to query from, and leave the
> > > > >>>>>> current function getStreamsMetadataForKey() untouched for users who want
> > > > >>>>>> absolute consistency.
> > > > >>>>>>
> > > > >>>>>> >> why not just always return all available metadata (including
> > > > >>>>>> active/standby or lag) and let the caller decide to which node they want
> > > > >>>>>> to route the query
> > > > >>>>>> +1. I think this makes sense, as from a user standpoint there is no
> > > > >>>>>> difference b/w an active and a standby if both have the same lag. In fact,
> > > > >>>>>> users would be able to use this API to reduce query load on actives, so
> > > > >>>>>> returning all available options along with the current lag of each would
> > > > >>>>>> make sense, leaving it to the user how they want to use this data. This
> > > > >>>>>> has another added advantage: if a user queries any random machine for a
> > > > >>>>>> key and that machine has a replica for the partition (where the key
> > > > >>>>>> belongs), the user might choose to serve the data from there itself (if it
> > > > >>>>>> doesn't lag much) rather than finding the active and making an IQ call to
> > > > >>>>>> it. This would save some critical serving time for some applications.
> > > > >>>>>>
> > > > >>>>>> >> Adding the lag in terms of timestamp diff comparing the committed
> > > > >>>>>> offset.
> > > > >>>>>> +1 on this; I think it's more readable. But as John said, the function
> > > > >>>>>> allMetadataForKey() just returns the possible options from which users can
> > > > >>>>>> query a key, so we can even drop the parameter
> > > > >>>>>> enableReplicaServing/tolerableDataStaleness and just return all the
> > > > >>>>>> streamsMetadata containing that key along with the offset limit.
> > > > >>>>>>
> > > > >>>>>> Answering the questions posted by Matthias in sequence:
> > > > >>>>>> 1. @John, can you please comment on this one.
> > > > >>>>>> 2. Yeah, the usage pattern would include querying this prior to every
> > > > >>>>>> request.
> > > > >>>>>> 3. Will add the changes to StreamsMetadata in the KIP; this would include
> > > > >>>>>> changes in rebuildMetadata() etc.
> > > > >>>>>> 4. Makes sense, already addressed above.
> > > > >>>>>> 5. Is it important from a user perspective whether they are querying an
> > > > >>>>>> active(processing), active(restoring), or standby task, if we have a way
> > > > >>>>>> of denoting lag in a readable manner which signifies to the user that this
> > > > >>>>>> is the best node to query fresh data?
> > > > >>>>>> 6. Yes, I intend to return the actives and replicas in the same return
> > > > >>>>>> list in allMetadataForKey().
> > > > >>>>>> 7. Tricky.
> > > > >>>>>> 8. Yes, we need new functions to return activeRestoring and
> > > > >>>>>> standbyRunning tasks.
> > > > >>>>>> 9. A StreamsConfig doesn't look of much use to me, since we are giving
> > > > >>>>>> all possible options via this function; or they can use the existing
> > > > >>>>>> function getStreamsMetadataForKey() and get just the active.
> > > > >>>>>> 10. I think treat them both the same and let the lag do the talking.
> > > > >>>>>> 11. We are just sending them the options to query from in
> > > > >>>>>> allMetadataForKey(), which doesn't include any handle. We then query that
> > > > >>>>>> machine for the key, where it calls allStores() and tries to find the task
> > > > >>>>>> in activeRunning/activeRestoring/standbyRunning and adds the store handle
> > > > >>>>>> there.
> > > > >>>>>> 12. Need to verify, but at the exact point when the store is closed to
> > > > >>>>>> transition it from restoring to running, queries will fail. The caller in
> > > > >>>>>> such a case can have their own configurable retries to check again, or try
> > > > >>>>>> the replica if a call to the active fails.
> > > > >>>>>> 13. I think KIP-216 is working along those lines; we might not need a few
> > > > >>>>>> of those exceptions, since now the basic idea of this KIP is to support IQ
> > > > >>>>>> during rebalancing.
> > > > >>>>>> 14. Addressed above; agreed it looks more readable.
> > > > >>>>>>
> > > > >>>>>> On Tuesday, 22 October, 2019, 08:39:07 pm IST, Matthias J. Sax <
> > > > >>>>>> matth...@confluent.io> wrote:
> > > > >>>>>>
> > > > >>>>>> One more thought:
> > > > >>>>>>
> > > > >>>>>> 14) Is specifying the allowed lag in number of records a useful way for
> > > > >>>>>> users to declare how stale an instance is allowed to be? Would it be
> > > > >>>>>> more intuitive for users to specify the allowed lag in time units (would
> > > > >>>>>> event time or processing time be better)? It seems hard for users to
> > > > >>>>>> reason about how "fresh" a store really is when number of records is used.
> > > > >>>>>>
> > > > >>>>>> -Matthias
> > > > >>>>>>
> > > > >>>>>> On 10/21/19 9:02 PM, Matthias J. Sax wrote:
> > > > >>>>>>> Some more follow up thoughts:
> > > > >>>>>>>
> > > > >>>>>>> 11) If we get a store handle of an active(restoring) task, and the task
> > > > >>>>>>> transitions to running, does the store handle become invalid and a new
> > > > >>>>>>> one must be retrieved? Or can we "switch it out" underneath -- in this
> > > > >>>>>>> case, how does the user know when they start to query the up-to-date
> > > > >>>>>>> state?
> > > > >>>>>>>
> > > > >>>>>>> 12) Standby tasks will have the store open in regular mode, while
> > > > >>>>>>> active(restoring) tasks open stores in "upgrade mode" for more efficient
> > > > >>>>>>> bulk loading. When we switch the store into active mode, we close it and
> > > > >>>>>>> reopen it. What is the impact if we query the store during restore?
> > > > >>>>>>> What is the impact if we close the store to transition to running
> > > > >>>>>>> (e.g., there might be open iterators)?
> > > > >>>>>>>
> > > > >>>>>>> 13) Do we need to introduce new exception types? Compare KIP-216
> > > > >>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors)
> > > > >>>>>>> that aims to improve the user experience with regard to IQ exceptions.
> > > > >>>>>>>
> > > > >>>>>>> -Matthias
> > > > >>>>>>>
> > > > >>>>>>> On 10/21/19 6:39 PM, Matthias J. Sax wrote:
> > > > >>>>>>>> Thanks for the KIP.
> > > > >>>>>>>>
> > > > >>>>>>>> Couple of comments:
> > > > >>>>>>>>
> > > > >>>>>>>> 1) With regard to KIP-441, my current understanding is that the lag
> > > > >>>>>>>> information is only reported to the leader (please correct me if I am
> > > > >>>>>>>> wrong). This seems to be quite a limitation for actually using the lag
> > > > >>>>>>>> information.
> > > > >>>>>>>>
> > > > >>>>>>>> 2) The idea of the metadata API is actually to get metadata once and
> > > > >>>>>>>> only refresh the metadata if a store was migrated. The current proposal
> > > > >>>>>>>> would require getting the metadata before each query. The KIP should
> > > > >>>>>>>> describe the usage pattern and impact in more detail.
> > > > >>>>>>>>
> > > > >>>>>>>> 3) Currently, the KIP does not list the public API changes in detail.
> > > > >>>>>>>> Please list all methods you intend to deprecate and list all methods
> > > > >>>>>>>> you intend to add (best using a code-block markup -- compare
> > > > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
> > > > >>>>>>>> as an example).
> > > > >>>>>>>>
> > > > >>>>>>>> 4) Also note (as already pointed out by John) that we cannot have any
> > > > >>>>>>>> breaking API changes. Thus, the API should be designed in a fully
> > > > >>>>>>>> backward compatible manner.
> > > > >>>>>>>>
> > > > >>>>>>>> 5) Returning a list of metadata objects makes it hard for users to know
> > > > >>>>>>>> if the first object refers to the active(processing), active(restoring),
> > > > >>>>>>>> or a standby task. IMHO, we should be more explicit. For example, a
> > > > >>>>>>>> metadata object could have a flag that one can test via `#isActive()`.
> > > > >>>>>>>> Or maybe even better, we could keep the current API as-is and add
> > > > >>>>>>>> something like `standbyMetadataForKey()` (and similar methods for the
> > > > >>>>>>>> others). Having just a flag `isActive()` is a little subtle, and having
> > > > >>>>>>>> new overloads would make the API much clearer (passing in a boolean flag
> > > > >>>>>>>> does not seem to be a nice API).
> > > > >>>>>>>>
> > > > >>>>>>>> 6) Do you intend to return all standby metadata information at once,
> > > > >>>>>>>> similar to `allMetadata()` -- seems to be useful.
> > > > >>>>>>>>
> > > > >>>>>>>> 7) Even if the lag information is propagated to all instances, it will
> > > > >>>>>>>> happen in an async manner. Hence, I am wondering if we should address
> > > > >>>>>>>> this race condition (I think we should). The idea would be to check if
> > > > >>>>>>>> a standby/active(restoring) task is actually still within the lag bounds
> > > > >>>>>>>> when a query is executed, and we would throw an exception if not.
> > > > >>>>>>>>
> > > > >>>>>>>> 8) The current `KafkaStreams#store()` method only returns a handle to
> > > > >>>>>>>> stores of active(processing) tasks. How can a user actually get a handle
> > > > >>>>>>>> to a store of an active(restoring) or standby task for querying? Seems
> > > > >>>>>>>> we should add a new method to get standby handles? Changing the
> > > > >>>>>>>> semantics of the existing method would be possible, but I think adding a
> > > > >>>>>>>> new method is preferable?
> > > > >>>>>>>>
> > > > >>>>>>>> 9) How does the user actually specify the acceptable lag? A global
> > > > >>>>>>>> config via StreamsConfig (this would be a public API change that needs
> > > > >>>>>>>> to be covered in the KIP)? Or on a per-store or even per-query basis for
> > > > >>>>>>>> more flexibility? We could also have a global setting that is used as
> > > > >>>>>>>> the default and allow overwriting it on a per-query basis.
> > > > >>>>>>>>
> > > > >>>>>>>> 10) Do we need to distinguish between active(restoring) and standby
> > > > >>>>>>>> tasks? Or could we treat both as the same?
> > > > >>>>>>>>
> > > > >>>>>>>> -Matthias
> > > > >>>>>>>>
> > > > >>>>>>>> On 10/21/19 5:40 PM, Vinoth Chandar wrote:
> > > > >>>>>>>>>>> I'm wondering, rather than putting "acceptable lag" into the
> > > > >>>>>>>>> configuration at all, or even making it a parameter on
> > > > >>>>>>>>> `allMetadataForKey`, why not just _always_ return all available
> > > > >>>>>>>>> metadata (including active/standby or lag) and let the caller decide
> > > > >>>>>>>>> to which node they want to route the query?
> > > > >>>>>>>>> +1 on exposing lag information via the APIs. IMO without having
> > > > >>>>>>>>> continuously updated/fresh lag information, its true value as a signal
> > > > >>>>>>>>> for query routing decisions is rather limited. But we can design the
> > > > >>>>>>>>> API around this model and iterate? Longer term, we should have
> > > > >>>>>>>>> continuously shared lag information.
> > > > >>>>>>>>>
> > > > >>>>>>>>>>> more general to refactor it to "allMetadataForKey(long
> > > > >>>>>>>>> tolerableDataStaleness, ...)", and when it's set to 0 it means "active
> > > > >>>>>>>>> task only".
> > > > >>>>>>>>> +1. IMO if we plan on having `enableReplicaServing`, it makes sense to
> > > > >>>>>>>>> generalize based on dataStaleness. This seems complementary to exposing
> > > > >>>>>>>>> the lag information itself.
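One way the generalized signature discussed just above could behave, sketched
loosely in Java. Only the signature itself comes from the discussion; the
standbyMetadataForKey() and timeLagMs() helpers are hypothetical, purely for
illustration.

    // tolerableDataStalenessMs == 0 degenerates to "active task only";
    // otherwise, also include any standby whose time lag is within the bound.
    public <K> List<StreamsMetadata> allMetadataForKey(final long tolerableDataStalenessMs,
                                                       final String storeName,
                                                       final K key,
                                                       final Serializer<K> keySerializer) {
        final List<StreamsMetadata> result = new ArrayList<>();
        result.add(metadataForKey(storeName, key, keySerializer));  // active first
        if (tolerableDataStalenessMs > 0) {
            for (final StreamsMetadata standby : standbyMetadataForKey(storeName, key, keySerializer)) {
                if (timeLagMs(standby) <= tolerableDataStalenessMs) {
                    result.add(standby);
                }
            }
        }
        return result;
    }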
> > > > >>>>>>>>>
> > > > >>>>>>>>>>> This is actually not a public api change at all, and I'm planning
> > > > >>>>>>>>> to implement it asap as a precursor to the rest of KIP-441
> > > > >>>>>>>>> +1 again. Do we have a concrete timeline for when this change will
> > > > >>>>>>>>> land on master? I would like to get the implementation wrapped up (as
> > > > >>>>>>>>> much as possible) by the end of the month :). But I agree this
> > > > >>>>>>>>> sequencing makes sense.
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang <wangg...@gmail.com>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hi Navinder,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Thanks for the KIP, I have a high level question about the proposed
> > > > >>>>>>>>>> API regarding:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I'm wondering if it's more general to refactor it to
> > > > >>>>>>>>>> "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's
> > > > >>>>>>>>>> set to 0 it means "active task only". Behind the scenes, we can have
> > > > >>>>>>>>>> the committed offsets encode the stream time as well, so that when
> > > > >>>>>>>>>> processing standby tasks the stream process knows not only the lag in
> > > > >>>>>>>>>> terms of offsets compared to the committed offset (internally we call
> > > > >>>>>>>>>> it the offset limit), but also the lag in terms of the timestamp diff
> > > > >>>>>>>>>> compared to the committed offset.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Also, encoding the timestamp as part of the offset has other benefits
> > > > >>>>>>>>>> for improving Kafka Streams time semantics as well, but for KIP-535
> > > > >>>>>>>>>> itself I think it can help give users a more intuitive interface to
> > > > >>>>>>>>>> reason about.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Guozhang
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Mon, Oct 21, 2019 at 12:30 PM John Roesler <j...@confluent.io>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Hey Navinder,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for the KIP! I've been reading over the discussion thus far,
> > > > >>>>>>>>>>> and I have a couple of thoughts to pile on as well:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> It seems confusing to propose the API in terms of the current system
> > > > >>>>>>>>>>> state, but also propose how the API would look if/when KIP-441 is
> > > > >>>>>>>>>>> implemented. It occurs to me that the only part of KIP-441 that would
> > > > >>>>>>>>>>> affect you is the availability of the lag information in the
> > > > >>>>>>>>>>> SubscriptionInfo message. This is actually not a public api change at
> > > > >>>>>>>>>>> all, and I'm planning to implement it asap as a precursor to the rest
> > > > >>>>>>>>>>> of KIP-441, so maybe you can just build on top of KIP-441 and assume
> > > > >>>>>>>>>>> the lag information will be available. Then you could have a more
> > > > >>>>>>>>>>> straightforward proposal (e.g., mention that you'd return the lag
> > > > >>>>>>>>>>> information in AssignmentInfo as well as in the StreamsMetadata in
> > > > >>>>>>>>>>> some form, or make use of it in the API somehow).
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I'm partially motivated in making that former point because it seems
> > > > >>>>>>>>>>> like understanding how callers would bound the staleness for their
> > > > >>>>>>>>>>> use case is _the_ key point for this KIP. FWIW, I think that adding a
> > > > >>>>>>>>>>> new method to StreamsMetadataState and deprecating the existing
> > > > >>>>>>>>>>> method is the best way to go; we just can't change the return types
> > > > >>>>>>>>>>> of any existing methods.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I'm wondering, rather than putting "acceptable lag" into the
> > > > >>>>>>>>>>> configuration at all, or even making it a parameter on
> > > > >>>>>>>>>>> `allMetadataForKey`, why not just _always_ return all available
> > > > >>>>>>>>>>> metadata (including active/standby or lag) and let the caller decide
> > > > >>>>>>>>>>> to which node they want to route the query? This method isn't making
> > > > >>>>>>>>>>> any queries itself; it's merely telling you where the local Streams
> > > > >>>>>>>>>>> instance _thinks_ the key in question is located. Just returning all
> > > > >>>>>>>>>>> available information lets the caller implement any semantics they
> > > > >>>>>>>>>>> desire around querying only active stores, or standbys, or recovering
> > > > >>>>>>>>>>> stores, or whatever.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> One fly in the ointment, which you may wish to consider if proposing
> > > > >>>>>>>>>>> to use lag information, is that the cluster would only become aware
> > > > >>>>>>>>>>> of new lag information during rebalances. Even in the full expression
> > > > >>>>>>>>>>> of KIP-441, this information would stop being propagated when the
> > > > >>>>>>>>>>> cluster achieves a balanced task distribution. There was a follow-on
> > > > >>>>>>>>>>> idea I POCed to continuously share lag information in the heartbeat
> > > > >>>>>>>>>>> protocol, which you might be interested in, if you want to make sure
> > > > >>>>>>>>>>> that nodes are basically _always_ aware of each others' lag on
> > > > >>>>>>>>>>> different partitions: https://github.com/apache/kafka/pull/7096
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks again!
> > > > >>>>>>>>>>> -John
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar
> > > > >>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks, Vinoth. Looks like we are on the same page. I will add some
> > > > >>>>>>>>>>>> of these explanations to the KIP as well. Have assigned KAFKA-6144
> > > > >>>>>>>>>>>> to myself, and KAFKA-8994 is closed (by you). As suggested, we will
> > > > >>>>>>>>>>>> replace "replica" with "standby".
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> In the new API, "StreamsMetadataState::allMetadataForKey(boolean
> > > > >>>>>>>>>>>> enableReplicaServing, String storeName, K key, Serializer<K>
> > > > >>>>>>>>>>>> keySerializer)" -- do we really need a per-key configuration? Or is
> > > > >>>>>>>>>>>> a new StreamsConfig good enough?
> > > > >>>>>>>>>>>> >> Coming from experience, when teams are building a platform with
> > > > >>>>>>>>>>>> Kafka Streams and these APIs serve data to multiple teams, we can't
> > > > >>>>>>>>>>>> have a generalized config that says, as a platform, we will or will
> > > > >>>>>>>>>>>> not support stale reads. It should be the choice of whoever is
> > > > >>>>>>>>>>>> calling the APIs whether they are OK with stale reads or not.
> > > > >>>>>>>>>>>> Makes sense?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <
> > > > >>>>>>>>>>>> vchan...@confluent.io> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Looks like we are covering ground :)
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> >> Only if it is within a permissible range (say 10000) will we
> > > > >>>>>>>>>>>> serve from the Restoring state of the active.
> > > > >>>>>>>>>>>> +1 on having a knob like this. My reasoning is as follows.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Look at the Streams state as a read-only distributed KV store. With
> > > > >>>>>>>>>>>> num_standby = f, we should be able to tolerate f failures, and if
> > > > >>>>>>>>>>>> there is an (f+1)th failure, the system should be unavailable.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> A) So with num_standby=0, the system should be unavailable even if
> > > > >>>>>>>>>>>> there is 1 failure, and that's my argument for not allowing querying
> > > > >>>>>>>>>>>> in the restoration state; especially since in this case it will be a
> > > > >>>>>>>>>>>> total rebuild of the state (which IMO cannot be considered a normal
> > > > >>>>>>>>>>>> fault-free operational state).
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> B) Even if there are standbys, say num_standby=2, if the user
> > > > >>>>>>>>>>>> decides to shut down all 3 instances, then the only outcome should
> > > > >>>>>>>>>>>> be unavailability until all of them come back, or the state is
> > > > >>>>>>>>>>>> rebuilt on other nodes in the cluster. In normal operations, f <= 2,
> > > > >>>>>>>>>>>> and when a failure does happen we can then either choose C over A
> > > > >>>>>>>>>>>> and fail IQs until replication is fully caught up, or choose A over
> > > > >>>>>>>>>>>> C by serving in restoring state as long as lag is minimal. If, even
> > > > >>>>>>>>>>>> with f=1 say, all the standbys are lagging a lot due to some issue,
> > > > >>>>>>>>>>>> then that should be considered a failure, since that is different
> > > > >>>>>>>>>>>> from the normal/expected operational mode. Serving reads with
> > > > >>>>>>>>>>>> unbounded replication lag and calling it "available" may not be very
> > > > >>>>>>>>>>>> usable or even desirable :), IMHO, since it gives the user no way to
> > > > >>>>>>>>>>>> reason about the app that is going to query this store.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> So there is definitely a need to distinguish between: replication
> > > > >>>>>>>>>>>> catchup while being in a fault-free state, vs. restoration of state
> > > > >>>>>>>>>>>> when we lose more than f standbys. This knob is a great starting
> > > > >>>>>>>>>>>> point towards this.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> If you agree with some of the explanation above, please feel free to
> > > > >>>>>>>>>>>> include it in the KIP as well, since this is sort of our design
> > > > >>>>>>>>>>>> principle here.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Small nits:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> - let's standardize on "standby" instead of "replica", KIP or code,
> > > > >>>>>>>>>>>> to be consistent with the rest of the Streams code/docs?
> > > > >>>>>>>>>>>> - Can we merge KAFKA-8994 into KAFKA-6144 now and close the former?
> > > > >>>>>>>>>>>> Eventually we need to consolidate KAFKA-6555 as well.
> > > > >>>>>>>>>>>> - In the new API, "StreamsMetadataState::allMetadataForKey(boolean
> > > > >>>>>>>>>>>> enableReplicaServing, String storeName, K key, Serializer<K>
> > > > >>>>>>>>>>>> keySerializer)" -- do we really need a per-key configuration? Or is
> > > > >>>>>>>>>>>> a new StreamsConfig good enough?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar
> > > > >>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> @Vinoth, I have incorporated a few of the discussions we have had
> > > > >>>>>>>>>>>>> in the KIP.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> In the current code, t0 and t1 serve queries from the
> > > > >>>>>>>>>>>>> Active(Running) partition. For case t2, we are planning to return
> > > > >>>>>>>>>>>>> List<StreamsMetadata> such that it returns <StreamsMetadata(A),
> > > > >>>>>>>>>>>>> StreamsMetadata(B)>, so that if IQ fails on A, the replica on B can
> > > > >>>>>>>>>>>>> serve the data by enabling serving from replicas.
> > > > >>>>>>>>>>>>> This still does not solve cases t3 and t4, since B has been
> > > > >>>>>>>>>>>>> promoted to active but is in Restoring state to catch up to A's
> > > > >>>>>>>>>>>>> last committed position (as we don't serve from the Restoring state
> > > > >>>>>>>>>>>>> of an Active), and the new Replica on R is building itself from
> > > > >>>>>>>>>>>>> scratch. Both these cases can be solved if we start serving from
> > > > >>>>>>>>>>>>> the Restoring state of the active as well, since it is almost
> > > > >>>>>>>>>>>>> equivalent to the previous Active.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> There could be a case where the active and all replicas of a
> > > > >>>>>>>>>>>>> partition become unavailable and are building themselves from
> > > > >>>>>>>>>>>>> scratch; in this case, the state in the Active is far behind even
> > > > >>>>>>>>>>>>> though it is in Restoring state. To ensure that we don't serve from
> > > > >>>>>>>>>>>>> this state in such cases, we can either add another state before
> > > > >>>>>>>>>>>>> Restoring, or check the difference between the last committed
> > > > >>>>>>>>>>>>> offset and the current position. Only if it is within a permissible
> > > > >>>>>>>>>>>>> range (say 10000) will we serve from the Restoring state of the
> > > > >>>>>>>>>>>>> Active.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <
> > > > >>>>>>>>>>>>> vchan...@confluent.io> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks for the updates on the KIP, Navinder!
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Few comments
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> - AssignmentInfo is not a public API. But we will change it and
> > > > >>>>>>>>>>>>> thus need to increment the version and test for version_probing
> > > > >>>>>>>>>>>>> etc. Good to separate that from the StreamsMetadata changes (which
> > > > >>>>>>>>>>>>> are public API).
> > > > >>>>>>>>>>>>> - From what I see, there is going to be a choice between the
> > > > >>>>>>>>>>>>> following:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> A) introducing a new *KafkaStreams::allMetadataForKey()* API that
> > > > >>>>>>>>>>>>> potentially returns List<StreamsMetadata> ordered from the most
> > > > >>>>>>>>>>>>> up-to-date to the least up-to-date replica. Today we cannot fully
> > > > >>>>>>>>>>>>> implement this ordering, since all we know is which hosts are
> > > > >>>>>>>>>>>>> active and which are standbys. However, this aligns well with the
> > > > >>>>>>>>>>>>> future. KIP-441 adds the lag information to the rebalancing
> > > > >>>>>>>>>>>>> protocol. We could also sort replicas based on the reported lags
> > > > >>>>>>>>>>>>> eventually. This is fully backwards compatible with existing
> > > > >>>>>>>>>>>>> clients. The only drawback I see is the naming of the existing
> > > > >>>>>>>>>>>>> method KafkaStreams::metadataForKey not conveying the distinction
> > > > >>>>>>>>>>>>> that it simply returns the active replica, i.e.
> > > > >>>>>>>>>>>>> allMetadataForKey.get(0).
> > > > >>>>>>>>>>>>> B) Changing KafkaStreams::metadataForKey() to return a List. It's a
> > > > >>>>>>>>>>>>> breaking change.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I prefer A, since none of the semantics/behavior changes for
> > > > >>>>>>>>>>>>> existing users. Love to hear more thoughts. Can we also work this
> > > > >>>>>>>>>>>>> into the KIP?
> > > > >>>>>>>>>>>>> I already implemented A to unblock myself for now. Seems feasible
> > > > >>>>>>>>>>>>> to do.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar
> > > > >>>>>>>>>>>>> <vchan...@confluent.io> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> I get your point. But suppose there is a replica which has just
> > > > >>>>>>>>>>>>>> become active, so in that case the replica will still be building
> > > > >>>>>>>>>>>>>> itself from scratch, and this active will go to restoring state
> > > > >>>>>>>>>>>>>> till it catches up with the previous active; wouldn't serving from
> > > > >>>>>>>>>>>>>> a restoring active make more sense than a replica in such a case?
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> KIP-441 will change this behavior such that promotion to active
> > > > >>>>>>>>>>>>>> happens based on how caught up a replica is. So, once we have that
> > > > >>>>>>>>>>>>>> (work underway already for 2.5 IIUC) and the user sets
> > > > >>>>>>>>>>>>>> num.standby.replicas > 0, the staleness window should not be as
> > > > >>>>>>>>>>>>>> long as you describe. IMO if the user wants availability for
> > > > >>>>>>>>>>>>>> state, they should configure num.standby.replicas > 0. If not,
> > > > >>>>>>>>>>>>>> then on a node loss, a few partitions would be unavailable for a
> > > > >>>>>>>>>>>>>> while (there are other ways to bring this window down, which I
> > > > >>>>>>>>>>>>>> won't bring in here). We could argue for querying a restoring
> > > > >>>>>>>>>>>>>> active (say a new node added to replace a faulty old node) based
> > > > >>>>>>>>>>>>>> on AP vs CP principles. But I am not sure reading really, really
> > > > >>>>>>>>>>>>>> old values for the sake of availability is useful. No AP data
> > > > >>>>>>>>>>>>>> system would be inconsistent for such a long time in practice.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> So, I still feel just limiting this to standby reads provides the
> > > > >>>>>>>>>>>>>> best semantics.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Just my 2c. Would love to see what others think as well.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
> > > > >>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Hi Vinoth,
> > > > >>>>>>>>>>>>>>> Thanks for the feedback.
> > > > >>>>>>>>>>>>>>> Can we link the JIRA, discussion thread also to the KIP.
> > > > >>>>>>>>>>>>>>> >> Added.
> > > > >>>>>>>>>>>>>>> Based on the discussion on KAFKA-6144, I was under the impression
> > > > >>>>>>>>>>>>>>> that this KIP is also going to cover exposing of the standby
> > > > >>>>>>>>>>>>>>> information in StreamsMetadata and thus subsume KAFKA-8994. That
> > > > >>>>>>>>>>>>>>> would require a public API change?
> > > > >>>>>>>>>>>>>>> >> Sure, I can add changes for 8994 in this KIP and link
> > > > >>>>>>>>>>>>>>> KAFKA-6144 to KAFKA-8994 as well.
> > > > >>>>>>>>>>>>>>> KIP seems to be focussing on restoration when a new node is
> > > > >>>>>>>>>>>>>>> added. KIP-441 is underway and has some major changes proposed
> > > > >>>>>>>>>>>>>>> for this. It would be good to clarify dependencies if any.
> > > > >>>>>>>>>>>>>>> Without KIP-441, I am not very sure if we should allow reads from
> > > > >>>>>>>>>>>>>>> nodes in RESTORING state, which could amount to many minutes/few
> > > > >>>>>>>>>>>>>>> hours of stale reads? This is different from allowing querying
> > > > >>>>>>>>>>>>>>> standby replicas, which could be mostly caught up and the
> > > > >>>>>>>>>>>>>>> staleness window could be much smaller/tolerable. (once again the
> > > > >>>>>>>>>>>>>>> focus on KAFKA-8994)
> > > > >>>>>>>>>>>>>>> >> I get your point. But suppose there is a replica which has
> > > > >>>>>>>>>>>>>>> just become active; in that case the replica will still be
> > > > >>>>>>>>>>>>>>> building itself from scratch, and this active will go to
> > > > >>>>>>>>>>>>>>> restoring state till it catches up with the previous active.
> > > > >>>>>>>>>>>>>>> Wouldn't serving from a restoring active make more sense than a
> > > > >>>>>>>>>>>>>>> replica in such a case?
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Finally, we may need to introduce a configuration to control
> > > > >>>>>>>>>>>>>>> this. Some users may prefer errors to stale data. Can we also add
> > > > >>>>>>>>>>>>>>> it to the KIP?
> > > > >>>>>>>>>>>>>>> >> Will add this.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Regards,
> > > > >>>>>>>>>>>>>>> Navinder
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On 2019/10/14 16:56:49, Vinoth Chandar <v...@confluent.io> wrote:
> > > > >>>>>>>>>>>>>>>> On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar
> > > > >>>>>>>>>>>>>>>> <na...@yahoo.com.invalid> wrote:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Hi,
> > > > >>>>>>>>>>>>>>>>> Starting a discussion on the KIP to Allow state stores to serve
> > > > >>>>>>>>>>>>>>>>> stale reads during rebalance
> > > > >>>>>>>>>>>>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance).
> > > > >>>>>>>>>>>>>>>>> Thanks & Regards,
> > > > >>>>>>>>>>>>>>>>> Navinder
> > > > >>>>>>>>>>>>>>>>> LinkedIn
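Taken together, the design that emerges from this thread pushes routing
decisions into the IQ serving layer. Below is a hedged end-to-end sketch of
what that could look like from a query router's point of view. Only
KeyQueryMetadata and the idea of returning lag with each response come from
the discussion; the transport (sendQuery), the ordering helper, and the
lag-map bookkeeping are hypothetical application-side pieces, in the spirit
of John's strawman.

    // The router keeps its own view of per-host lag, refreshed from the lag
    // piggybacked on each query response (John's strawman above).
    private final Map<HostInfo, Long> observedOffsetLag = new HashMap<>();

    <V> V queryWithFailover(final KeyQueryMetadata metadata,
                            final String store,
                            final byte[] key,
                            final long maxAcceptableLag) {
        // Try hosts ordered by last observed freshness, active first on ties.
        for (final HostInfo host : hostsByObservedLag(metadata, observedOffsetLag)) {
            final QueryResponse<V> response = sendQuery(host, store, key); // app transport
            observedOffsetLag.put(host, response.offsetLag());             // refresh lag map
            if (response.offsetLag() <= maxAcceptableLag) {
                return response.value();
            }
        }
        // All replicas were too stale: let the caller decide whether to retry
        // after refreshing metadata, or surface an error (errors vs stale
        // data stays a caller choice, per the discussion).
        throw new IllegalStateException("No replica within acceptable lag");
    }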