Hi Navinder, 

Thanks for the ping. Yes, that all sounds right to me. The name 
“RESTORING_GLOBAL” sounds fine, too. 

I think as far as warnings go, we’d just propose to mention in the javadoc 
of the relevant methods that the given topics should be compacted. 
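
For illustration, here's a rough sketch of the kind of javadoc note we might 
add (the method shown is just one example target; the exact wording is open):

    /**
     * ...
     * <p>
     * Note: the given topic acts as both the input and the changelog of the
     * global store, so it should be configured with {@code cleanup.policy=compact}.
     * With a "delete" policy, records can be dropped by retention, and a freshly
     * (re)built global store would silently miss those keys.
     */
    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic)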

Thanks!
-John

On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote:
> Gentle ping.
> 
> ~ Navinder
>     On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar 
> <navinder_b...@yahoo.com.invalid> wrote:  
> 
> Thanks Matthias & John, 
> 
> I am glad we are converging towards an understanding. So, to summarize: 
> we will still keep this change as a KIP, and instead of providing a reset 
> strategy, we will clean up, reset to earliest, and rebuild the state. 
> When we hit the exception and are rebuilding the state, we will stop all 
> processing and change the state of KafkaStreams to something like 
> “RESTORING_GLOBAL” or the like. 
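> 
> For illustration, a very rough sketch of that flow (the state name and all 
> helper methods here are placeholders, not proposed API):
> 
>     // inside the global thread's loop (sketch; the helpers are hypothetical)
>     try {
>         pollAndUpdateGlobalStores();
>     } catch (final InvalidOffsetException e) {
>         log.error("Global offsets out of range; wiping and rebuilding global state", e);
>         setKafkaStreamsState(State.RESTORING_GLOBAL); // proposed state; pauses all StreamThreads
>         wipeGlobalStateStores();                      // placeholder: delete local global state
>         globalConsumer.seekToBeginning(e.partitions());
>         restoreGlobalStoresFromEarliest();            // placeholder: replay the global topics
>         setKafkaStreamsState(State.RUNNING);          // resume normal processing
>     }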
> 
> How do we plan to educate users about the undesired effects of using 
> non-compacted global topics? (Via the KIP itself?)
> 
> +1 on deferring the KTable behavior change, the reset policy for global 
> stores, and connecting processors to global stores to a later stage, when 
> demanded.
> 
> Regards,
> Navinder
>     On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax 
> <mj...@apache.org> wrote:  
> 
> Your observation is correct. Connecting (regular) stores to processors
> is necessary to "merge" sub-topologies into single ones if a store is
> shared. -- For global stores, the structure of the program does not
> change and thus connecting processors to global stores is not required.
> 
> Also given our experience with restoring regular state stores (i.e.,
> partial processing of tasks that don't need restore), it seems better to
> pause processing and move all CPU and network resources to the global
> thread to rebuild the global store as soon as possible instead of
> potentially slowing down the restore in order to make progress on some
> tasks.
> 
> Of course, if we collect real world experience and it becomes an issue,
> we could still try to change it?
> 
> 
> -Matthias
> 
> 
> On 8/18/20 3:31 PM, John Roesler wrote:
> > Thanks Matthias,
> > 
> > Sounds good. I'm on board with no public API change and just
> > recovering instead of crashing.
> > 
> > Also, to be clear, I wouldn't drag KTables into it; I was
> > just trying to wrap my head around the congruity of our
> > choice for GlobalKTable with respect to KTable.
> > 
> > I agree that whatever we decide to do would probably also
> > resolve KAFKA-7380.
> > 
> > Moving on to discuss the behavior change, I'm wondering if
> > we really need to block all the StreamThreads. It seems like
> > we only need to prevent processing on any task that's
> > connected to the GlobalStore. 
> > 
> > I just took a look at the topology building code, and it
> > actually seems that connections to global stores don't need
> > to be declared. That's a bummer, since it means that we
> > really do have to stop all processing while the global
> > thread catches up.
> > 
> > Changing this seems like it'd be out of scope right now, but
> > I bring it up in case I'm wrong and it actually is possible
> > to know which specific tasks need to be synchronized with
> > which global state stores. If we could know that, then we'd
> > only have to block some of the tasks, not all of the
> > threads.
> > 
> > Thanks,
> > -John
> > 
> > 
> > On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote:
> >> Thanks for the discussion.
> >>
> >> I agree that this KIP is justified in any case -- even if we don't
> >> change public API, as the change in behavior is significant.
> >>
> >> Better documentation for the cleanup policy is always good (even if I am
> >> not aware of any concrete complaints atm that users were not aware of
> >> the implications). Of course, for a regular KTable, one can
> >> enable/disable the source-topic-changelog optimization and thus can use
> >> a non-compacted topic for this case, which is quite a difference to
> >> global stores/tables; so maybe it's worth pointing out this difference
> >> explicitly.
> >>
> >> As mentioned before, the main purpose of the original Jira was to avoid
> >> the crash situation and instead allow for auto-recovery, while it was an
> >> open question if it makes sense / would be useful to allow users to
> >> specify a custom reset policy instead of using a hard-coded "earliest"
> >> strategy. -- It seems it's still unclear if it would be useful, and thus
> >> it might be best to not add it for now -- we can still add it later if
> >> there are concrete use-cases that need this feature.
> >>
> >> @John: I actually agree that it's also questionable to allow a custom
> >> reset policy for KTables... Not sure if we want to drag this question
> >> into this KIP though?
> >>
> >> So it seems we all agree that we actually don't need any public API
> >> changes, but we only want to avoid crashing?
> >>
> >> For this case, to preserve the current behavior that guarantees that the
> >> global store/table is always loaded first, it seems we need to have a
> >> stop-the-world mechanism for the main `StreamThreads` -- do we need to
> >> add a new state to the KafkaStreams client?
> >>
> >> Having a new state might also be helpful for
> >> https://issues.apache.org/jira/browse/KAFKA-7380 ?
> >>
> >> -Matthias
> >>
> >> On 8/17/20 7:34 AM, John Roesler wrote:
> >>> Hi Navinder,
> >>>
> >>> I see what you mean about the global consumer being similar
> >>> to the restore consumer.
> >>>
> >>> I also agree that automatically performing the recovery
> >>> steps should be strictly an improvement over the current
> >>> situation.
> >>>
> >>> Also, yes, it would be a good idea to make it clear that the
> >>> global topic should be compacted in order to ensure correct
> >>> semantics. It's the same way with input topics for KTables;
> >>> we rely on users to ensure the topics are compacted, and if
> >>> they aren't, then the execution semantics will be broken.
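> >>>
> >>> (As an aside, a minimal sketch of creating such a topic with the Admin
> >>> client -- the topic name and sizing here are just examples:)
> >>>
> >>>     // uses org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}
> >>>     // and org.apache.kafka.common.config.TopicConfig
> >>>     Properties config = new Properties();
> >>>     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >>>     try (Admin admin = Admin.create(config)) {
> >>>         // the topic backing the global store, with log compaction enabled
> >>>         NewTopic globalTopic = new NewTopic("my-global-topic", 1, (short) 1)
> >>>             .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
> >>>                             TopicConfig.CLEANUP_POLICY_COMPACT));
> >>>         // all().get() throws checked exceptions; handling omitted in this sketch
> >>>         admin.createTopics(Collections.singleton(globalTopic)).all().get();
> >>>     }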
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote:
> >>>> Hi John,
> >>>>
> >>>> Thanks for your inputs. Since global topics are, in a way, their own 
> >>>> changelog, wouldn’t the global consumers be more akin to restore 
> >>>> consumers than to the main consumer? 
> >>>>
> >>>> I am also +1 on catching the exception and resetting to the earliest 
> >>>> for now. Currently, whenever an instance starts, the global stream thread 
> >>>> (if available) goes to RUNNING before the stream threads are started, which 
> >>>> means the global state is available when processing by the stream threads 
> >>>> starts. So, with the new change of catching the exception, cleaning the 
> >>>> store, and resetting to earliest, it would probably be “stop the world” as 
> >>>> you said, John, as I think we will have to pause the stream threads till 
> >>>> the whole global state is recovered. I assume it is “stop the world” right 
> >>>> now as well, since if an InvalidOffsetException comes, we throw a 
> >>>> StreamsException and the user has to clean up and handle all this manually, 
> >>>> and when that instance starts again, it will restore the global state first.
> >>>>
> >>>> I had an additional thought on this whole problem: would it be helpful 
> >>>> to educate users that global topics should have the cleanup policy 
> >>>> "compact", so that this invalid offset exception never arises for them? 
> >>>> Assume, for example, that the cleanup policy on a global topic is "delete" 
> >>>> and it has deleted keys k1 and k2 (via retention.ms), although all the 
> >>>> instances had already consumed them, so they are in all global stores and 
> >>>> all other instances are up to date on the global data (so no 
> >>>> InvalidOffsetException). Now, a new instance is added to the cluster, 
> >>>> and we have already lost k1 and k2 from the global topic, so it will start 
> >>>> consuming from the earliest point in the global topic. So, wouldn’t the 
> >>>> global store on the new instance have 2 fewer keys than all the other 
> >>>> global stores already available in the cluster? Please let me know if I 
> >>>> am missing something. Thanks.
> >>>>
> >>>> Regards,
> >>>>
> >>>> Navinder
> >>>>
> >>>>
> >>>>    On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler 
> >>>><vvcep...@apache.org> wrote:  
> >>>>
> >>>> Hi all,
> >>>>
> >>>> It seems like the main motivation for this proposal is satisfied if we 
> >>>> just implement some recovery mechanism instead of crashing. If the 
> >>>> mechanism is going to be pausing all the threads until the state is 
> >>>> recovered, then it still seems like a big enough behavior change to 
> >>>> warrant a KIP. 
> >>>>
> >>>> I have to confess I’m a little unclear on why a custom reset policy for 
> >>>> a global store, table, or even consumer might be considered wrong. It’s 
> >>>> clearly wrong for the restore consumer, but the global consumer seems 
> >>>> more semantically akin to the main consumer than the restore consumer. 
> >>>>
> >>>> In other words, if it’s wrong to reset a GlobalKTable from latest, 
> >>>> shouldn’t it also be wrong for a KTable, for exactly the same reason? It 
> >>>> certainly seems like it would be an odd choice, but I’ve seen many 
> >>>> choices I thought were odd turn out to have perfectly reasonable use 
> >>>> cases. 
> >>>>
> >>>> As far as the PAPI global store goes, I could see adding the option to 
> >>>> configure it, since as Matthias pointed out, there’s really no specific 
> >>>> semantics for the PAPI. But if automatic recovery is really all Navinder 
> >>>> wanted, then I could also see deferring this until someone specifically 
> >>>> wants it.
> >>>>
> >>>> So the tl;dr is, if we just want to catch the exception and rebuild the 
> >>>> store by seeking to earliest with no config or API changes, then I’m +1.
> >>>>
> >>>> I’m wondering if we can improve on the “stop the world” effect of 
> >>>> rebuilding the global store, though. It seems like we could put our 
> >>>> heads together and come up with a more fine-grained approach to 
> >>>> maintaining the right semantics during recovery while still making some 
> >>>> progress.  
> >>>>
> >>>> Thanks,
> >>>> John
> >>>>
> >>>>
> >>>> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote:
> >>>>> Hi Matthias,
> >>>>>
> >>>>> IMHO, now as you explained, using ‘global.consumer.auto.offset.reset’ is 
> >>>>> not as straightforward as it seems, and it might change the existing 
> >>>>> behavior for users without them realizing it. I also think that we should 
> >>>>> change the behavior inside the global stream thread to not die on 
> >>>>> InvalidOffsetException and instead clean and rebuild the state from the 
> >>>>> earliest. On this, as you mentioned, we would need to pause the stream 
> >>>>> threads till the global store is completely restored. Without that, there 
> >>>>> will be incorrect processing results if they are utilizing a global store 
> >>>>> during processing. 
> >>>>>
> >>>>> So, basically we can divide the use-cases into 4 parts.
> >>>>>    - PAPI based global stores (will have the earliest hardcoded)
> >>>>>    - PAPI based state stores (already has auto.reset.config)
> >>>>>    - DSL based GlobalKTables (will have earliest hardcoded)
> >>>>>    - DSL based KTables (will continue with auto.reset.config)
> >>>>>
> >>>>>
> >>>>>
> >>>>> So, this would mean that we are not changing any existing behaviors 
> >>>>> with this if I am right.
> >>>>>
> >>>>> I guess we could improve the code to actually log a warning for this
> >>>>> case, similar to what we do for some configs already (cf
> >>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
> >>>>>
> >>>>>>> I like this idea. In case we go ahead with the above approach and if 
> >>>>>>> we can’t deprecate it, we should educate users that this config 
> >>>>>>> doesn’t work.
> >>>>>
> >>>>> Looking forward to hearing thoughts from others as well.
> >>>>>
> >>>>> - Navinder
> >>>>>
> >>>>> On Tuesday, 4 August, 2020, 05:07:59 am IST, Matthias J. Sax 
> >>>>> <mj...@apache.org> wrote:  
> >>>>>
> >>>>> Navinder,
> >>>>>
> >>>>> thanks for updating the KIP. I think the motivation section is not
> >>>>> totally accurate (which is not your fault though, as the history of how
> >>>>> we handle this case is intertwined...). For example, "auto.offset.reset"
> >>>>> is hard-coded for the global consumer to "none" and using
> >>>>> "global.consumer.auto.offset.reset" has no effect (cf
> >>>>> https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values)
> >>>>>
> >>>>> Also, we could not even really deprecate the config as mentioned in the
> >>>>> rejected alternatives section, because we need `auto.offset.reset` for
> >>>>> the main consumer -- and adding a prefix is independent of it. Also,
> >>>>> because we ignore the config, it is effectively already
> >>>>> deprecated/removed, if you wish.
> >>>>>
> >>>>> I guess we could improve the code to actually log a warning for this
> >>>>> case, similar to what we do for some configs already (cf
> >>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
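> >>>>>
> >>>>> (Something along these lines; the surrounding variable name is made up:)
> >>>>>
> >>>>>     // sketch: warn when a user sets a reset policy for the global consumer,
> >>>>>     // since it is overwritten with "none" anyway
> >>>>>     if (userGlobalConsumerProps.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
> >>>>>         log.warn("Unexpected user-specified global consumer config {}; "
> >>>>>             + "this config is ignored and 'none' is used instead.",
> >>>>>             ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
> >>>>>     }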
> >>>>>
> >>>>>
> >>>>> The other question is about compatibility with regard to default
> >>>>> behavior: if we want to reintroduce `global.consumer.auto.offset.reset`
> >>>>> this basically implies that we need to respect `auto.offset.reset`, too.
> >>>>> Remember that any config without a prefix is applied to all clients that
> >>>>> support this config. Thus, if a user does not limit the scope of the
> >>>>> config to the main consumer (via `main.consumer.auto.offset.reset`) but
> >>>>> uses the non-prefix versions and sets it to "latest" (and relies on the
> >>>>> current behavior that `auto.offset.reset` is "none", and effectively
> >>>>> "earliest" on the global consumer), the user might end up with a
> >>>>> surprise as the global consumer behavior would switch from "earliest" to
> >>>>> "latest" (most likely unintentionally). Bottom line is, that users might
> >>>>> need to change configs to preserve the old behavior...
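> >>>>>
> >>>>> (To make the scoping concrete -- both configs below already exist; the
> >>>>> scenario is just an illustration:)
> >>>>>
> >>>>>     final Properties props = new Properties();
> >>>>>     // un-prefixed: applies to ALL clients that support this config, which
> >>>>>     // would include the global consumer once we start respecting it there
> >>>>>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
> >>>>>     // prefixed: scoped to the main consumer only; the global consumer would
> >>>>>     // keep its current (effectively "earliest") behavior
> >>>>>     props.put(
> >>>>>         StreamsConfig.mainConsumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
> >>>>>         "latest");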
> >>>>>
> >>>>> However, before we discuss those details, I think we should discuss the
> >>>>> topic in a broader context first:
> >>>>>
> >>>>>  - for a GlobalKTable, does it even make sense from a correctness point
> >>>>> of view, to allow users to set a custom reset policy? It seems you
> >>>>> currently don't propose this in the KIP, but as you don't mention it
> >>>>> explicitly, it's unclear if that is on purpose or an oversight.
> >>>>>
> >>>>>  - Should we treat global stores differently to GlobalKTables and allow
> >>>>> for more flexibility (as the PAPI does not really provide any semantic
> >>>>> contract)? It seems that is what you propose in the KIP. We should
> >>>>> discuss if this flexibility does make sense or not for the PAPI, or if
> >>>>> we should apply the same reasoning about correctness we use for KTables
> >>>>> to global stores? To what extent are/should they be different?
> >>>>>
> >>>>>  - If we support auto.offset.reset for global stores, how should we
> >>>>> handle the initial bootstrapping of the store/table (that is hard-coded
> >>>>> atm)? Should we skip it if the policy is "latest" and start with an
> >>>>> empty state? Note that we did consider this behavior incorrect via
> >>>>> https://issues.apache.org/jira/browse/KAFKA-6121 and thus I am wondering
> >>>>> why should we change it back again?
> >>>>>
> >>>>>
> >>>>> Finally, the main motivation for the Jira ticket was to let the runtime
> >>>>> auto-recover instead of dying as it does currently. If we decide that a
> >>>>> custom reset policy does actually not make sense, we can just change the
> >>>>> global-thread to not die any longer on an `InvalidOffsetException` but
> >>>>> rebuild the state automatically. This would be "only" a behavior change
> >>>>> but does not require any public API changes. -- For this case, we should
> >>>>> also think about the synchronization with the main processing threads?
> >>>>> On startup we bootstrap the global stores before processing happens.
> >>>>> Thus, if an `InvalidOffsetException` happens and the global thread dies,
> >>>>> the main threads cannot access the global stores any longer and also die.
> >>>>> If we re-build the state though, do we need to pause the main thread
> >>>>> during this phase?
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 8/2/20 8:48 AM, Navinder Brar wrote:
> >>>>>> Hi John,
> >>>>>>
> >>>>>> I have updated the KIP to make the motivation more clear. In a 
> >>>>>> nutshell, we will use the already existing config 
> >>>>>> "global.consumer.auto.offset.reset" for users to set a blanket reset 
> >>>>>> policy for all global topics and add a new interface to set per-topic 
> >>>>>> reset policy for each global topic (for which we specifically need this 
> >>>>>> KIP). There was a point raised by Matthias above to always reset to 
> >>>>>> earliest by cleaning the stores and seekToBeginning in case of 
> >>>>>> InvalidOffsetException. We can go with that route as well and I don't 
> >>>>>> think it would need a KIP as if we are not providing users an option 
> >>>>>> to have a blanket reset policy on global topics, then a per-topic 
> >>>>>> override would also not be required (the KIP is basically required for 
> >>>>>> that). Although, I think if users have an option to choose reset 
> >>>>>> policy for StreamThread then the option should be provided for 
> >>>>>> GlobalStreamThread as well and if we don't want to use the 
> >>>>>> "global.consumer.auto.offset.reset" then we would need to deprecate it 
> >>>>>> because currently it's not serving any purpose. For now, I have added 
> >>>>>> it in rejected alternatives but we can discuss this.
> >>>>>>
> >>>>>> On the query that I had for Guozhang: thanks to Matthias, it was fixed 
> >>>>>> last week as part of KAFKA-10306.
> >>>>>>
> >>>>>> ~Navinder
> >>>>>>
> >>>>>>    On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder Brar 
> >>>>>><navinder_b...@yahoo.com.invalid> wrote:  
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> Sorry, it took some time to respond.
> >>>>>>
> >>>>>> “but I thought we would pass the config through to the client.”
> >>>>>>
> >>>>>>>> @John, sure, we can use the config in GlobalStreamThread; that could 
> >>>>>>>> be one way to solve it.
> >>>>>>
> >>>>>> @Matthias, sure, cleaning the store and recreating is one way, but since 
> >>>>>> we are giving an option to reset in StreamThread, why should the 
> >>>>>> implementation be different in GlobalStreamThread? I think we 
> >>>>>> should use the global.consumer.auto.offset.reset config to accept the 
> >>>>>> reset strategy opted by the user although I would be ok with just 
> >>>>>> cleaning and resetting to the latest as well for now. Currently, we 
> >>>>>> throw a StreamsException in case of InvalidOffsetException in 
> >>>>>> GlobalStreamThread so just resetting would still be better than what 
> >>>>>> happens currently. 
> >>>>>>
> >>>>>> Matthias, I found this comment in StreamBuilder for GlobalKTable ‘* 
> >>>>>> Note that {@link GlobalKTable} always applies {@code 
> >>>>>> "auto.offset.reset"} strategy {@code "earliest"} regardless of the 
> >>>>>> specified value in {@link StreamsConfig} or {@link Consumed}.’ 
> >>>>>> So, I guess we are already cleaning up and recreating for GlobalKTable 
> >>>>>> from earliest offset.
> >>>>>>
> >>>>>> @Guozhang, while looking at the code, I also noticed a TODO: pending in 
> >>>>>> GlobalStateManagerImpl, when InvalidOffsetException is thrown. 
> >>>>>> Earlier, we were directly clearing the store here and recreating from 
> >>>>>> scratch, but that code piece is removed now. Are you working on a 
> >>>>>> follow-up PR for this, or would just handling the reset in 
> >>>>>> GlobalStreamThread be sufficient?
> >>>>>>
> >>>>>> Regards,
> >>>>>> Navinder
> >>>>>>
> >>>>>>    On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax 
> >>>>>><mj...@apache.org> wrote:  
> >>>>>>
> >>>>>> Atm, the config should be ignored and the global-consumer should use
> >>>>>> "none" in a hard-coded way.
> >>>>>>
> >>>>>> However, I am still wondering whether we actually want/need to allow users
> >>>>>> to specify the reset policy. It might be worth considering to just
> >>>>>> change the behavior: catch the exception, log an ERROR (for informational
> >>>>>> purposes), wipe the store, seekToBeginning(), and recreate the store?
> >>>>>>
> >>>>>> Btw: if we want to allow users to set the reset policy, this should be
> >>>>>> possible via the config, or via overwriting the config in the method
> >>>>>> itself. Thus, we would need to add the new overloaded method to
> >>>>>> `Topology` and `StreamsBuilder`.
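> >>>>>>
> >>>>>> (Just to sketch one possible shape: `Consumed` can already carry a reset
> >>>>>> policy via `withOffsetResetPolicy`, which is currently ignored for global
> >>>>>> tables; honoring it would be one option, e.g.:)
> >>>>>>
> >>>>>>     final StreamsBuilder builder = new StreamsBuilder();
> >>>>>>     // today "earliest" is hard-coded for global tables and the policy below
> >>>>>>     // is ignored; this sketch assumes we would start honoring it
> >>>>>>     final GlobalKTable<String, String> global = builder.globalTable(
> >>>>>>         "my-global-topic",
> >>>>>>         Consumed.with(Serdes.String(), Serdes.String())
> >>>>>>                 .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));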
> >>>>>>
> >>>>>> Another question to ask: what about GlobalKTables? Should they behave
> >>>>>> the same? An alternative design could be, to allow users to specify a
> >>>>>> flexible reset policy for global-stores, but not for GlobalKTables and
> >>>>>> use the strategy suggested above for this case.
> >>>>>>
> >>>>>> Thoughts?
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 7/2/20 2:14 PM, John Roesler wrote:
> >>>>>>> Hi Navinder,
> >>>>>>>
> >>>>>>> Thanks for the response. I’m sorry if I’m being dense... You said we 
> >>>>>>> are not currently using the config, but I thought we would pass the 
> >>>>>>> config through to the client.  Can you confirm whether or not the 
> >>>>>>> existing config works for your use case?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> John
> >>>>>>>
> >>>>>>> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote:
> >>>>>>>> Sorry my bad. Found it.
> >>>>>>>>
> >>>>>>>> Prefix used to override {@link KafkaConsumer consumer} configs for the 
> >>>>>>>> global consumer client from the general consumer client configs. The 
> >>>>>>>> override precedence is the following (from highest to lowest precedence):
> >>>>>>>> * 1. global.consumer.[config-name]
> >>>>>>>> public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
> >>>>>>>>
> >>>>>>>> So, that's great. We already have a config exposed to reset offsets for 
> >>>>>>>> global topics via global.consumer.auto.offset.reset; it's just that we 
> >>>>>>>> are not actually using it inside GlobalStreamThread to reset.
> >>>>>>>>
> >>>>>>>> -Navinder
> >>>>>>>>    On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar 
> >>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:  
> >>>>>>>>
> >>>>>>>> Hi John,
> >>>>>>>>
> >>>>>>>> Thanks for your feedback. 
> >>>>>>>> 1. I think there is some confusion on my first point: for the enum, I am 
> >>>>>>>> sure we can use the same one, but for the external config which controls 
> >>>>>>>> the resetting in the global stream thread, either we can use the same one 
> >>>>>>>> which users use for source topics (StreamThread), or we can provide a new 
> >>>>>>>> one which specifically controls global topics. For example, currently if I 
> >>>>>>>> get an InvalidOffsetException in any of my source topics, I can choose 
> >>>>>>>> whether to reset from earliest or latest (with auto.offset.reset). Now, 
> >>>>>>>> either we can use the same option and say that if I get the same exception 
> >>>>>>>> for global topics I will follow the same resetting, or some users might 
> >>>>>>>> want totally different settings for source and global topics, like for 
> >>>>>>>> source topics I want resetting from latest but for global topics I want 
> >>>>>>>> resetting from earliest, in which case adding a new config might be better.
> >>>>>>>>
> >>>>>>>> 2. I couldn't currently find this config, 
> >>>>>>>> "global.consumer.auto.offset.reset". In fact, in GlobalStreamThread.java 
> >>>>>>>> we are throwing a StreamsException for InvalidOffsetException, and there is 
> >>>>>>>> a test as well, GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), 
> >>>>>>>> so I think this is the config we are trying to introduce with this KIP.
> >>>>>>>>
> >>>>>>>> -Navinder
> >>>>>>>>
> >>>>>>>> On Saturday, 27 June, 2020, 07:03:04 pm IST, John Roesler 
> >>>>>>>> <j...@vvcephei.org> wrote:  
> >>>>>>>>
> >>>>>>>> Hi Navinder,
> >>>>>>>>
> >>>>>>>> Thanks for this proposal!
> >>>>>>>>
> >>>>>>>> Regarding your question about whether to use the same policy
> >>>>>>>> enum or not, the underlying mechanism is the same, so I think
> >>>>>>>> we can just use the same AutoOffsetReset enum.
> >>>>>>>>
> >>>>>>>> Can you confirm whether setting the reset policy config on the
> >>>>>>>> global consumer currently works or not? Based on my reading
> >>>>>>>> of StreamsConfig, it looks like it would be:
> >>>>>>>> "global.consumer.auto.offset.reset".
> >>>>>>>>
> >>>>>>>> If that does work, would you still propose to augment the
> >>>>>>>> Java API?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> -John
> >>>>>>>>
> >>>>>>>> On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote:
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> KIP: 
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
> >>>>>>>>>
> >>>>>>>>> I have taken over this KIP since it has been dormant for a long time 
> >>>>>>>>> and it looks important for use-cases that have large global data, where 
> >>>>>>>>> rebuilding global stores from scratch might seem overkill in case of an 
> >>>>>>>>> InvalidOffsetException.
> >>>>>>>>>
> >>>>>>>>> We want to give users the control to use a reset policy (as we do in 
> >>>>>>>>> StreamThread) in case they hit invalid offsets. I have still not 
> >>>>>>>>> decided whether to restrict this option to the same reset policy being 
> >>>>>>>>> used by StreamThread (using the auto.offset.reset config) or to add 
> >>>>>>>>> another reset config specifically for global stores, 
> >>>>>>>>> "global.auto.offset.reset", which gives users more control to choose 
> >>>>>>>>> separate policies for global and stream threads.
> >>>>>>>>>
> >>>>>>>>> I would like to hear your opinions on the KIP.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Navinder
