+1 to John's suggestions on Duration/Instant and on dropping the API that fetches all stores' lags. However, I do think we need to return lags per topic partition, so I'm not sure a single return value would work. We probably need some new class that holds a TopicPartition together with the Duration/Instant lag values, along the lines of the sketch below.
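To make this concrete, here is a rough sketch of the kind of holder class I have in mind (purely illustrative; the class and method names, e.g. PartitionLagInfo, are placeholders and not part of the KIP yet):

import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.common.TopicPartition;

// Sketch only: pairs the changelog partition a key maps to with the lag
// estimates reported for that partition on a given host.
public class PartitionLagInfo {

    private final TopicPartition topicPartition;  // changelog partition the key belongs to
    private final long offsetLagEstimate;         // lag in number of offsets/records
    private final Instant tailRecordTimestamp;    // timestamp of the last changelog record seen

    public PartitionLagInfo(final TopicPartition topicPartition,
                            final long offsetLagEstimate,
                            final Instant tailRecordTimestamp) {
        this.topicPartition = topicPartition;
        this.offsetLagEstimate = offsetLagEstimate;
        this.tailRecordTimestamp = tailRecordTimestamp;
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

    public long offsetLagEstimate() {
        return offsetLagEstimate;
    }

    // Time lag relative to "now"; callers could also supply their own reference instant.
    public Duration timeLagEstimate() {
        return Duration.between(tailRecordTimestamp, Instant.now());
    }
}

The caller could then correlate the topic partition returned in KeyQueryMetadata for the queried key against the per-partition lags returned by the lag APIs to decide which host to route to.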
10) Because we needed to return the topicPartition the key belongs to, in order to correlate with the lag information from the other set of APIs. Otherwise, we don't know which topic partition's lag estimate to use. We tried to illustrate this in the example code. StreamsMetadata simply captures the state of a Streams host/instance, whereas the TopicPartition depends on the key passed in. This is a side effect of our decision to decouple lag-based filtering from the metadata APIs.

20) Goes back to the previous point. We needed to return information that is key specific, at which point it seemed natural for KeyQueryMetadata to contain the active, standbys, and topic partition for that key. If we merely returned standbyMetadataForKey() -> Collection<StreamsMetadata>, the active metadataForKey() -> StreamsMetadata, and a new getTopicPartition(key) -> TopicPartition back to the caller, then arguably you could do the same kind of correlation. IMO having the KeyQueryMetadata class encapsulate all of this is a friendlier API. allStandbyMetadata() and allStandbyMetadataForStore() are just counterparts to allMetadata() and metadataForStore() that we introduce mostly for consistent API semantics. (Their presence also implicitly denotes that metadataForStore() is for active instances. Happy to drop them if their utility is not clear.)

30) This would assume we refresh all the standby lag information every time we query the StreamsMetadata for a specific store? For time-based lag, that would involve fetching the tail Kafka record from multiple topic partitions at once. I would prefer not to couple them like this and instead keep the ability to make granular per-store (or even per-topic-partition) fetches of lag information.

32) I actually prefer John's suggestion to let the application drive the lag fetches/updates rather than the flags the KIP currently points to. Are you reexamining that position? On fetching lag information, +1, we could do this much more efficiently with a broker change. Given I don't yet have a burning need for the time-based lag, I think we can sequence the APIs such that the offset-based ones are implemented first, while we pursue a broker-side change. Given we decoupled the offset- and time-based lag APIs, I am willing to drop the time-based lag functionality (since it's not needed right away for my use case). @navinder, thoughts?

On Tue, Nov 5, 2019 at 11:10 PM Matthias J. Sax <matth...@confluent.io> wrote: > Navinder, > > thanks for updating the KIP. Couple of follow up questions: > > > (10) Why do we need to introduce the class `KeyQueryMetadata`? > > (20) Why do we introduce the two methods `allMetadataForKey()`? Would it > not be simpler to add `Collection<StreamMetatdata> > standbyMetadataForKey(...)`. This would align with new methods > `#allStandbyMetadata()` and `#allStandbyMetadataForStore()`? > > (30) Why do we need the class `StoreLagInfo` -- it seems simpler to just > extend `StreamMetadata` with the corresponding attributes and methods > (of active task, the lag would always be reported as zero) > > (32) Via (30) we can avoid the two new methods `#allLagInfo()` and > `#lagInfoForStore()`, too, reducing public API and making it simpler to > use the feature. > > Btw: If we make `StreamMetadata` thread safe, the lag information can be > updated in the background without the need that the application > refreshes its metadata. Hence, the user can get active and/or standby > metadata once, and only needs to refresh it, if a rebalance happened. 
> > > About point (4) of the previous thread: I was also thinking about > when/how to update the time-lag information, and I agree that we should > not update it for each query. > > "How": That we need to fetch the last record is a little bit > unfortunate, but I don't see any other way without a broker change. One > issue I still see is with "exactly-once" -- if transaction markers are > in the topic, the last message is not at offset "endOffset - 1" and as > multiple transaction markers might be after each other, it's unclear how > to identify the offset of the last record... Thoughts? > > Hence, it might be worth to look into a broker change as a potential > future improvement. It might be possible that the broker caches the > latest timestamp per partition to serve this data efficiently, similar > to `#endOffset()`. > > "When": We refresh the end-offset information based on the > `commit.interval.ms` -- doing it more often is not really useful, as > state store caches will most likely buffer up all writes to changelogs > anyway and are only flushed on commit (including a flush of the > producer). Hence, I would suggest to update the time-lag information > based on the same strategy in the background. This way there is no > additional config or methods and the user does not need to worry about > it at all. > > To avoid refresh overhead if we don't need it (a user might not use IQ > to begin with), it might be worth to maintain an internal flag > `updateTimeLagEnabled` that is set to `false` initially and only set to > `true` on the first call of a user to get standby-metadata. > > > -Matthias > > > > On 11/4/19 5:13 PM, Vinoth Chandar wrote: > >>> I'm having some trouble wrapping my head around what race conditions > > might occur, other than the fundamentally broken state in which different > > instances are running totally different topologies. > > 3. @both Without the topic partitions that the tasks can map back to, we > > have to rely on topology/cluster metadata in each Streams instance to map > > the task back. If the source topics are wild carded for e,g then each > > instance could have different source topics in topology, until the next > > rebalance happens. You can also read my comments from here > > > https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106 > > > > > >>> seems hard to imagine how encoding arbitrarily long topic names plus an > > integer for the partition number could be as efficient as task ids, which > > are just two integers. > > 3. if you still have concerns about the efficacy of dictionary encoding, > > happy to engage. The link above also has some benchmark code I used. > > Theoretically, we would send each topic name atleast once, so yes if you > > compare a 10-20 character topic name + an integer to two integers, it > will > > be more bytes. But its constant overhead proportional to size of topic > name > > and with 4,8,12, partitions the size difference between baseline > (version 4 > > where we just repeated topic names for each topic partition) and the two > > approaches becomes narrow. > > > >>> Plus, Navinder is going to implement a bunch of protocol code that we > > might just want to change when the discussion actually does take place, > if > > ever. > >>> it'll just be a mental burden for everyone to remember that we want to > > have this follow-up discussion. > > 3. 
Is n't people changing same parts of code and tracking follow ups a > > common thing, we need to deal with anyway? For this KIP, is n't it > enough > > to reason about whether the additional map on top of the topic dictionary > > would incur more overhead than the sending task_ids? I don't think it's > > case, both of them send two integers. As I see it, we can do a separate > > follow up to (re)pursue the task_id conversion and get it working for > both > > maps within the next release? > > > >>> Can you elaborate on "breaking up the API"? It looks like there are > > already separate API calls in the proposal, one for time-lag, and another > > for offset-lag, so are they not already broken up? > > The current APIs (e.g lagInfoForStore) for lags return StoreLagInfo > objects > > which has both time and offset lags. If we had separate APIs, say (e.g > > offsetLagForStore(), timeLagForStore()), we can implement offset version > > using the offset lag that the streams instance already tracks i.e no need > > for external calls. The time based lag API would incur the kafka read for > > the timestamp. makes sense? > > > > Based on the discussions so far, I only see these two pending issues to > be > > aligned on. Is there any other open item people want to bring up? > > > > On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman <sop...@confluent.io > > > > wrote: > > > >> Regarding 3) I'm wondering, does your concern still apply even now > >> that the pluggable PartitionGrouper interface has been deprecated? > >> Now that we can be sure that the DefaultPartitionGrouper is used to > >> generate > >> the taskId -> partitions mapping, we should be able to convert any > taskId > >> to any > >> partitions. > >> > >> On Mon, Nov 4, 2019 at 11:17 AM John Roesler <j...@confluent.io> wrote: > >> > >>> Hey Vinoth, thanks for the reply! > >>> > >>> 3. > >>> I get that it's not the main focus of this KIP, but if it's ok, it > >>> would be nice to hash out this point right now. It only came up > >>> because this KIP-535 is substantially extending the pattern in > >>> question. If we push it off until later, then the reviewers are going > >>> to have to suspend their concerns not just while voting for the KIP, > >>> but also while reviewing the code. Plus, Navinder is going to > >>> implement a bunch of protocol code that we might just want to change > >>> when the discussion actually does take place, if ever. Finally, it'll > >>> just be a mental burden for everyone to remember that we want to have > >>> this follow-up discussion. > >>> > >>> It makes sense what you say... the specific assignment is already > >>> encoded in the "main" portion of the assignment, not in the "userdata" > >>> part. It also makes sense that it's simpler to reason about races if > >>> you simply get all the information about the topics and partitions > >>> directly from the assignor, rather than get the partition number from > >>> the assignor and the topic name from your own a priori knowledge of > >>> the topology. On the other hand, I'm having some trouble wrapping my > >>> head around what race conditions might occur, other than the > >>> fundamentally broken state in which different instances are running > >>> totally different topologies. Sorry, but can you remind us of the > >>> specific condition? 
> >>> > >>> To the efficiency counterargument, it seems hard to imagine how > >>> encoding arbitrarily long topic names plus an integer for the > >>> partition number could be as efficient as task ids, which are just two > >>> integers. It seems like this would only be true if topic names were 4 > >>> characters or less. > >>> > >>> 4. > >>> Yeah, clearly, it would not be a good idea to query the metadata > >>> before every single IQ query. I think there are plenty of established > >>> patterns for distributed database clients to follow. Can you elaborate > >>> on "breaking up the API"? It looks like there are already separate API > >>> calls in the proposal, one for time-lag, and another for offset-lag, > >>> so are they not already broken up? FWIW, yes, I agree, the offset lag > >>> is already locally known, so we don't need to build in an extra > >>> synchronous broker API call, just one for the time-lag. > >>> > >>> Thanks again for the discussion, > >>> -John > >>> > >>> On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar <vchan...@confluent.io> > >>> wrote: > >>>> > >>>> 3. Right now, we still get the topic partitions assigned as a part of > >> the > >>>> top level Assignment object (the one that wraps AssignmentInfo) and > use > >>>> that to convert taskIds back. This list of only contains assignments > >> for > >>>> that particular instance. Attempting to also reverse map for "all" the > >>>> tasksIds in the streams cluster i.e all the topic partitions in these > >>>> global assignment maps was what was problematic. By explicitly sending > >>> the > >>>> global assignment maps as actual topic partitions, group coordinator > >>> (i.e > >>>> the leader that computes the assignment's ) is able to consistently > >>> enforce > >>>> its view of the topic metadata. Still don't think doing such a change > >>> that > >>>> forces you to reconsider semantics, is not needed to save bits on > wire. > >>> May > >>>> be we can discuss this separately from this KIP? > >>>> > >>>> 4. There needs to be some caching/interval somewhere though since we > >>> don't > >>>> want to make 1 kafka read per 1 IQ potentially. But I think its a > valid > >>>> suggestion, to make this call just synchronous and leave the caching > or > >>> how > >>>> often you want to call to the application. Would it be good to then > >> break > >>>> up the APIs for time and offset based lag? We can obtain offset based > >>> lag > >>>> for free? Only incur the overhead of reading kafka if we want time > >>>> based lags? > >>>> > >>>> On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman < > >> sop...@confluent.io> > >>>> wrote: > >>>> > >>>>> Adding on to John's response to 3), can you clarify when and why > >>> exactly we > >>>>> cannot > >>>>> convert between taskIds and partitions? If that's really the case I > >>> don't > >>>>> feel confident > >>>>> that the StreamsPartitionAssignor is not full of bugs... > >>>>> > >>>>> It seems like it currently just encodes a list of all partitions (the > >>>>> assignment) and also > >>>>> a list of the corresponding task ids, duplicated to ensure each > >>> partition > >>>>> has the corresponding > >>>>> taskId at the same offset into the list. Why is that problematic? > >>>>> > >>>>> > >>>>> On Fri, Nov 1, 2019 at 12:39 PM John Roesler <j...@confluent.io> > >>> wrote: > >>>>> > >>>>>> Thanks, all, for considering the points! > >>>>>> > >>>>>> 3. Interesting. I have a vague recollection of that... Still, > >> though, > >>>>>> it seems a little fishy. 
After all, we return the assignments > >>>>>> themselves as task ids, and the members have to map these to topic > >>>>>> partitions in order to configure themselves properly. If it's too > >>>>>> complicated to get this right, then how do we know that Streams is > >>>>>> computing the correct partitions at all? > >>>>>> > >>>>>> 4. How about just checking the log-end timestamp when you call the > >>>>>> method? Then, when you get an answer, it's as fresh as it could > >>>>>> possibly be. And as a user you have just one, obvious, "knob" to > >>>>>> configure how much overhead you want to devote to checking... If > >> you > >>>>>> want to call the broker API less frequently, you just call the > >>> Streams > >>>>>> API less frequently. And you don't have to worry about the > >>>>>> relationship between your invocations of that method and the config > >>>>>> setting (e.g., you'll never get a negative number, which you could > >> if > >>>>>> you check the log-end timestamp less frequently than you check the > >>>>>> lag). > >>>>>> > >>>>>> Thanks, > >>>>>> -John > >>>>>> > >>>>>> On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar > >>>>>> <navinder_b...@yahoo.com.invalid> wrote: > >>>>>>> > >>>>>>> Thanks John for going through this. > >>>>>>> > >>>>>>> - +1, makes sense > >>>>>>> - +1, no issues there > >>>>>>> - Yeah the initial patch I had submitted for K-7149( > >>>>>> https://github.com/apache/kafka/pull/6935) to reduce > >> assignmentInfo > >>>>>> object had taskIds but the merged PR had similar size according to > >>> Vinoth > >>>>>> and it was simpler so if the end result is of same size, it would > >> not > >>>>> make > >>>>>> sense to pivot from dictionary and again move to taskIDs. > >>>>>>> - Not sure about what a good default would be if we don't > >> have a > >>>>>> configurable setting. This gives the users the flexibility to the > >>> users > >>>>> to > >>>>>> serve their requirements as at the end of the day it would take CPU > >>>>> cycles. > >>>>>> I am ok with starting it with a default and see how it goes based > >>> upon > >>>>>> feedback. > >>>>>>> > >>>>>>> Thanks, > >>>>>>> Navinder > >>>>>>> On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar > >> < > >>>>>> vchan...@confluent.io> wrote: > >>>>>>> > >>>>>>> 1. Was trying to spell them out separately. but makes sense for > >>>>>>> readability. done > >>>>>>> > >>>>>>> 2. No I immediately agree :) .. makes sense. @navinder? > >>>>>>> > >>>>>>> 3. I actually attempted only sending taskIds while working on > >>>>> KAFKA-7149. > >>>>>>> Its non-trivial to handle edges cases resulting from newly added > >>> topic > >>>>>>> partitions and wildcarded topic entries. I ended up simplifying > >> it > >>> to > >>>>>> just > >>>>>>> dictionary encoding the topic names to reduce size. We can apply > >>> the > >>>>> same > >>>>>>> technique here for this map. Additionally, we could also > >> dictionary > >>>>>> encode > >>>>>>> HostInfo, given its now repeated twice. I think this would save > >>> more > >>>>>> space > >>>>>>> than having a flag per topic partition entry. Lmk if you are okay > >>> with > >>>>>>> this. > >>>>>>> > >>>>>>> 4. This opens up a good discussion. Given we support time lag > >>> estimates > >>>>>>> also, we need to read the tail record of the changelog > >> periodically > >>>>>> (unlike > >>>>>>> offset lag, which we can potentially piggyback on metadata in > >>>>>>> ConsumerRecord IIUC). we thought we should have a config that > >>> control > >>>>> how > >>>>>>> often this read happens? 
Let me know if there is a simple way to > >>> get > >>>>>>> timestamp value of the tail record that we are missing. > >>>>>>> > >>>>>>> On Thu, Oct 31, 2019 at 12:58 PM John Roesler <j...@confluent.io > >>> > >>>>> wrote: > >>>>>>> > >>>>>>>> Hey Navinder, > >>>>>>>> > >>>>>>>> Thanks for updating the KIP, it's a lot easier to see the > >> current > >>>>>>>> state of the proposal now. > >>>>>>>> > >>>>>>>> A few remarks: > >>>>>>>> 1. I'm sure it was just an artifact of revisions, but you have > >>> two > >>>>>>>> separate sections where you list additions to the KafkaStreams > >>>>>>>> interface. Can you consolidate those so we can see all the > >>> additions > >>>>>>>> at once? > >>>>>>>> > >>>>>>>> 2. For messageLagEstimate, can I suggest "offsetLagEstimate" > >>> instead, > >>>>>>>> to be clearer that we're specifically measuring a number of > >>> offsets? > >>>>>>>> If you don't immediately agree, then I'd at least point out > >> that > >>> we > >>>>>>>> usually refer to elements of Kafka topics as "records", not > >>>>>>>> "messages", so "recordLagEstimate" might be more appropriate. > >>>>>>>> > >>>>>>>> 3. The proposal mentions adding a map of the standby > >>> _partitions_ for > >>>>>>>> each host to AssignmentInfo. I assume this is designed to > >> mirror > >>> the > >>>>>>>> existing "partitionsByHost" map. To keep the size of these > >>> metadata > >>>>>>>> messages down, maybe we can consider making two changes: > >>>>>>>> (a) for both actives and standbys, encode the _task ids_ > >> instead > >>> of > >>>>>>>> _partitions_. Every member of the cluster has a copy of the > >>> topology, > >>>>>>>> so they can convert task ids into specific partitions on their > >>> own, > >>>>>>>> and task ids are only (usually) three characters. > >>>>>>>> (b) instead of encoding two maps (hostinfo -> actives AND > >>> hostinfo -> > >>>>>>>> standbys), which requires serializing all the hostinfos twice, > >>> maybe > >>>>>>>> we can pack them together in one map with a structured value > >>>>> (hostinfo > >>>>>>>> -> [actives,standbys]). > >>>>>>>> Both of these ideas still require bumping the protocol version > >>> to 6, > >>>>>>>> and they basically mean we drop the existing `PartitionsByHost` > >>> field > >>>>>>>> and add a new `TasksByHost` field with the structured value I > >>>>>>>> mentioned. > >>>>>>>> > >>>>>>>> 4. Can we avoid adding the new "lag refresh" config? The lags > >>> would > >>>>>>>> necessarily be approximate anyway, so adding the config seems > >> to > >>>>>>>> increase the operational complexity of the system for little > >>> actual > >>>>>>>> benefit. > >>>>>>>> > >>>>>>>> Thanks for the pseudocode, by the way, it really helps > >> visualize > >>> how > >>>>>>>> these new interfaces would play together. And thanks again for > >>> the > >>>>>>>> update! > >>>>>>>> -John > >>>>>>>> > >>>>>>>> On Thu, Oct 31, 2019 at 2:41 PM John Roesler < > >> j...@confluent.io> > >>>>>> wrote: > >>>>>>>>> > >>>>>>>>> Hey Vinoth, > >>>>>>>>> > >>>>>>>>> I started going over the KIP again yesterday. There are a lot > >>> of > >>>>>>>>> updates, and I didn't finish my feedback in one day. I'm > >>> working on > >>>>>> it > >>>>>>>>> now. > >>>>>>>>> > >>>>>>>>> Thanks, > >>>>>>>>> John > >>>>>>>>> > >>>>>>>>> On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar < > >>>>>> vchan...@confluent.io> > >>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>> Wondering if anyone has thoughts on these changes? 
I liked > >>> that > >>>>>> the new > >>>>>>>>>> metadata fetch APIs provide all the information at once > >> with > >>>>>> consistent > >>>>>>>>>> naming.. > >>>>>>>>>> > >>>>>>>>>> Any guidance on what you would like to be discussed or > >>> fleshed > >>>>> out > >>>>>> more > >>>>>>>>>> before we call a VOTE? > >>>>>>>>>> > >>>>>>>>>> On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar > >>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi, > >>>>>>>>>>> We have made some edits in the KIP( > >>>>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance > >>>>>>>> ) > >>>>>>>>>>> after due deliberation on the agreed design to support > >> the > >>> new > >>>>>> query > >>>>>>>>>>> design. This includes the new public API to query > >>> offset/time > >>>>> lag > >>>>>>>>>>> information and other details related to querying standby > >>> tasks > >>>>>>>> which have > >>>>>>>>>>> come up after thinking of thorough details. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> - Addition of new config, “lag.fetch.interval.ms” to > >>>>>> configure > >>>>>>>> the > >>>>>>>>>>> interval of time/offset lag > >>>>>>>>>>> - Addition of new class StoreLagInfo to store the > >>>>> periodically > >>>>>>>> obtained > >>>>>>>>>>> time/offset lag > >>>>>>>>>>> - Addition of two new functions in KafkaStreams, > >>>>>>>> List<StoreLagInfo> > >>>>>>>>>>> allLagInfo() and List<StoreLagInfo> > >> lagInfoForStore(String > >>>>>>>> storeName) to > >>>>>>>>>>> return the lag information for an instance and a store > >>>>>> respectively > >>>>>>>>>>> - Addition of new class KeyQueryMetadata. We need > >>>>>> topicPartition > >>>>>>>> for > >>>>>>>>>>> each key to be matched with the lag API for the topic > >>>>> partition. > >>>>>> One > >>>>>>>> way is > >>>>>>>>>>> to add new functions and fetch topicPartition from > >>>>>>>> StreamsMetadataState but > >>>>>>>>>>> we thought having one call and fetching StreamsMetadata > >> and > >>>>>>>> topicPartition > >>>>>>>>>>> is more cleaner. > >>>>>>>>>>> - > >>>>>>>>>>> Renaming partitionsForHost to activePartitionsForHost in > >>>>>>>> StreamsMetadataState > >>>>>>>>>>> and partitionsByHostState to activePartitionsByHostState > >>>>>>>>>>> in StreamsPartitionAssignor > >>>>>>>>>>> - We have also added the pseudo code of how all the > >>> changes > >>>>>> will > >>>>>>>> exist > >>>>>>>>>>> together and support the new querying APIs > >>>>>>>>>>> > >>>>>>>>>>> Please let me know if anything is pending now, before a > >>> vote > >>>>> can > >>>>>> be > >>>>>>>>>>> started on this. On Saturday, 26 October, 2019, 05:41:44 > >>> pm > >>>>> IST, > >>>>>>>> Navinder > >>>>>>>>>>> Brar <navinder_b...@yahoo.com.invalid> wrote: > >>>>>>>>>>> > >>>>>>>>>>> >> Since there are two soft votes for separate > >>> active/standby > >>>>>> API > >>>>>>>>>>> methods, I also change my position on that. Fine with 2 > >>>>> separate > >>>>>>>>>>> methods. Once we remove the lag information from these > >>> APIs, > >>>>>>>> returning a > >>>>>>>>>>> List is less attractive, since the ordering has no > >> special > >>>>>> meaning > >>>>>>>> now. > >>>>>>>>>>> Agreed, now that we are not returning lag, I am also sold > >>> on > >>>>>> having > >>>>>>>> two > >>>>>>>>>>> separate functions. We already have one which returns > >>>>>>>> streamsMetadata for > >>>>>>>>>>> active tasks, and now we can add another one for > >> standbys. 
> >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth > >>>>>> Chandar < > >>>>>>>>>>> vchan...@confluent.io> wrote: > >>>>>>>>>>> > >>>>>>>>>>> +1 to Sophie's suggestion. Having both lag in terms of > >>> time > >>>>> and > >>>>>>>> offsets is > >>>>>>>>>>> good and makes for a more complete API. > >>>>>>>>>>> > >>>>>>>>>>> Since there are two soft votes for separate > >> active/standby > >>> API > >>>>>>>> methods, I > >>>>>>>>>>> also change my position on that. Fine with 2 separate > >>> methods. > >>>>>>>>>>> Once we remove the lag information from these APIs, > >>> returning a > >>>>>> List > >>>>>>>> is > >>>>>>>>>>> less attractive, since the ordering has no special > >> meaning > >>> now. > >>>>>>>>>>> > >>>>>>>>>>>>> lag in offsets vs time: Having both, as suggested by > >>> Sophie > >>>>>> would > >>>>>>>> of > >>>>>>>>>>> course be best. What is a little unclear to me is, how in > >>>>> details > >>>>>>>> are we > >>>>>>>>>>> going to compute both? > >>>>>>>>>>> @navinder may be next step is to flesh out these details > >>> and > >>>>>> surface > >>>>>>>> any > >>>>>>>>>>> larger changes we need to make if need be. > >>>>>>>>>>> > >>>>>>>>>>> Any other details we need to cover, before a VOTE can be > >>> called > >>>>>> on > >>>>>>>> this? > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck < > >>> bbej...@gmail.com > >>>>>> > >>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> I am jumping in a little late here. > >>>>>>>>>>>> > >>>>>>>>>>>> Overall I agree with the proposal to push decision > >>> making on > >>>>>>>> what/how to > >>>>>>>>>>>> query in the query layer. > >>>>>>>>>>>> > >>>>>>>>>>>> For point 5 from above, I'm slightly in favor of having > >>> a new > >>>>>>>> method, > >>>>>>>>>>>> "standbyMetadataForKey()" or something similar. > >>>>>>>>>>>> Because even if we return all tasks in one list, the > >> user > >>>>> will > >>>>>>>> still have > >>>>>>>>>>>> to perform some filtering to separate the different > >>> tasks, > >>>>> so I > >>>>>>>> don't > >>>>>>>>>>> feel > >>>>>>>>>>>> making two calls is a burden, and IMHO makes things > >> more > >>>>>>>> transparent for > >>>>>>>>>>>> the user. > >>>>>>>>>>>> If the final vote is for using an "isActive" field, I'm > >>> good > >>>>>> with > >>>>>>>> that as > >>>>>>>>>>>> well. > >>>>>>>>>>>> > >>>>>>>>>>>> Just my 2 cents. > >>>>>>>>>>>> > >>>>>>>>>>>> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar > >>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> I think now we are aligned on almost all the design > >>> parts. > >>>>>>>> Summarising > >>>>>>>>>>>>> below what has been discussed above and we have a > >>> general > >>>>>>>> consensus on. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> - Rather than broadcasting lag across all nodes at > >>>>>>>> rebalancing/with > >>>>>>>>>>>> the > >>>>>>>>>>>>> heartbeat, we will just return a list of all > >> available > >>>>>> standby’s > >>>>>>>> in the > >>>>>>>>>>>>> system and the user can make IQ query any of those > >>> nodes > >>>>>> which > >>>>>>>> will > >>>>>>>>>>>> return > >>>>>>>>>>>>> the response, and the lag and offset time. Based on > >>> which > >>>>>> user > >>>>>>>> can > >>>>>>>>>>> decide > >>>>>>>>>>>>> if he wants to return the response back or call > >> another > >>>>>> standby. > >>>>>>>>>>>>> - The current metadata query frequency will not > >>> change. 
> >>>>>> It > >>>>>>>> will be > >>>>>>>>>>>> the > >>>>>>>>>>>>> same as it does now, i.e. before each query. > >>>>>>>>>>>>> > >>>>>>>>>>>>> - For fetching list<StreamsMetadata> in > >>>>>>>> StreamsMetadataState.java > >>>>>>>>>>> and > >>>>>>>>>>>>> List<QueryableStoreProvider> in > >>>>>>>> StreamThreadStateStoreProvider.java > >>>>>>>>>>>> (which > >>>>>>>>>>>>> will return all active stores which are > >>> running/restoring > >>>>> and > >>>>>>>> replica > >>>>>>>>>>>>> stores which are running), we will add new functions > >>> and > >>>>> not > >>>>>>>> disturb > >>>>>>>>>>> the > >>>>>>>>>>>>> existing functions > >>>>>>>>>>>>> > >>>>>>>>>>>>> - There is no need to add new StreamsConfig for > >>>>>> implementing > >>>>>>>> this > >>>>>>>>>>> KIP > >>>>>>>>>>>>> > >>>>>>>>>>>>> - We will add standbyPartitionsByHost in > >>> AssignmentInfo > >>>>>> and > >>>>>>>>>>>>> StreamsMetadataState which would change the existing > >>>>>>>> rebuildMetadata() > >>>>>>>>>>>> and > >>>>>>>>>>>>> setPartitionsByHostState() > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> If anyone has any more concerns please feel free to > >>> add. > >>>>> Post > >>>>>>>> this I > >>>>>>>>>>> will > >>>>>>>>>>>>> be initiating a vote. > >>>>>>>>>>>>> ~Navinder > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Friday, 25 October, 2019, 12:05:29 pm IST, > >>> Matthias > >>>>> J. > >>>>>> Sax > >>>>>>>> < > >>>>>>>>>>>>> matth...@confluent.io> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>> Just to close the loop @Vinoth: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> 1. IIUC John intends to add (or we can do this in > >>> this > >>>>>> KIP) lag > >>>>>>>>>>>>> information > >>>>>>>>>>>>>> to AssignmentInfo, which gets sent to every > >>> participant. > >>>>>>>>>>>>> > >>>>>>>>>>>>> As explained by John, currently KIP-441 plans to only > >>>>> report > >>>>>> the > >>>>>>>>>>>>> information to the leader. But I guess, with the new > >>>>>> proposal to > >>>>>>>> not > >>>>>>>>>>>>> broadcast this information anyway, this concern is > >>>>>> invalidated > >>>>>>>> anyway > >>>>>>>>>>>>> > >>>>>>>>>>>>>> 2. At-least I was under the assumption that it can > >> be > >>>>>> called > >>>>>>>> per > >>>>>>>>>>> query, > >>>>>>>>>>>>>> since the API docs don't seem to suggest otherwise. > >>> Do > >>>>> you > >>>>>> see > >>>>>>>> any > >>>>>>>>>>>>>> potential issues if we call this every query? (we > >>> should > >>>>>>>> benchmark > >>>>>>>>>>> this > >>>>>>>>>>>>>> nonetheless) > >>>>>>>>>>>>> > >>>>>>>>>>>>> I did not see a real issue if people refresh the > >>> metadata > >>>>>>>> frequently, > >>>>>>>>>>>>> because it would be a local call. My main point was, > >>> that > >>>>>> this > >>>>>>>> would > >>>>>>>>>>>>> change the current usage pattern of the API, and we > >>> would > >>>>>>>> clearly need > >>>>>>>>>>>>> to communicate this change. Similar to (1), this > >>> concern in > >>>>>>>> invalidated > >>>>>>>>>>>>> anyway. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> @John: I think it's a great idea to get rid of > >>> reporting > >>>>>> lag, and > >>>>>>>>>>>>> pushing the decision making process about "what to > >>> query" > >>>>>> into > >>>>>>>> the > >>>>>>>>>>> query > >>>>>>>>>>>>> serving layer itself. This simplifies the overall > >>> design of > >>>>>> this > >>>>>>>> KIP > >>>>>>>>>>>>> significantly, and actually aligns very well with the > >>> idea > >>>>>> that > >>>>>>>> Kafka > >>>>>>>>>>>>> Streams (as it is a library) should only provide the > >>> basic > >>>>>>>> building > >>>>>>>>>>>>> block. 
Many of my raised questions are invalided by > >>> this. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> Some questions are still open though: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> 10) Do we need to distinguish between > >>> active(restoring) > >>>>> and > >>>>>>>> standby > >>>>>>>>>>>>>> tasks? Or could be treat both as the same? > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> @Vinoth: about (5). I see your point about multiple > >>> calls > >>>>> vs > >>>>>> a > >>>>>>>> single > >>>>>>>>>>>>> call. I still slightly prefer multiple calls, but > >> it's > >>>>> highly > >>>>>>>>>>> subjective > >>>>>>>>>>>>> and I would also be fine to add an #isActive() > >> method. > >>>>> Would > >>>>>> be > >>>>>>>> good > >>>>>>>>>>> the > >>>>>>>>>>>>> get feedback from others. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> For (14), ie, lag in offsets vs time: Having both, as > >>>>>> suggested > >>>>>>>> by > >>>>>>>>>>>>> Sophie would of course be best. What is a little > >>> unclear to > >>>>>> me > >>>>>>>> is, how > >>>>>>>>>>>>> in details are we going to compute both? > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote: > >>>>>>>>>>>>>> Just to chime in on the "report lag vs timestamp > >>>>>> difference" > >>>>>>>> issue, I > >>>>>>>>>>>>> would > >>>>>>>>>>>>>> actually advocate for both. As mentioned already, > >>> time > >>>>>>>> difference is > >>>>>>>>>>>>>> probably a lot easier and/or more useful to reason > >>> about > >>>>> in > >>>>>>>> terms of > >>>>>>>>>>>>>> "freshness" > >>>>>>>>>>>>>> of the state. But in the case when all queried > >>> stores are > >>>>>> far > >>>>>>>> behind, > >>>>>>>>>>>> lag > >>>>>>>>>>>>>> could > >>>>>>>>>>>>>> be used to estimate the recovery velocity. You can > >>> then > >>>>>> get a > >>>>>>>> (pretty > >>>>>>>>>>>>> rough) > >>>>>>>>>>>>>> idea of when a store might be ready, and wait until > >>>>> around > >>>>>>>> then to > >>>>>>>>>>>> query > >>>>>>>>>>>>>> again. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang < > >>>>>>>> wangg...@gmail.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I think I agree with John's recent reasoning as > >>> well: > >>>>>> instead > >>>>>>>> of > >>>>>>>>>>>> letting > >>>>>>>>>>>>>>> the storeMetadataAPI to return the staleness > >>>>> information, > >>>>>>>> letting > >>>>>>>>>>> the > >>>>>>>>>>>>>>> client to query either active or standby and > >> letting > >>>>>> standby > >>>>>>>> query > >>>>>>>>>>>>> response > >>>>>>>>>>>>>>> to include both the values + timestamp (or lag, as > >>> in > >>>>>> diffs of > >>>>>>>>>>>>> timestamps) > >>>>>>>>>>>>>>> would actually be more intuitive -- not only the > >>> streams > >>>>>>>> client is > >>>>>>>>>>>>> simpler, > >>>>>>>>>>>>>>> from user's perspective they also do not need to > >>>>>> periodically > >>>>>>>>>>> refresh > >>>>>>>>>>>>> their > >>>>>>>>>>>>>>> staleness information from the client, but only > >>> need to > >>>>>> make > >>>>>>>>>>> decisions > >>>>>>>>>>>>> on > >>>>>>>>>>>>>>> the fly whenever they need to query. 
> >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Again the standby replica then need to know the > >>> current > >>>>>> active > >>>>>>>>>>> task's > >>>>>>>>>>>>>>> timestamp, which can be found from the log end > >>> record's > >>>>>>>> encoded > >>>>>>>>>>>>> timestamp; > >>>>>>>>>>>>>>> today we standby tasks do not read that specific > >>> record, > >>>>>> but > >>>>>>>> only > >>>>>>>>>>>>> refresh > >>>>>>>>>>>>>>> its knowledge on the log end offset, but I think > >>>>>> refreshing > >>>>>>>> the > >>>>>>>>>>> latest > >>>>>>>>>>>>>>> record timestamp is not a very bad request to add > >>> on the > >>>>>>>> standby > >>>>>>>>>>>>> replicas. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar < > >>>>>>>>>>> vchan...@confluent.io > >>>>>>>>>>>>> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> +1 As someone implementing a query routing layer, > >>> there > >>>>>> is > >>>>>>>> already > >>>>>>>>>>> a > >>>>>>>>>>>>> need > >>>>>>>>>>>>>>>> to have mechanisms in place to do > >>> healthchecks/failure > >>>>>>>> detection to > >>>>>>>>>>>>>>> detect > >>>>>>>>>>>>>>>> failures for queries, while Streams rebalancing > >>>>>> eventually > >>>>>>>> kicks in > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>> background. > >>>>>>>>>>>>>>>> So, pushing this complexity to the IQ client app > >>> keeps > >>>>>>>> Streams > >>>>>>>>>>>> simpler > >>>>>>>>>>>>> as > >>>>>>>>>>>>>>>> well. IQs will be potentially issues at an order > >> of > >>>>>>>> magnitude more > >>>>>>>>>>>>>>>> frequently and it can achieve good freshness for > >>> the > >>>>> lag > >>>>>>>>>>> information. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I would like to add however, that we would also > >>> need to > >>>>>>>> introduce > >>>>>>>>>>>> apis > >>>>>>>>>>>>> in > >>>>>>>>>>>>>>>> KafkaStreams class, for obtaining lag information > >>> for > >>>>> all > >>>>>>>> stores > >>>>>>>>>>>> local > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>>> that host. This is for the IQs to relay back with > >>> the > >>>>>>>> response/its > >>>>>>>>>>>> own > >>>>>>>>>>>>>>>> heartbeat mechanism. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Thu, Oct 24, 2019 at 3:12 PM John Roesler < > >>>>>>>> j...@confluent.io> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I've been mulling about this KIP, and I think I > >>> was on > >>>>>> the > >>>>>>>> wrong > >>>>>>>>>>>> track > >>>>>>>>>>>>>>>>> earlier with regard to task lags. Tl;dr: I don't > >>> think > >>>>>> we > >>>>>>>> should > >>>>>>>>>>> add > >>>>>>>>>>>>>>>>> lags at all to the metadata API (and also not to > >>> the > >>>>>>>>>>> AssignmentInfo > >>>>>>>>>>>>>>>>> protocol message). > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Like I mentioned early on, reporting lag via > >>>>>>>>>>>>>>>>> SubscriptionInfo/AssignmentInfo would only work > >>> while > >>>>>>>> rebalances > >>>>>>>>>>> are > >>>>>>>>>>>>>>>>> happening. Once the group stabilizes, no members > >>> would > >>>>>> be > >>>>>>>> notified > >>>>>>>>>>>> of > >>>>>>>>>>>>>>>>> each others' lags anymore. 
I had been thinking > >>> that > >>>>> the > >>>>>>>> solution > >>>>>>>>>>>> would > >>>>>>>>>>>>>>>>> be the heartbeat proposal I mentioned earlier, > >> but > >>>>> that > >>>>>>>> proposal > >>>>>>>>>>>> would > >>>>>>>>>>>>>>>>> have reported the heartbeats of the members only > >>> to > >>>>> the > >>>>>>>> leader > >>>>>>>>>>>> member > >>>>>>>>>>>>>>>>> (the one who makes assignments). To be useful in > >>> the > >>>>>>>> context of > >>>>>>>>>>>> _this_ > >>>>>>>>>>>>>>>>> KIP, we would also have to report the lags in > >> the > >>>>>> heartbeat > >>>>>>>>>>>> responses > >>>>>>>>>>>>>>>>> to of _all_ members. This is a concern to be > >>> because > >>>>> now > >>>>>>>> _all_ the > >>>>>>>>>>>>>>>>> lags get reported to _all_ the members on > >> _every_ > >>>>>>>> heartbeat... a > >>>>>>>>>>> lot > >>>>>>>>>>>>>>>>> of chatter. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Plus, the proposal for KIP-441 is only to report > >>> the > >>>>>> lags > >>>>>>>> of each > >>>>>>>>>>>>>>>>> _task_. This is the sum of the lags of all the > >>> stores > >>>>>> in the > >>>>>>>>>>> tasks. > >>>>>>>>>>>>>>>>> But this would be insufficient for KIP-535. For > >>> this > >>>>>> kip, > >>>>>>>> we would > >>>>>>>>>>>>>>>>> want the lag specifically of the store we want > >> to > >>>>>> query. So > >>>>>>>> this > >>>>>>>>>>>>>>>>> means, we have to report the lags of all the > >>> stores of > >>>>>> all > >>>>>>>> the > >>>>>>>>>>>> members > >>>>>>>>>>>>>>>>> to every member... even more chatter! > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> The final nail in the coffin to me is that IQ > >>> clients > >>>>>> would > >>>>>>>> have > >>>>>>>>>>> to > >>>>>>>>>>>>>>>>> start refreshing their metadata quite frequently > >>> to > >>>>>> stay up > >>>>>>>> to > >>>>>>>>>>> date > >>>>>>>>>>>> on > >>>>>>>>>>>>>>>>> the lags, which adds even more overhead to the > >>> system. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Consider a strawman alternative: we bring > >> KIP-535 > >>> back > >>>>>> to > >>>>>>>>>>> extending > >>>>>>>>>>>>>>>>> the metadata API to tell the client the active > >> and > >>>>>> standby > >>>>>>>>>>> replicas > >>>>>>>>>>>>>>>>> for the key in question (not including and > >>>>>> "staleness/lag" > >>>>>>>>>>>>>>>>> restriction, just returning all the replicas). > >>> Then, > >>>>> the > >>>>>>>> client > >>>>>>>>>>>> picks > >>>>>>>>>>>>>>>>> a replica and sends the query. The server > >> returns > >>> the > >>>>>>>> current lag > >>>>>>>>>>>>>>>>> along with the response (maybe in an HTML header > >>> or > >>>>>>>> something). > >>>>>>>>>>>> Then, > >>>>>>>>>>>>>>>>> the client keeps a map of its last observed lags > >>> for > >>>>>> each > >>>>>>>> replica, > >>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> uses this information to prefer fresher > >> replicas. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> OR, if it wants only to query the active > >> replica, > >>> it > >>>>>> would > >>>>>>>> throw > >>>>>>>>>>> an > >>>>>>>>>>>>>>>>> error on any lag response greater than zero, > >>> refreshes > >>>>>> its > >>>>>>>>>>> metadata > >>>>>>>>>>>> by > >>>>>>>>>>>>>>>>> re-querying the metadata API, and tries again > >>> with the > >>>>>>>> current > >>>>>>>>>>>> active > >>>>>>>>>>>>>>>>> replica. 
> >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> This way, the lag information will be super > >> fresh > >>> for > >>>>>> the > >>>>>>>> client, > >>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> we keep the Metadata API / > >>> Assignment,Subscription / > >>>>> and > >>>>>>>> Heartbeat > >>>>>>>>>>>> as > >>>>>>>>>>>>>>>>> slim as possible. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Side note: I do think that some time soon, we'll > >>> have > >>>>> to > >>>>>>>> add a > >>>>>>>>>>>> library > >>>>>>>>>>>>>>>>> for IQ server/clients. I think that this logic > >>> will > >>>>>> start > >>>>>>>> to get > >>>>>>>>>>>>>>>>> pretty complex. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I hope this thinking is reasonably clear! > >>>>>>>>>>>>>>>>> Thanks again, > >>>>>>>>>>>>>>>>> -John > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Does that > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar > >> < > >>>>>>>>>>>>> vchan...@confluent.io > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Responding to the points raised by Matthias > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 1. IIUC John intends to add (or we can do this > >> in > >>>>> this > >>>>>>>> KIP) lag > >>>>>>>>>>>>>>>>> information > >>>>>>>>>>>>>>>>>> to AssignmentInfo, which gets sent to every > >>>>>> participant. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 2. At-least I was under the assumption that it > >>> can be > >>>>>>>> called per > >>>>>>>>>>>>>>> query, > >>>>>>>>>>>>>>>>>> since the API docs don't seem to suggest > >>> otherwise. > >>>>> Do > >>>>>> you > >>>>>>>> see > >>>>>>>>>>> any > >>>>>>>>>>>>>>>>>> potential issues if we call this every query? > >> (we > >>>>>> should > >>>>>>>>>>> benchmark > >>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>> nonetheless) > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 4. Agree. metadataForKey() implicitly would > >>> return > >>>>> the > >>>>>>>> active > >>>>>>>>>>> host > >>>>>>>>>>>>>>>>> metadata > >>>>>>>>>>>>>>>>>> (as it was before). We should also document > >> this > >>> in > >>>>>> that > >>>>>>>> APIs > >>>>>>>>>>>>>>> javadoc, > >>>>>>>>>>>>>>>>>> given we have another method(s) that returns > >> more > >>>>> host > >>>>>>>> metadata > >>>>>>>>>>>> now. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 5. While I see the point, the app/caller has > >> to > >>> make > >>>>>> two > >>>>>>>>>>> different > >>>>>>>>>>>>>>>> APIs > >>>>>>>>>>>>>>>>>> calls to obtain active/standby and potentially > >>> do the > >>>>>> same > >>>>>>>> set of > >>>>>>>>>>>>>>>>> operation > >>>>>>>>>>>>>>>>>> to query the state. I personally still like a > >>> method > >>>>>> like > >>>>>>>>>>>> isActive() > >>>>>>>>>>>>>>>>>> better, but don't have strong opinions. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 9. If we do expose the lag information, could > >> we > >>> just > >>>>>>>> leave it > >>>>>>>>>>> upto > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> caller to decide whether it errors out or not > >>> and not > >>>>>> make > >>>>>>>> the > >>>>>>>>>>>>>>> decision > >>>>>>>>>>>>>>>>>> within Streams? i.e we don't need a new config > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 14. +1 . If it's easier to do right away. 
We > >>> started > >>>>>> with > >>>>>>>> number > >>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> records, following the lead from KIP-441 > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar > >>>>>>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thanks, everyone for taking a look. Some very > >>> cool > >>>>>> ideas > >>>>>>>> have > >>>>>>>>>>>> flown > >>>>>>>>>>>>>>>> in. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> There was a follow-on idea I POCed to > >>> continuously > >>>>>>>> share lag > >>>>>>>>>>>>>>>>>>> information in the heartbeat protocol+1 that > >>> would > >>>>> be > >>>>>>>> great, I > >>>>>>>>>>>> will > >>>>>>>>>>>>>>>>> update > >>>>>>>>>>>>>>>>>>> the KIP assuming this work will finish soon > >>>>>>>>>>>>>>>>>>>>> I think that adding a new method to > >>>>>>>> StreamsMetadataState and > >>>>>>>>>>>>>>>>>>> deprecating the existing method isthe best way > >>> to > >>>>> go; > >>>>>> we > >>>>>>>> just > >>>>>>>>>>>> can't > >>>>>>>>>>>>>>>>> change > >>>>>>>>>>>>>>>>>>> the return types of any existing methods.+1 on > >>> this, > >>>>>> we > >>>>>>>> will add > >>>>>>>>>>>>>>> new > >>>>>>>>>>>>>>>>>>> methods for users who would be interested in > >>>>> querying > >>>>>>>> back a > >>>>>>>>>>> list > >>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>> possible options to query from and leave the > >>> current > >>>>>>>> function > >>>>>>>>>>>>>>>>>>> getStreamsMetadataForKey() untouched for users > >>> who > >>>>>> want > >>>>>>>> absolute > >>>>>>>>>>>>>>>>>>> consistency. > >>>>>>>>>>>>>>>>>>>>> why not just always return all available > >>> metadata > >>>>>>>> (including > >>>>>>>>>>>>>>>>>>> active/standby or lag) and let the caller > >>> decide to > >>>>>> which > >>>>>>>> node > >>>>>>>>>>>> they > >>>>>>>>>>>>>>>>> want to > >>>>>>>>>>>>>>>>>>> route the query+1. I think this makes sense as > >>> from > >>>>> a > >>>>>> user > >>>>>>>>>>>>>>> standpoint > >>>>>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>>> is no difference b/w an active and a standby > >> if > >>> both > >>>>>> have > >>>>>>>> same > >>>>>>>>>>>> lag, > >>>>>>>>>>>>>>>>> Infact > >>>>>>>>>>>>>>>>>>> users would be able to use this API to reduce > >>> query > >>>>>> load > >>>>>>>> on > >>>>>>>>>>>>>>> actives, > >>>>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>>>>> returning all available options along with the > >>>>> current > >>>>>>>> lag in > >>>>>>>>>>> each > >>>>>>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>>>> make sense and leave it to user how they want > >>> to use > >>>>>> this > >>>>>>>> data. > >>>>>>>>>>>>>>> This > >>>>>>>>>>>>>>>>> has > >>>>>>>>>>>>>>>>>>> another added advantage. If a user queries any > >>>>> random > >>>>>>>> machine > >>>>>>>>>>> for > >>>>>>>>>>>> a > >>>>>>>>>>>>>>>>> key and > >>>>>>>>>>>>>>>>>>> that machine has a replica for the > >>> partition(where > >>>>> key > >>>>>>>> belongs) > >>>>>>>>>>>>>>> user > >>>>>>>>>>>>>>>>> might > >>>>>>>>>>>>>>>>>>> choose to serve the data from there itself(if > >> it > >>>>>> doesn’t > >>>>>>>> lag > >>>>>>>>>>> much) > >>>>>>>>>>>>>>>>> rather > >>>>>>>>>>>>>>>>>>> than finding the active and making an IQ to > >>> that. > >>>>> This > >>>>>>>> would > >>>>>>>>>>> save > >>>>>>>>>>>>>>>> some > >>>>>>>>>>>>>>>>>>> critical time in serving for some > >> applications. 
> >>>>>>>>>>>>>>>>>>>>> Adding the lag in terms of timestamp diff > >>>>> comparing > >>>>>> the > >>>>>>>>>>>>>>> committed > >>>>>>>>>>>>>>>>>>> offset.+1 on this, I think it’s more readable. > >>> But > >>>>> as > >>>>>>>> John said > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> function allMetadataForKey() is just returning > >>> the > >>>>>>>> possible > >>>>>>>>>>>> options > >>>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>> where users can query a key, so we can even > >>> drop the > >>>>>>>> parameter > >>>>>>>>>>>>>>>>>>> enableReplicaServing/tolerableDataStaleness > >> and > >>> just > >>>>>>>> return all > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> streamsMetadata containing that key along with > >>> the > >>>>>> offset > >>>>>>>> limit. > >>>>>>>>>>>>>>>>>>> Answering the questions posted by Matthias in > >>>>>> sequence. > >>>>>>>>>>>>>>>>>>> 1. @John can you please comment on this one.2. > >>> Yeah > >>>>>> the > >>>>>>>> usage > >>>>>>>>>>>>>>> pattern > >>>>>>>>>>>>>>>>>>> would include querying this prior to every > >>> request > >>>>> 3. > >>>>>>>> Will add > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>> changes > >>>>>>>>>>>>>>>>>>> to StreamsMetadata in the KIP, would include > >>> changes > >>>>>> in > >>>>>>>>>>>>>>>>> rebuildMetadata() > >>>>>>>>>>>>>>>>>>> etc.4. Makes sense, already addressed above5. > >>> Is it > >>>>>>>> important > >>>>>>>>>>> from > >>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>> user > >>>>>>>>>>>>>>>>>>> perspective if they are querying an > >>>>>> active(processing), > >>>>>>>>>>>>>>>>> active(restoring), > >>>>>>>>>>>>>>>>>>> a standby task if we have away of denoting lag > >>> in a > >>>>>>>> readable > >>>>>>>>>>>> manner > >>>>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>> kind of signifies the user that this is the > >> best > >>>>> node > >>>>>> to > >>>>>>>> query > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>> fresh > >>>>>>>>>>>>>>>>>>> data.6. Yes, I intend to return the actives > >> and > >>>>>> replicas > >>>>>>>> in the > >>>>>>>>>>>>>>> same > >>>>>>>>>>>>>>>>> return > >>>>>>>>>>>>>>>>>>> list in allMetadataForKey()7. tricky8. yes, we > >>> need > >>>>>> new > >>>>>>>>>>> functions > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> return > >>>>>>>>>>>>>>>>>>> activeRestoring and standbyRunning tasks.9. > >>>>>> StreamsConfig > >>>>>>>>>>> doesn’t > >>>>>>>>>>>>>>>> look > >>>>>>>>>>>>>>>>> like > >>>>>>>>>>>>>>>>>>> of much use to me since we are giving all > >>> possible > >>>>>>>> options via > >>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>> function, or they can use existing function > >>>>>>>>>>>>>>>> getStreamsMetadataForKey() > >>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>> get just the active10. I think treat them both > >>> the > >>>>>> same > >>>>>>>> and let > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> lag do > >>>>>>>>>>>>>>>>>>> the talking11. We are just sending them the > >>> option > >>>>> to > >>>>>>>> query from > >>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>> allMetadataForKey(), which doesn’t include any > >>>>>> handle. We > >>>>>>>> then > >>>>>>>>>>>>>>> query > >>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>> machine for the key where it calls allStores() > >>> and > >>>>>> tries > >>>>>>>> to find > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> task > >>>>>>>>>>>>>>>>>>> in > >> activeRunning/activeRestoring/standbyRunning > >>> and > >>>>>> adds > >>>>>>>> the > >>>>>>>>>>> store > >>>>>>>>>>>>>>>>> handle > >>>>>>>>>>>>>>>>>>> here. 12. 
Need to verify, but during the exact point when the store is closed to transition it from restoring to running, the queries will fail. The caller in such a case can have their own configurable retries to check again, or try the replica if a call fails on the active.

13. I think KIP-216 is working on those lines; we might not need a few of those exceptions, since the basic idea of this KIP is now to support IQ during rebalancing.

14. Addressed above, agreed it looks more readable.


On Tuesday, 22 October, 2019, 08:39:07 pm IST, Matthias J. Sax <matth...@confluent.io> wrote:

One more thought:

14) Is specifying the allowed lag in number of records a useful way for users to declare how stale an instance is allowed to be? Would it be more intuitive for users to specify the allowed lag in time units (would event time or processing time be better)? It seems hard for users to reason how "fresh" a store really is when number of records is used.

-Matthias

On 10/21/19 9:02 PM, Matthias J. Sax wrote:

Some more follow up thoughts:

11) If we get a store handle of an active(restoring) task, and the task transits to running, does the store handle become invalid and a new one must be retrieved? Or can we "switch it out" underneath -- for this case, how does the user know when they start to query the up-to-date state?

12) Standby tasks will have the store open in regular mode, while active(restoring) tasks open stores in "upgrade mode" for more efficient bulk loading. When we switch the store into active mode, we close it and reopen it. What is the impact if we query the store during restore? What is the impact if we close the store to transit to running (eg, there might be open iterators)?

13) Do we need to introduce new exception types? Compare KIP-216
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors)
that aims to improve the user experience with regard to IQ exceptions.

-Matthias

On 10/21/19 6:39 PM, Matthias J. Sax wrote:

Thanks for the KIP.

Couple of comments:

1) With regard to KIP-441, my current understanding is that the lag information is only reported to the leader (please correct me if I am wrong). This seems to be quite a limitation to actually use the lag information.

2) The idea of the metadata API is actually to get metadata once and only refresh the metadata if a store was migrated. The current proposal would require getting the metadata before each query. The KIP should describe the usage pattern and impact in more detail.

3) Currently, the KIP does not list the public API changes in detail. Please list all methods you intend to deprecate and list all methods you intend to add (best, using a code-block markup -- compare
https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
as an example)

4) Also note (as already pointed out by John) that we cannot have any breaking API changes. Thus, the API should be designed in a fully backward compatible manner.

5) Returning a list of metadata objects makes it hard for users to know if the first object refers to the active(processing), active(restoring), or a standby task. IMHO, we should be more explicit. For example, a metadata object could have a flag that one can test via `#isActive()`. Or maybe even better, we could keep the current API as-is and add something like `standbyMetadataForKey()` (and similar methods for the others). Having just a flag `isActive()` is a little subtle, and having new overloads would make the API much clearer (passing in a boolean flag does not seem to be a nice API).

6) Do you intend to return all standby metadata information at once, similar to `allMetadata()` -- seems to be useful.

7) Even if the lag information is propagated to all instances, it will happen in an async manner. Hence, I am wondering if we should address this race condition (I think we should). The idea would be to check if a standby/active(restoring) task is actually still within the lag bounds when a query is executed, and we would throw an exception if not.

8) The current `KafkaStreams#state()` method only returns a handle to stores of active(processing) tasks. How can a user actually get a handle to a store of an active(restoring) or standby task for querying? Seems we should add a new method to get standby handles? Changing the semantics of the existing `state()` would be possible, but I think adding a new method is preferable?

9) How does the user actually specify the acceptable lag? A global config via StreamsConfig (this would be a public API change that needs to be covered in the KIP)? Or on a per-store or even per-query basis for more flexibility? We could also have a global setting that is used as the default and allow overwriting it on a per-query basis.

10) Do we need to distinguish between active(restoring) and standby tasks? Or could we treat both as the same?

-Matthias

On 10/21/19 5:40 PM, Vinoth Chandar wrote:

>> I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query?

+1 on exposing lag information via the APIs. IMO without having continuously updated/fresh lag information, its true value as a signal for query routing decisions is much limited. But we can design the API around this model and iterate? Longer term, we should have continuously shared lag information.

>> more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to 0 it means "active task only".

+1 IMO if we plan on having `enableReplicaServing`, it makes sense to generalize based on dataStaleness. This seems complementary to exposing the lag information itself.

>> This is actually not a public api change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441

+1 again. Do we have a concrete timeline for when this change will land on master? I would like to get the implementation wrapped up (as much as possible) by end of the month. :) But I agree this sequencing makes sense.
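To make the generalization quoted above concrete, here is a minimal sketch of what a staleness-parameterized lookup could look like. The interface name, the tolerableDataStaleness parameter, and the ordering assumption are illustrative only; none of this is the KIP's final API.

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.StreamsMetadata;
import java.util.List;

// Hypothetical shape of the lookup discussed in this thread (not an existing Kafka API).
interface StalenessAwareMetadataLookup {
    // tolerableDataStaleness = 0 would mean "active task only"; larger values would also
    // admit standby or restoring instances whose estimated lag is within the bound.
    <K> List<StreamsMetadata> allMetadataForKey(String storeName,
                                                K key,
                                                Serializer<K> keySerializer,
                                                long tolerableDataStaleness);
}

A per-query parameter like this would subsume a boolean enableReplicaServing flag, since passing 0 recovers the "active only" behavior.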
On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hi Navinder,

Thanks for the KIP, I have a high level question about the proposed API regarding:

"StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"

I'm wondering if it's more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to 0 it means "active task only". Behind the scenes, we can have the committed offsets encode the stream time as well, so that when processing standby tasks the stream process knows not only the lag in terms of offsets compared to the committed offset (internally we call it the offset limit), but also the lag in terms of timestamp diff compared to the committed offset.

Also, encoding the timestamp as part of the offset has other benefits for improving Kafka Streams time semantics as well, but for KIP-535 itself I think it can help give users a more intuitive interface to reason about.

Guozhang
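The idea of encoding stream time alongside the committed offset can be illustrated with the existing consumer API, which allows an arbitrary metadata string per committed offset. A minimal sketch, assuming the stream time is simply serialized into that metadata field (this only illustrates the suggestion; it is not how Streams commits offsets today):

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;

class StreamTimeAwareCommit {
    // Piggyback the current stream time onto the committed offset so that a reader of the
    // committed offset can estimate staleness both in offsets and in time.
    static void commit(Consumer<?, ?> consumer, TopicPartition partition,
                       long offset, long streamTimeMs) {
        OffsetAndMetadata committed = new OffsetAndMetadata(offset, Long.toString(streamTimeMs));
        consumer.commitSync(Collections.singletonMap(partition, committed));
    }
}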
On Mon, Oct 21, 2019 at 12:30 PM John Roesler <j...@confluent.io> wrote:

Hey Navinder,

Thanks for the KIP! I've been reading over the discussion thus far, and I have a couple of thoughts to pile on as well:

It seems confusing to propose the API in terms of the current system state, but also propose how the API would look if/when KIP-441 is implemented. It occurs to me that the only part of KIP-441 that would affect you is the availability of the lag information in the SubscriptionInfo message. This is actually not a public api change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441, so maybe you can just build on top of KIP-441 and assume the lag information will be available. Then you could have a more straightforward proposal (e.g., mention that you'd return the lag information in AssignmentInfo as well as in the StreamsMetadata in some form, or make use of it in the API somehow).

I'm partially motivated in that former point because it seems like understanding how callers would bound the staleness for their use case is _the_ key point for this KIP. FWIW, I think that adding a new method to StreamsMetadataState and deprecating the existing method is the best way to go; we just can't change the return types of any existing methods.

I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query? This method isn't making any queries itself; it's merely telling you where the local Streams instance _thinks_ the key in question is located. Just returning all available information lets the caller implement any semantics they desire around querying only active stores, or standbys, or recovering stores, or whatever.

One fly in the ointment, which you may wish to consider if proposing to use lag information, is that the cluster would only become aware of new lag information during rebalances. Even in the full expression of KIP-441, this information would stop being propagated when the cluster achieves a balanced task distribution. There was a follow-on idea I POCed to continuously share lag information in the heartbeat protocol, which you might be interested in, if you want to make sure that nodes are basically _always_ aware of each others' lag on different partitions: https://github.com/apache/kafka/pull/7096

Thanks again!
-John
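On the caller side, the "return everything and let the caller decide" model could look roughly like the sketch below. The per-host lag map is a stand-in for whatever lag signal the KIP ends up exposing; only StreamsMetadata and HostInfo are existing APIs here, and the routing policy is the caller's own.

import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class CallerSideRouting {
    // Pick the least-lagging instance among all candidates (active plus standbys) whose
    // lag is within the caller's own bound; an empty result means "fail the query instead".
    static Optional<HostInfo> route(List<StreamsMetadata> candidates,
                                    Map<HostInfo, Long> lagByHost,
                                    long maxAcceptableLag) {
        return candidates.stream()
                .map(StreamsMetadata::hostInfo)
                .filter(host -> lagByHost.getOrDefault(host, Long.MAX_VALUE) <= maxAcceptableLag)
                .min(Comparator.comparingLong((HostInfo host) ->
                        lagByHost.getOrDefault(host, Long.MAX_VALUE)));
    }
}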
On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

Thanks, Vinoth. Looks like we are on the same page. I will add some of these explanations to the KIP as well. Have assigned the KAFKA-6144 to myself and KAFKA-8994 is closed (by you). As suggested, we will replace "replica" with "standby".

In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" Do we really need a per key configuration? or a new StreamsConfig is good enough?
>> Coming from experience, when teams are building a platform with Kafka Streams and these APIs serve data to multiple teams, we can't have a generalized config that says as a platform we will support stale reads or not. It should be the choice of whoever is calling the APIs to choose whether they are ok with stale reads or not. Makes sense?
On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:

Looks like we are covering ground :)

>> Only if it is within a permissible range (say 10000) we will serve from Restoring state of active.

+1 on having a knob like this. My reasoning is as follows.

Looking at the Streams state as a read-only distributed kv store: with num_standby = f, we should be able to tolerate f failures, and if there is an f+1'th failure, the system should be unavailable.

A) So with num_standby = 0, the system should be unavailable even if there is 1 failure, and that's my argument for not allowing querying in restoration state, especially since in this case it will be a total rebuild of the state (which IMO cannot be considered a normal fault-free operational state).

B) Even when there are standbys, say num_standby = 2, if the user decides to shut down all 3 instances, then the only outcome should be unavailability until all of them come back or the state is rebuilt on other nodes in the cluster. In normal operations, f <= 2, and when a failure does happen we can then either choose to be C over A and fail IQs until replication is fully caught up, or choose A over C by serving in restoring state as long as the lag is minimal. If even with f = 1, say, all the standbys are lagging a lot due to some issue, then that should be considered a failure, since it is different from the normal/expected operational mode. Serving reads with unbounded replication lag and calling it "available" may not be very usable or even desirable :) IMHO, since it gives the user no way to reason about the app that is going to query this store.

So there is definitely a need to distinguish between: replication catchup while being in a fault-free state vs restoration of state when we lose more than f standbys. This knob is a great starting point towards this.

If you agree with some of the explanation above, please feel free to include it in the KIP as well, since this is sort of our design principle here.

Small nits:

- let's standardize on "standby" instead of "replica", KIP or code, to be consistent with the rest of the Streams code/docs?
- Can we merge KAFKA-8994 into KAFKA-6144 now and close the former? Eventually need to consolidate KAFKA-6555 as well
- In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" Do we really need a per key configuration? or a new StreamsConfig is good enough?

On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

@Vinoth, I have incorporated a few of the discussions we have had in the KIP.

In the current code, t0 and t1 serve queries from the Active(Running) partition. For case t2, we are planning to return List<StreamsMetadata> such that it returns <StreamsMetadata(A), StreamsMetadata(B)>, so that if IQ fails on A, the replica on B can serve the data by enabling serving from replicas. This still does not solve cases t3 and t4, since B has been promoted to active but is in Restoring state to catch up to A's last committed position (as we don't serve from the Restoring state of an Active), and the new replica on R is building itself from scratch. Both these cases can be solved if we start serving from the Restoring state of the active as well, since it is almost equivalent to the previous Active.

There could be a case where the active and all replicas of a partition become unavailable, and all replicas of that partition are then building themselves from scratch; in this case, the state in the Active is far behind even though it is in Restoring state. To cater to such cases, so that we don't serve from this state, we can either add another state before Restoring or check the difference between the last committed offset and the current position. Only if it is within a permissible range (say 10000) will we serve from the Restoring state of the Active.
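The permissible-range check described above amounts to a simple offset comparison before serving from a restoring active. A minimal sketch, where the threshold and the way the two offsets are obtained are illustrative rather than the KIP's final mechanism:

class RestoringStoreGate {
    // e.g. the "10000" suggested above
    static final long PERMISSIBLE_OFFSET_LAG = 10_000L;

    // endOffset: the changelog partition's last committed/end offset
    // restoredOffset: how far restoration has progressed on this instance
    static boolean okToServe(long endOffset, long restoredOffset) {
        long lag = Math.max(0L, endOffset - restoredOffset);
        return lag <= PERMISSIBLE_OFFSET_LAG;
    }
}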
On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:

Thanks for the updates on the KIP, Navinder!

Few comments

- AssignmentInfo is not public API? But we will change it and thus need to increment the version and test for version_probing etc. Good to separate that from the StreamsMetadata changes (which are public API)
- From what I see, there is going to be a choice between the following

A) introducing a new *KafkaStreams::allMetadataForKey()* API that potentially returns List<StreamsMetadata> ordered from most up-to-date to least up-to-date replicas. Today we cannot fully implement this ordering, since all we know is which hosts are active and which are standbys. However, this aligns well with the future: KIP-441 adds the lag information to the rebalancing protocol, and we could also sort replicas based on the reported lags eventually. This is fully backwards compatible with existing clients. The only drawback I see is the naming of the existing method KafkaStreams::metadataForKey, not conveying the distinction that it simply returns the active replica, i.e. allMetadataForKey.get(0).

B) Change KafkaStreams::metadataForKey() to return a List. It's a breaking change.

I prefer A, since none of the semantics/behavior changes for existing users. Love to hear more thoughts. Can we also work this into the KIP? I already implemented A to unblock myself for now. Seems feasible to do.
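The two options contrasted above could be sketched as follows. The existing metadataForKey() signature is real; the added allMetadataForKey() method is hypothetical and only illustrates option A.

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.StreamsMetadata;
import java.util.List;

interface MetadataLookupOptions {
    // Existing method, left unchanged under option A (it keeps returning only the active).
    <K> StreamsMetadata metadataForKey(String storeName, K key, Serializer<K> keySerializer);

    // Option A: a new method returning all candidate instances for the key, ordered from
    // most caught-up (the active) to least caught-up once lag information is available.
    <K> List<StreamsMetadata> allMetadataForKey(String storeName, K key, Serializer<K> keySerializer);

    // Option B would instead change metadataForKey() itself to return a List, which breaks
    // existing callers; that is why option A is preferred in this thread.
}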
On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar <vchan...@confluent.io> wrote:

>> I get your point. But suppose there is a replica which has just become active, so in that case the replica will still be building itself from scratch and this active will go to restoring state till it catches up with the previous active; wouldn't serving from a restoring active make more sense than a replica in such a case.

KIP-441 will change this behavior such that promotion to active happens based on how caught up a replica is. So, once we have that (work underway already for 2.5 IIUC) and the user sets num.standby.replicas > 0, then the staleness window should not be as long as you describe. IMO if the user wants availability for state, then they should configure num.standby.replicas > 0. If not, then on a node loss, a few partitions would be unavailable for a while (there are other ways to bring this window down, which I won't bring in here). We could argue for querying a restoring active (say a new node added to replace a faulty old node) based on AP vs CP principles. But I am not sure reading really, really old values for the sake of availability is useful. No AP data system would be inconsistent for such a long time in practice.

So, I still feel just limiting this to standby reads provides the best semantics.

Just my 2c. Would love to see what others think as well.
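For reference on the num.standby.replicas point, enabling standbys is an existing Streams configuration. A minimal sketch (the application id and bootstrap servers are illustrative values):

import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

class StandbyConfigExample {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "iq-demo-app");        // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
        // Keep one standby per store so IQ can fail over without waiting for a full restore.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }
}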
On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

Hi Vinoth,

Thanks for the feedback.

Can we link the JIRA, discussion thread also to the KIP.
>> Added.

Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing of the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?
>> Sure, I can add changes for 8994 in this KIP and link KAFKA-6144 to KAFKA-8994 as well.

KIP seems to be focussing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up and the staleness window could be much smaller/tolerable. (once again the focus on KAFKA-8994).
>> I get your point. But suppose there is a replica which has just become active, so in that case the replica will still be building itself from scratch and this active will go to restoring state till it catches up with the previous active; wouldn't serving from a restoring active make more sense than a replica in such a case.

Finally, we may need to introduce a configuration to control this. Some users may prefer errors to stale data. Can we also add it to the KIP?
>> Will add this.

Regards,
Navinder

On 2019/10/14 16:56:49, Vinoth Chandar <v...@confluent.io> wrote:

Hi Navinder,

Thanks for sharing the KIP! Few thoughts

- Can we link the JIRA, discussion thread also to the KIP
- Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing of the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?
- KIP seems to be focussing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up and the staleness window could be much smaller/tolerable. (once again the focus on KAFKA-8994)
- Finally, we may need to introduce a configuration to control this. Some users may prefer errors to stale data. Can we also add it to the KIP?

Thanks
Vinoth

On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar <na...@yahoo.com.invalid> wrote:

Hi,

Starting a discussion on the KIP to Allow state stores to serve stale reads during rebalance (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
).

Thanks & Regards,
Navinder

LinkedIn