Re: [ANNOUNCE] New Kafka PMC Member: Bill Bejeck

2021-04-21 Thread Navinder Brar
Congrats Bill, very well deserved!

Regards,
Navinder Pal Singh Brar
 

On Wednesday, 21 April, 2021, 10:37:53 pm IST, Sarwar Bhuiyan 
 wrote:  
 
 Congrats Bill!

On Wed, Apr 21, 2021 at 5:52 PM Mickael Maison 
wrote:

> Congratulations Bill, well deserved!
>
> On Sat, Apr 17, 2021 at 11:26 PM Adam Bellemare
>  wrote:
> >
> > Congratulations Bill!!
> >
> > > On Apr 17, 2021, at 5:20 PM, Kowshik Prakasam 
> > > 
> wrote:
> > >
> > > Congrats Bill!
> > >
> > >
> > > Cheers,
> > > Kowshik
> > >
> > >> On Mon, Apr 12, 2021, 11:15 AM Randall Hauch 
> wrote:
> > >>
> > >> Congratulations, Bill!
> > >>
> > >>> On Mon, Apr 12, 2021 at 11:02 AM Guozhang Wang 
> wrote:
> > >>>
> > >>> Congratulations Bill !
> > >>>
> > >>> Guozhang
> > >>>
> >  On Wed, Apr 7, 2021 at 6:16 PM Matthias J. Sax 
> wrote:
> > >>>
> >  Hi,
> > 
> >  It's my pleasure to announce that Bill Bejeck is now a member of the
> >  Kafka PMC.
> > 
> >  Bill has been a Kafka committer since Feb 2019. He has remained
> >  active in the community since becoming a committer.
> > 
> > 
> > 
> >  Congratulations Bill!
> > 
> >  -Matthias, on behalf of Apache Kafka PMC
> > 
> > >>>
> > >>>
> > >>> --
> > >>> -- Guozhang
> > >>>
> > >>
>
  

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-10-26 Thread Navinder Brar
Hi,

I have updated KIP-406 with the discussions that we have had above. Is 
there any KIP proposed yet to change the state machine, so that I can link it to 
this KIP?

Also, is there any suggestion on whether this KIP should be labeled as 
Under-discussion or Blocked on the KIPs page?

Thanks,
Navinder 

On Thursday, 8 October, 2020, 11:31:28 pm IST, Navinder Brar 
 wrote:  
 
 Thanks, Sophie, Guozhang, and Matthias for sharing your thoughts. I am glad 
that another meaningful KIP is coming out of this discussion. I am fine with 
parking this KIP until we can make the changes to the RESTORING state that we 
have discussed above. I will update this KIP with the closure we currently 
have, i.e. assuming we will make the change to add global stores to the 
RESTORING phase as well, so that active tasks don't start processing while the 
state is RESTORING.

Regards,
Navinder 
    On Wednesday, 7 October, 2020, 11:39:05 pm IST, Matthias J. Sax 
 wrote:  
 
 I synced with John in person and he emphasized his concerns about
breaking code if we change the state machine. From an implementation point of
view, I am concerned that maintaining two state machines at the same
time might be very complex. John had the idea, though, that we could
actually do an internal translation: internally, we switch the state
machine to the new one, but translate new-state to old-state before
doing the callback. (We would only need two separate "state enums", and we would
add a new method to register callbacks for the new state enum and deprecate
the existing method.)
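A rough sketch of what that internal translation could look like. The
`RestoreAwareState` enum, `RestoreAwareStateListener` interface, and the shim
class below are hypothetical names for illustration only; only
`KafkaStreams.State` and `KafkaStreams.StateListener` exist in the public API
today.

```java
import org.apache.kafka.streams.KafkaStreams;

// Hypothetical new state enum that adds the proposed RESTORING state.
enum RestoreAwareState {
    CREATED, REBALANCING, RESTORING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR
}

// Hypothetical listener that would be registered via a new, not-yet-existing method.
interface RestoreAwareStateListener {
    void onChange(RestoreAwareState newState, RestoreAwareState oldState);
}

final class StateTranslationSketch {

    // Map each new state onto the closest existing KafkaStreams.State so that
    // callbacks registered through the deprecated listener keep working.
    static KafkaStreams.State toOldState(final RestoreAwareState state) {
        switch (state) {
            case RESTORING:        return KafkaStreams.State.REBALANCING; // fold RESTORING into REBALANCING
            case CREATED:          return KafkaStreams.State.CREATED;
            case REBALANCING:      return KafkaStreams.State.REBALANCING;
            case RUNNING:          return KafkaStreams.State.RUNNING;
            case PENDING_SHUTDOWN: return KafkaStreams.State.PENDING_SHUTDOWN;
            case NOT_RUNNING:      return KafkaStreams.State.NOT_RUNNING;
            default:               return KafkaStreams.State.ERROR;
        }
    }

    // Invoked internally on every transition of the new state machine: the new
    // listener gets the new states, the deprecated listener gets translated ones.
    static void fireCallbacks(final RestoreAwareStateListener newListener,
                              final KafkaStreams.StateListener deprecatedListener,
                              final RestoreAwareState newState,
                              final RestoreAwareState oldState) {
        if (newListener != null) {
            newListener.onChange(newState, oldState);
        }
        if (deprecatedListener != null) {
            deprecatedListener.onChange(toOldState(newState), toOldState(oldState));
        }
    }
}
```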

However, also with regard to the work Guozhang pointed out, I am
wondering if we should split out an independent KIP just for the state
machine changes? It seems complex enough by itself. We would hold off
this KIP until the state machine change is done and resume it afterwards.

Thoughts?

-Matthias

On 10/6/20 8:55 PM, Guozhang Wang wrote:
> Sorry I'm late to the party.
> 
> Matthias raised a point to me regarding the recent development of moving
> restoration from stream threads to separate restore threads and allowing
> the stream threads to process any processible tasks even when some other
> tasks are still being restored by the restore threads:
> 
> https://issues.apache.org/jira/browse/KAFKA-10526
> https://issues.apache.org/jira/browse/KAFKA-10577
> 
> That would cause the restoration of non-global states to be more similar to
> global states such that some tasks would be processed even though the state
> of the stream thread is not yet in RUNNING (because today we only transit
> to it when ALL assigned tasks have completed restoration and are
> processible).
> 
> Also, as Sophie already mentioned, today during REBALANCING (in stream
> thread level, it is PARTITION_REVOKED -> PARTITION_ASSIGNED) some tasks may
> still be processed, and because of KIP-429 the RUNNING -> PARTITION_REVOKED
> -> PARTITION_ASSIGNED can be within a single call and hence be very
> "transient", whereas PARTITION_ASSIGNED -> RUNNING could still take time as
> it only does the transition when all tasks are processible.
> 
> So I think it makes sense to add a RESTORING state at the stream client
> level, defined as "at least one of the state stores assigned to this
> client, either global or non-global, is still restoring", and emphasize
> that during this state the client may still be able to process records,
> just probably not at full speed.
> 
> As for REBALANCING, I think it is a bit less relevant to this KIP but
> here's a dump of my thoughts: if we can capture the period when "some tasks
> do not belong to any clients and hence processing is not full-speed" it
> would still be valuable, but unfortunately right now since
> onPartitionRevoked is not triggered each time on all clients, today's
> transition would just make a lot of very short REBALANCING state periods,
> which are not very useful really. So if we still want to keep that state
> maybe we can consider the following tweak: at the thread level, we replace
> PARTITION_REVOKED / PARTITION_ASSIGNED with just a single REBALANCING
> state, and we will transit to this state upon onPartitionRevoked, but we
> will only transit out of this state upon onAssignment when the assignor
> decides there's no follow-up rebalance immediately (note we also schedule
> future rebalances for workload balancing, but that would still trigger
> transiting out of it). On the client level, we would enter REBALANCING when
> any thread enters REBALANCING, and we would transit out of it when all
> threads have transited out of it. In this case, it is possible that during a rebalance,
> only those clients that have to revoke some partition would enter the
> REBALANCING state while others that only get additional tasks would not
> enter this state at all.
> 
> With all that being said, I think

Re: [ANNOUNCE] New committer: A. Sophie Blee-Goldman

2020-10-19 Thread Navinder Brar
That's great news. Congrats Sophie! Well deserved.

Regards,
Navinder
On Monday, 19 October, 2020, 10:12:16 pm IST, Bruno Cadonna 
 wrote:  
 
 Congrats Sophie! Very well deserved!

Bruno

On 19.10.20 18:40, Matthias J. Sax wrote:
> Hi all,
> 
> I am excited to announce that A. Sophie Blee-Goldman has accepted her
> invitation to become an Apache Kafka committer.
> 
> Sophie has been actively contributing to Kafka since Feb 2019 and has
> accumulated 140 commits. She authored 4 KIPs:
> 
>  - KIP-453: Add close() method to RocksDBConfigSetter
>  - KIP-445: In-memory Session Store
>  - KIP-428: Add in-memory window store
>  - KIP-613: Add end-to-end latency metrics to Streams
> 
> and helped to implement two critical KIPs, 429 (incremental rebalancing)
> and 441 (smooth auto-scaling; not just implementation but also design).
> 
> In addition, she participates in basically every Kafka Streams related
> KIP discussion, reviewed 142 PRs, and is active on the user mailing list.
> 
> Thanks for all the contributions, Sophie!
> 
> 
> Please join me to congratulate her!
>  -Matthias
> 
  

Re: [ANNOUNCE] New committer: David Jacot

2020-10-19 Thread Navinder Brar
Many Congratulations David.

Best Regards,
Navinder

On Monday, 19 October, 2020, 12:53:43 pm IST, Dongjin Lee 
 wrote:  
 
 Congratulations, David!

Best,
Dongjin

On Mon, Oct 19, 2020 at 12:20 PM Hu Xi  wrote:

> Congrats, David! Well deserved!
>
>
> 
> From: Vahid Hashemian 
> Sent: October 19, 2020 11:17
> To: dev 
> Subject: Re: [ANNOUNCE] New committer: David Jacot
>
> Congrats David!
>
> --Vahid
>
> On Sun, Oct 18, 2020 at 4:23 PM Satish Duggana 
> wrote:
>
> > Congratulations David!
> >
> > On Sat, Oct 17, 2020 at 10:46 AM Boyang Chen  >
> > wrote:
> > >
> > > Congrats David, well deserved!
> > >
> > > On Fri, Oct 16, 2020 at 6:45 PM John Roesler 
> > wrote:
> > >
> > > > Congratulations, David!
> > > > -John
> > > >
> > > > On Fri, Oct 16, 2020, at 20:15, Konstantine Karantasis wrote:
> > > > > Congrats, David!
> > > > >
> > > > > Konstantine
> > > > >
> > > > >
> > > > > On Fri, Oct 16, 2020 at 3:36 PM Ismael Juma 
> > wrote:
> > > > >
> > > > > > Congratulations David!
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Fri, Oct 16, 2020 at 9:01 AM Gwen Shapira 
> > > > wrote:
> > > > > >
> > > > > > > The PMC for Apache Kafka has invited David Jacot as a
> committer,
> > and
> > > > > > > we are excited to say that he accepted!
> > > > > > >
> > > > > > > David Jacot has been contributing to Apache Kafka since July
> > 2015 (!)
> > > > > > > and has been very active since August 2019. He contributed
> > several
> > > > > > > notable KIPs:
> > > > > > >
> > > > > > > KIP-511: Collect and Expose Client Name and Version in Brokers
> > > > > > > KIP-559: Make the Kafka Protocol Friendlier with L7 Proxies
> > > > > > > KIP-570: Add leader epoch in StopReplicaRequest
> > > > > > > KIP-599: Throttle Create Topic, Create Partition and Delete
> Topic
> > > > > > > Operations
> > > > > > > KIP-496: Add an API for the deletion of consumer offsets
> > > > > > >
> > > > > > > In addition, David Jacot reviewed many community contributions
> > and
> > > > > > > showed great technical and architectural taste. Great reviews
> are
> > > > hard
> > > > > > > and often thankless work - but this is what makes Kafka a great
> > > > > > > product and helps us grow our community.
> > > > > > >
> > > > > > > Thanks for all the contributions, David! Looking forward to
> more
> > > > > > > collaboration in the Apache Kafka community.
> > > > > > >
> > > > > > > --
> > > > > > > Gwen Shapira
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
>
>
> --
>
> Thanks!
> --Vahid
>


-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*




github: github.com/dongjinleekr
keybase: https://keybase.io/dongjinleekr
linkedin: kr.linkedin.com/in/dongjinleekr
speakerdeck: speakerdeck.com/dongjin

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-10-08 Thread Navinder Brar
on, Oct 5, 2020 at 2:37 PM Sophie Blee-Goldman 
> wrote:
> 
>> It seems a little misleading, but I actually have no real qualms about
>> transitioning to the
>> REBALANCING state *after* RESTORING. One of the side effects of KIP-429 was
>> that in
>> most cases we actually don't transition to REBALANCING at all until the
>> very end of the
>> rebalance, so REBALANCING doesn't really mean all that much any more. These
>> days
>> the majority of the time an instance spends in the REBALANCING state is
>> actually spent
>> on restoration anyways.
>>
>> If users are listening in on the REBALANCING -> RUNNING transition, then
>> they might
>> also be listening for the RUNNING -> REBALANCING transition, so we may need
>> to actually
>> go RUNNING -> REBALANCING -> RESTORING -> REBALANCING -> RUNNING. This
>> feels a bit unwieldy but I don't think there's anything specifically wrong
>> with it.
>>
>> That said, it makes me question the value of having a REBALANCING state at
>> all. In the
>> pre-KIP-429 days it made sense, because all tasks were paused and
>> unavailable for IQ
>> for the duration of the rebalance. But these days, the threads can continue
>> processing
>> any tasks they own during a rebalance, so the only time that tasks are
>> truly unavailable
>> is during the restoration phase.
>>
>> So, I find the idea of getting rid of the REBALANCING state altogether to
>> be pretty
>> appealing, in which case we'd probably need to introduce a new state
>> listener and
>> deprecate the current one as John proposed. I also wonder if this is the
>> sort of thing
>> we can just swallow as a breaking change in the upcoming 3.0 release.
>>
>> On Sat, Oct 3, 2020 at 11:02 PM Navinder Brar
>>  wrote:
>>
>>>
>>>
>>>
>>> Thanks a lot, Matthias for detailed feedback. I tend to agree with
>>> changing the state machine
>>>
>>> itself if required. I think at the end of the day InvalidOffsetException
>>> is a rare event and is not
>>>
>>> as frequent as rebalancing. So, pausing all tasks once in a while
>> should
>>> be ok from a processing
>>>
>>> standpoint.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> I was also wondering if instead of adding RESTORING state b/w REBALANCING
>>> & RUNNING
>>>
>>> can we add it before REBALANCING. Whenever an application starts anyways
>>> there is no need for
>>>
>>> active/replica tasks to be present there for us to build global stores
>>> there. We can restore global stores first
>>>
>>> and then trigger a rebalancing to get the tasks assigned. This might help
>>> us in shielding the users
>>>
>>> from changing what they listen to currently(which is REBALANCING ->
>>> RUNNING). So, we go
>>>
>>> RESTORING -> REBALANCING -> RUNNING. The only drawback here might be that
>>> replicas would
>>>
>>> also be paused while we are restoring global stores but as Matthias said
>>> we would want to give
>>>
>>> complete bandwidth to restoring global stores in such a case and
>>> considering it is a rare event this
>>>
>>> should be ok. On the plus side, this would not lead to any race condition
>>> and we would not need to
>>>
>>> change the behavior of any stores. But this also means that this
>> RESTORING
>>> state is only for global stores
>>>
>>> like the GLOBAL_RESTORING state we discussed before :) as regular tasks
>>> still restore inside REBALANCING.
>>>
>>> @John, @Sophie do you think this would work?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>>
>>>
>>>
>>>
>>> Navinder
>>>
>>>
>>>
>>>
>>>    On Wednesday, 30 September, 2020, 09:39:07 pm IST, Matthias J. Sax <
>>> mj...@apache.org> wrote:
>>>
>>>  I guess we need to have some cleanup mechanism for this case anyway,
>>> because, the global thread can enter RESTORING state at any point in
>>> time, and thus, even if we set a flag to pause processing on the
>>> StreamThreads we are subject to a race condition.
>>>
>>> Beside that, on a high level I am fine with either "busy waiting" (ie,
>>> just lock the global-store and ret

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-10-04 Thread Navinder Brar

 

Thanks a lot, Matthias, for the detailed feedback. I tend to agree with changing the 
state machine itself if required. I think at the end of the day InvalidOffsetException 
is a rare event and is not as frequent as rebalancing. So, pausing all tasks once in a 
while should be ok from a processing standpoint.







I was also wondering if, instead of adding the RESTORING state between REBALANCING 
& RUNNING, we can add it before REBALANCING. Whenever an application starts, there 
is no need for active/replica tasks to be present for us to build global stores. 
We can restore global stores first and then trigger a rebalancing to get the tasks 
assigned. This might help us in shielding the users from changing what they listen 
to currently (which is REBALANCING -> RUNNING). So, we go RESTORING -> REBALANCING 
-> RUNNING. The only drawback here might be that replicas would also be paused while 
we are restoring global stores, but as Matthias said we would want to give complete 
bandwidth to restoring global stores in such a case, and considering it is a rare 
event this should be ok. On the plus side, this would not lead to any race condition 
and we would not need to change the behavior of any stores. But this also means that 
this RESTORING state is only for global stores, like the GLOBAL_RESTORING state we 
discussed before :) as regular tasks still restore inside REBALANCING.

@John, @Sophie do you think this would work?







Regards,




Navinder




On Wednesday, 30 September, 2020, 09:39:07 pm IST, Matthias J. Sax 
 wrote:  
 
 I guess we need to have some cleanup mechanism for this case anyway,
because, the global thread can enter RESTORING state at any point in
time, and thus, even if we set a flag to pause processing on the
StreamThreads we are subject to a race condition.

Beside that, on a high level I am fine with either "busy waiting" (ie,
just lock the global-store and retry) or setting a flag. However, there
are some trade-offs to consider:

As we need a cleanup mechanism anyway, it might be ok to just use a
single mechanism. -- We should consider the impact in EOS though, as we
might need to wipe out the store of regular tasks for this case. Thus,
setting a flag might actually help to prevent that we repeatably wipe
the store on retries... On the other hand, we plan to avoid wiping the
store in case of error for EOS anyway, and if we get this improvement,
we might not need the flag.

For the client state machine: I would actually prefer to have a
RESTORING state and I would also prefer to pause _all_ tasks. This might
imply that we want a flag. In the past, we allowed interleaving restore
and processing in StreamThread (for regular tasks), which slowed down
restoring, and we changed it back to not process any tasks until all
tasks are restored. Of course, in our case we have two different
threads (not a single one). However, the network is still shared, so it
might be desirable to give the full network bandwidth to the global
consumer to restore as fast as possible (maybe an improvement we could
add to `StreamThreads` too, if we have multiple threads)? And as a side
effect, it does not muddy the waters what each client state means.

Thus, overall, I tend to prefer a flag on `StreamThread` as it seems to
provide a cleaner end-to-end solution (and we avoid the dependency to
improve EOS state management).

Btw: I am not sure if we actually need to preserve compatibility for the
state machine? To me, it seems not to be a strict contract, and I would
personally be ok to just change it.


-Matthias


On 9/22/20 11:08 PM, Navinder Brar wrote:
> Thanks a lot John for these suggestions. @Matthias can share your thoughts on 
> the last two comments made in this chain.
> 
> Thanks,Navinder 
> 
>    On Monday, 14 September, 2020, 09:03:32 pm IST, John Roesler 
> wrote:  
>  
>  Hi Navinder,
> 
> Thanks for the reply.
> 
> I wasn't thinking of an _exponential_ backoff, but
> otherwise, yes, that was the basic idea. Note, the mechanism
> would be similar (if not the same) to what Matthias is
> implementing for KIP-572:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
> 
> Regarding whether we'd stay in RUNNING during global
> restoration or not, I can see your point. It seems like we
> have three choices with how we set the state during global
> restoration:
> 1. stay in RUNNING: Users might get confused, since
> processing could get stopped for some tasks. On the other
> hand, processing for tasks not blocked by the global
> restoration could proceed (if we adopt the other idea), so
> maybe it still makes sense.
> 2. transition to REBALANCING: Users might get confused,
> since there is no actual rebalance. However, the current
> state for Kafka Streams during state restoration is actually
> REBALANCING, so 

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-23 Thread Navinder Brar
Thanks a lot, John, for these suggestions. @Matthias, can you share your thoughts on 
the last two comments made in this chain?

Thanks,
Navinder

On Monday, 14 September, 2020, 09:03:32 pm IST, John Roesler 
 wrote:  
 
 Hi Navinder,

Thanks for the reply.

I wasn't thinking of an _exponential_ backoff, but
otherwise, yes, that was the basic idea. Note, the mechanism
would be similar (if not the same) to what Matthias is
implementing for KIP-572:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams

Regarding whether we'd stay in RUNNING during global
restoration or not, I can see your point. It seems like we
have three choices with how we set the state during global
restoration:
1. stay in RUNNING: Users might get confused, since
processing could get stopped for some tasks. On the other
hand, processing for tasks not blocked by the global
restoration could proceed (if we adopt the other idea), so
maybe it still makes sense.
2. transition to REBALANCING: Users might get confused,
since there is no actual rebalance. However, the current
state for Kafka Streams during state restoration is actually
REBALANCING, so it seems people already should understand
that REBALANCING really means REBALANCING|RESTORING. This
choice would preserve the existing state machine as well as
the existing meaning of all states
3. add RESTORING: This could clarify the state machine, at
the expense of breaking compatibility. We could implement a
migration path by adding a new "state listener" interface
for the new state machine.

It seems like option 3 results in the most sensible system,
but I'm not sure if it's worth the hassle. It certainly
seems orthogonal to the goal of this KIP. Option 2 is
probably the best practical choice.


Regarding _how_ the global state restoration could set a
flag preventing access to the store... This is indeed the
central challenge to this new idea. Just throwing out one
possibility: Once the global thread marks the store for
restoration, the store would throw an exception, such as
"StoreIsRestoringException", on any access. The processor
would _not_ catch this exception. Instead, the StreamThread
would catch it, put this record/task on ice, and re-try it
later.
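A bare-bones sketch of that defer-and-retry idea. `StoreIsRestoringException`,
`DeferredTask`, and the retry queue are hypothetical names used for
illustration; the real change would live inside the StreamThread/task internals
and would also have to handle the partially-processed-record problem discussed
just below.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical exception a global store would throw while it is being re-built.
class StoreIsRestoringException extends RuntimeException { }

// Hypothetical stand-in for a task whose next record may need a global store.
interface DeferredTask {
    void process() throws StoreIsRestoringException;
}

final class DeferAndRetrySketch {

    private final Queue<DeferredTask> onIce = new ArrayDeque<>();

    // Called from the stream thread's main loop for each processible task.
    void tryProcess(final DeferredTask task) {
        try {
            task.process();
        } catch (final StoreIsRestoringException e) {
            // Put the task "on ice" instead of failing; it is retried later,
            // once the global thread has finished restoring the store.
            onIce.add(task);
        }
    }

    // Called periodically (e.g. once per main-loop iteration) to retry deferred tasks.
    void retryDeferred() {
        final int deferred = onIce.size();
        for (int i = 0; i < deferred; i++) {
            tryProcess(onIce.poll());
        }
    }
}
```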

That last mechanism is actually pretty complicated. For
example, what if the record is already partially processed
in the topology? We'd have to remember which ProcessorNode
to resume from when we re-try later.

This is really where the spiritual overlap with KIP-572
comes in. Maybe Matthias can share some thoughts.

Thanks,
-John

On Sun, 2020-09-13 at 07:50 +, Navinder Brar wrote:
>  
> Hi John,
> 
> 
> 
> 
> 
> 
> 
> If I understand this correctly, you are proposing to use exponential backoff
> 
> in globalStore.get() to keep polling the global thread (whether it has 
> restored
> 
> completely or not) while the processor pauses the processing of a particular
> 
> message which required querying on global store. That is stream threads
> 
> are still in RUNNING state but kind of paused till global thread restores and
> 
> gives a go-ahead that complete state has been restored. I like the idea for
> the first two reasons that you have mentioned but thinking from 
> semantics point of view stream threads will be in RUNNING but still not 
> processing events,
> will it be misleading for the users? Or you think we are doing it at enough
> 
> places already and an exception should suffice.  As they will not understand
> 
> why the stream thread is not processing and how much more time it will not
> 
> process for. If the state explicitly stated RESTORING,
> 
> users might have clearly understood why it is happening. 
> 
> 
> 
> 
> 
> 
> 
> Also, to achieve what we are discussing above, the store.get() on which call 
> is
> 
> made has to understand whether it is a global store or not and if it is a 
> global store
> 
> check whether it is restoring or not because both might be happening
> 
> simultaneously with the above approach. With KIP-535 we have started serving
> 
> normal stores in restoring state but those are just interactive queries but 
> here
> 
> globalStore.get() might be called while processing which we don’t want. So,
> 
> restore for global store and get() might have to be exclusive. Is there a way 
> for a
> 
> store to know if it is a global store or not because now internally global and 
> normal
> 
> stores will behave differently. Although if everyone is fine with the above 
> approach
> 
> we can discuss this in PR as well.
> 
> 
> 
> 
> 
> 
> 
> Regards,
> Navinder
> 
>    On Saturday, 5 September, 2020, 02:09:07 am IST, John Roesler 
> wrote:  
>  
>  Hi all,
> 
> This conversation sounds good to me so far.
> 

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-13 Thread Navinder Brar
 
Hi John,







If I understand this correctly, you are proposing to use exponential backoff
in globalStore.get() to keep polling the global thread (whether it has restored
completely or not) while the processor pauses the processing of a particular
message which requires querying the global store. That is, stream threads
are still in the RUNNING state but are kind of paused till the global thread
restores and gives a go-ahead that the complete state has been restored. I like
the idea for the first two reasons that you have mentioned, but thinking from a
semantics point of view, stream threads will be in RUNNING but still not
processing events; will it be misleading for the users? Or do you think we are
doing it at enough places already and an exception should suffice? As they will
not understand why the stream thread is not processing and how much longer it
will not process for. If the state explicitly stated RESTORING, users would
have clearly understood why it is happening.
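A minimal sketch of that backoff idea, assuming some way to observe whether the
global thread is still restoring; the `isRestoring` supplier below is
hypothetical, as no such public API exists today.

```java
import java.util.function.BooleanSupplier;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Illustration only: block a global-store read with exponential backoff while
// the global thread is still restoring, then serve the read as usual.
final class BackoffGlobalStoreReader<K, V> {

    private final ReadOnlyKeyValueStore<K, V> store;
    private final BooleanSupplier isRestoring; // hypothetical restore-progress signal

    BackoffGlobalStoreReader(final ReadOnlyKeyValueStore<K, V> store,
                             final BooleanSupplier isRestoring) {
        this.store = store;
        this.isRestoring = isRestoring;
    }

    V get(final K key) throws InterruptedException {
        long backoffMs = 10L;
        while (isRestoring.getAsBoolean()) {
            Thread.sleep(backoffMs);                     // pause this record's processing
            backoffMs = Math.min(backoffMs * 2, 1_000L); // exponential backoff, capped at 1s
        }
        return store.get(key);
    }
}
```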







Also, to achieve what we are discussing above, the store.get() on which the call
is made has to understand whether it is a global store or not, and if it is a
global store, check whether it is restoring or not, because both might be
happening simultaneously with the above approach. With KIP-535 we have started
serving normal stores in the restoring state, but those are just interactive
queries, whereas here globalStore.get() might be called while processing, which
we don't want. So, restore for a global store and get() might have to be
exclusive. Is there a way for a store to know if it is a global store or not,
because now internally global and normal stores will behave differently?
Although if everyone is fine with the above approach we can discuss this in the
PR as well.







Regards,
Navinder

On Saturday, 5 September, 2020, 02:09:07 am IST, John Roesler 
 wrote:  
 
 Hi all,

This conversation sounds good to me so far.

Sophie raised a concern before that changing the state
machine would break state restore listeners. This is true,
and we actually have not changed the main state machine in a
long time. The last change I remember was that we used to go
"CREATED -> RUNNING -> REBALANCING -> RUNNING", and now we
just go "CREATED -> REBALANCING -> RUNNING". This is
actually the reason why many state listeners check for
"REBALANCING -> RUNNING", to filter out the old "phantom
running" transition from "CREATED -> RUNNING".

Anyway, the observation is that dropping the "phantom
running" state didn't break any real use case we were aware
of. But adding RESTORING between REBALANCING and RUNNING
certainly would break the common pattern that we're aware
of. This would indeed be the first time we introduce a
practically breaking change to the state machine at least
since 2.0, and maybe since 1.0 too. We should probably
consider the impact.

One alternative is to consider the state machine and the
state listener to be coupled APIs. We can deprecate and
replace the current state listener, and also introduce a new
state machine enum with our desired new state and
transitions, while leaving the existing one alone and
deprecating it. Then, no existing code would break, only get
deprecation warnings.



Matthias gave me an idea a few messages back with his note
about setting/checking "flags". What if we flip it around,
and set the flags on the global stores themselves. Then, we
throw an exception when a processor queries the store while
it's restoring. When they get that exception, they just put
that task on the back burner for a while and try again later
(similar to Matthias's timeout handling KIP). The global
thread sets the flag on a particular store when it realizes
it needs to be re-created and unsets it when the restore
completes.

Then:
1. Only the global stores that actually need to be restored
block anything
2. Only the tasks that access the stores get blocked
3. No new states need to be introduced
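A small sketch of the store-side flag, complementing the defer-and-retry sketch
earlier in this digest. The wrapper, the flag methods, and the exception used
here are hypothetical and not part of the public API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Hypothetical per-store "restoring" flag: set by the global thread before it wipes
// and re-creates the store, cleared once restoration completes; reads fail fast in between.
final class RestoreGuardedStore<K, V> {

    private final ReadOnlyKeyValueStore<K, V> delegate;
    private final AtomicBoolean restoring = new AtomicBoolean(false);

    RestoreGuardedStore(final ReadOnlyKeyValueStore<K, V> delegate) {
        this.delegate = delegate;
    }

    // Global thread: about to wipe and rebuild this store.
    void markRestoring() {
        restoring.set(true);
    }

    // Global thread: restoration finished, reads may resume.
    void markRestored() {
        restoring.set(false);
    }

    // Processor-facing read: throws while the store is being rebuilt so that the
    // stream thread can defer the task and try again later.
    V get(final K key) {
        if (restoring.get()) {
            throw new IllegalStateException("global store is restoring"); // stand-in for a dedicated exception
        }
        return delegate.get(key);
    }
}
```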

WDYT?
-John

On Fri, 2020-09-04 at 13:18 +, Navinder Brar wrote:
> Hi Sophie,
> 
> Thanks for the detailed explanation. I agree from a user standpoint, I don't 
> think there is any use-case to take any separate action in case of 
> GLOBAL_RESTORING and RESTORING phase. 
> 
> So, internally in the code we can handle the cases as Matthiasexplained above 
> and we can discuss those in the PR. I will update the KIP based on what all 
> we have converged towards including having an uber RESTORING(rather than 
> GLOBAL_RESTORING)state which takes stream and global threads into 
> consideration.
> 
> I will update the KIP soon and share it again as a lot has changed from where 
> we started this KIP from.
> 
> Regards,Navinder
> 
>    On Friday, 4 September, 2020, 04:19:20 am IST, Sophie Blee-Goldman 
> wrote:  
>  
>  Thanks Matthias, that sounds like what I was thinking. I think we should
> alway

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-04 Thread Navinder Brar
Hi Sophie,

Thanks for the detailed explanation. I agree from a user standpoint, I don't 
think there is any use-case to take any separate action in case of 
GLOBAL_RESTORING and RESTORING phase. 

So, internally in the code we can handle the cases as Matthias explained above, 
and we can discuss those in the PR. I will update the KIP based on what we 
have converged towards, including having an uber RESTORING (rather than 
GLOBAL_RESTORING) state which takes stream and global threads into consideration.

I will update the KIP soon and share it again, as a lot has changed from where 
we started this KIP.

Regards,
Navinder

On Friday, 4 September, 2020, 04:19:20 am IST, Sophie Blee-Goldman 
 wrote:  
 
 Thanks Matthias, that sounds like what I was thinking. I think we should
always be
able to figure out what to do in various scenarios as outlined in the
previous email.

>  For the same reason, I wouldn't want to combine global restoring and
normal restoring
> because then it would make all the restorings independent but we don't
want that. We
> want global stores to be available before any processing starts on the
active tasks.

I'm not sure I follow this specific point, but I don't think I did a good
job of explaining my
proposal so it's probably my own fault. When I say that we should merge
RESTORING
and GLOBAL_RESTORING, I just mean that we should provide a single
user-facing
state to encompass any ongoing restoration. The point of the KafkaStreams
RESTORING
state is to alert users that their state may be unavailable for IQ, and
active tasks may be
idle. This is true for both global and non-global restoration. I think the
ultimate question
is whether as a user, I would react any differently to a GLOBAL_RESTORING
state vs
the regular RESTORING. My take is "no", in which case we should just
provide a single
unified state for the minimal public API. But if anyone can think of a
reason for the user
to need to distinguish between different types of restoration, that would
be a good
argument to keep them separate.

Internally, we do need to keep track of a "global restore" flag to
determine the course
of action -- for example if a StreamThread transitions to RUNNING but sees
that the
KafkaStreams state is RESTORING, should it start processing or not? The
answer
depends on whether the state is RESTORING due to any global stores. But the
KafkaStreams state is a public interface, not an internal bookkeeper, so we
shouldn't
try to push our internal logic into the user-facing API.


On Thu, Sep 3, 2020 at 7:36 AM Matthias J. Sax  wrote:

> I think this issue can actually be resolved.
>
>  - We need a flag on the stream-threads if global-restore is in
> progress; for this case, the stream-thread may go into RUNNING state,
> but it's not allowed to actually process data -- it will be allowed to
> update standby-task though.
>
>  - If a stream-thread restores, its own state is RESTORING and it does
> not need to care about the "global restore flag".
>
>  - The global-thread just does what we discussed, including using state
> RESTORING.
>
>  - The KafkaStreams client state is in RESTORING, if at least one thread
> (stream-thread or global-thread) is in state RESTORING.
>
>  - On startup, if there is a global-thread, then just set the
> global-restore flag upfront before we start the stream-threads (we can
> actually still do the rebalance and potential restore in stream-thread
> in parallel to global restore) and rely on the global-thread to unset
> the flag.
>
>  - The tricky thing is, to "stop" processing in stream-threads if we
> need to wipe the global-store and rebuilt it. For this, we should set
> the "global restore flag" on the stream-threads, but we also need to
> "lock down" the global store in question and throw an exception if the
> stream-thread tries to access it; if the stream-thread get this
> exception, it needs to clean up itself, and wait until the "global restore
> flag" is unset before it can continue.
>
>
> Do we think this would work? -- Of course, the devil is in the details
> but it seems to become a PR discussion, and there is no reason to make
> it part of the KIP.
>
>
> -Matthias
>
> On 9/3/20 3:41 AM, Navinder Brar wrote:
> > Hi,
> >
> > Thanks, John, Matthias and Sophie for great feedback.
> >
> > On the point raised by Sophie that maybe we should allow normal
> restoring during GLOBAL_RESTORING, I think it makes sense but the challenge
> would be what happens when normal restoring(on actives) has finished but
> GLOBAL_RESTORING is still going on. Currently, all restorings are
> independent of each other i.e. restoring happening on one task/thread
> doesn't affect another. But if we do go ahead with allowing normal
> restoring during GLOBAL_RE

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-03 Thread Navinder Brar
 `KafkaStreams`
>> client we should not transit to `REBALANCING` but to the new state, and
>> maybe also make the "bootstrapping" non-blocking.
>>
>> I guess it's worth to mention this in the KIP.
>>
>> Btw: The new state for KafkaStreams should also be part of the KIP as it
>> is a public API change, too.
>>
>>
>> -Matthias
>>
>> On 8/29/20 9:37 AM, John Roesler wrote:
>>> Hi Navinder,
>>>
>>> Thanks for the ping. Yes, that all sounds right to me. The name
>> “RESTORING_GLOBAL” sounds fine, too.
>>>
>>> I think as far as warnings go, we’d just propose to mention it in the
>> javadoc of the relevant methods that the given topics should be compacted.
>>>
>>> Thanks!
>>> -John
>>>
>>> On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote:
>>>> Gentle ping.
>>>>
>>>> ~ Navinder
>>>>    On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar
>>>>  wrote:
>>>>
>>>>
>>>> Thanks Matthias & John,
>>>>
>>>>
>>>>
>>>> I am glad we are converging towards an understanding. So, to summarize,
>>>>
>>>> we will still keep treating this change in KIP and instead of providing
>> a reset
>>>>
>>>> strategy, we will cleanup, and reset to earliest and build the state.
>>>>
>>>> When we hit the exception and we are building the state, we will stop
>> all
>>>>
>>>> processing and change the state of KafkaStreams to something like
>>>>
>>>> “RESTORING_GLOBAL” or the like.
>>>>
>>>>
>>>>
>>>> How do we plan to educate users on the non desired effects of using
>>>>
>>>> non-compacted global topics? (via the KIP itself?)
>>>>
>>>>
>>>> +1 on changing the KTable behavior, reset policy for global, connecting
>>>> processors to global for a later stage when demanded.
>>>>
>>>> Regards,
>>>> Navinder
>>>>    On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax
>>>>  wrote:
>>>>
>>>>  Your observation is correct. Connecting (regular) stores to processors
>>>> is necessary to "merge" sub-topologies into single ones if a store is
>>>> shared. -- For global stores, the structure of the program does not
>>>> change and thus connecting srocessors to global stores is not required.
>>>>
>>>> Also given our experience with restoring regular state stores (ie,
>>>> partial processing of task that don't need restore), it seems better to
>>>> pause processing and move all CPU and network resources to the global
>>>> thread to rebuild the global store as soon as possible instead of
>>>> potentially slowing down the restore in order to make progress on some
>>>> tasks.
>>>>
>>>> Of course, if we collect real world experience and it becomes an issue,
>>>> we could still try to change it?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 8/18/20 3:31 PM, John Roesler wrote:
>>>>> Thanks Matthias,
>>>>>
>>>>> Sounds good. I'm on board with no public API change and just
>>>>> recovering instead of crashing.
>>>>>
>>>>> Also, to be clear, I wouldn't drag KTables into it; I was
>>>>> just trying to wrap my head around the congruity of our
>>>>> choice for GlobalKTable with respect to KTable.
>>>>>
>>>>> I agree that whatever we decide to do would probably also
>>>>> resolve KAFKA-7380.
>>>>>
>>>>> Moving on to discuss the behavior change, I'm wondering if
>>>>> we really need to block all the StreamThreads. It seems like
>>>>> we only need to prevent processing on any task that's
>>>>> connected to the GlobalStore.
>>>>>
>>>>> I just took a look at the topology building code, and it
>>>>> actually seems that connections to global stores don't need
>>>>> to be declared. That's a bummer, since it means that we
>>>>> really do have to stop all processing while the global
>>>>> thread catches up.
>>>>>
>>>>> Changing this seems like it'd be out of scope right now, but
>>>>> I bring it up in cas

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-28 Thread Navinder Brar
Gentle ping.

~ Navinder
On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar 
 wrote:  
 
  
Thanks Matthias & John, 



I am glad we are converging towards an understanding. So, to summarize, 

we will still keep treating this change as a KIP, and instead of providing a reset
strategy, we will clean up, reset to earliest, and build the state. 

When we hit the exception and we are building the state, we will stop all 

processing and change the state of KafkaStreams to something like 

“RESTORING_GLOBAL” or the like. 



How do we plan to educate users on the undesired effects of using 
non-compacted global topics? (via the KIP itself?)


+1 on changing the KTable behavior, reset policy for global, connecting 
processors to global for a later stage when demanded.

Regards,
Navinder
    On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax 
 wrote:  
 
 Your observation is correct. Connecting (regular) stores to processors
is necessary to "merge" sub-topologies into single ones if a store is
shared. -- For global stores, the structure of the program does not
change and thus connecting processors to global stores is not required.

Also given our experience with restoring regular state stores (ie,
partial processing of tasks that don't need restore), it seems better to
pause processing and move all CPU and network resources to the global
thread to rebuild the global store as soon as possible instead of
potentially slowing down the restore in order to make progress on some
tasks.

Of course, if we collect real world experience and it becomes an issue,
we could still try to change it?


-Matthias


On 8/18/20 3:31 PM, John Roesler wrote:
> Thanks Matthias,
> 
> Sounds good. I'm on board with no public API change and just
> recovering instead of crashing.
> 
> Also, to be clear, I wouldn't drag KTables into it; I was
> just trying to wrap my head around the congruity of our
> choice for GlobalKTable with respect to KTable.
> 
> I agree that whatever we decide to do would probably also
> resolve KAFKA-7380.
> 
> Moving on to discuss the behavior change, I'm wondering if
> we really need to block all the StreamThreads. It seems like
> we only need to prevent processing on any task that's
> connected to the GlobalStore. 
> 
> I just took a look at the topology building code, and it
> actually seems that connections to global stores don't need
> to be declared. That's a bummer, since it means that we
> really do have to stop all processing while the global
> thread catches up.
> 
> Changing this seems like it'd be out of scope right now, but
> I bring it up in case I'm wrong and it actually is possible
> to know which specific tasks need to be synchronized with
> which global state stores. If we could know that, then we'd
> only have to block some of the tasks, not all of the
> threads.
> 
> Thanks,
> -John
> 
> 
> On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote:
>> Thanks for the discussion.
>>
>> I agree that this KIP is justified in any case -- even if we don't
>> change public API, as the change in behavior is significant.
>>
>> A better documentation for cleanup policy is always good (even if I am
>> not aware of any concrete complaints atm that users were not aware of
>> the implications). Of course, for a regular KTable, one can
>> enable/disable the source-topic-changelog optimization and thus can use
>> a non-compacted topic for this case, what is quite a difference to
>> global stores/tables; so maybe it's worth to point out this difference
>> explicitly.
>>
>> As mentioned before, the main purpose of the original Jira was to avoid
>> the crash situation but to allow for auto-recovering while it was an
>> open question if it makes sense / would be useful to allow users to
>> specify a custom reset policy instead of using a hard-coded "earliest"
>> strategy. -- It seem it's still unclear if it would be useful and thus
>> it might be best to not add it for now -- we can still add it later if
>> there are concrete use-cases that need this feature.
>>
>> @John: I actually agree that it's also questionable to allow a custom
>> reset policy for KTables... Not sure if we want to drag this question
>> into this KIP though?
>>
>> So it seem, we all agree that we actually don't need any public API
>> changes, but we only want to avoid crashing?
>>
>> For this case, to preserve the current behavior that guarantees that the
>> global store/table is always loaded first, it seems we need to have a
>> stop-the-world mechanism for the main `StreamThreads` for this case --
>> do we need to add a new state to KafkaStreams client for this case?
>>
>> Having 

[jira] [Created] (KAFKA-10429) Group Coordinator is unavailable leads to missing events

2020-08-24 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-10429:
-

 Summary: Group Coordinator is unavailable leads to missing events
 Key: KAFKA-10429
 URL: https://issues.apache.org/jira/browse/KAFKA-10429
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.1
Reporter: Navinder Brar


We are regularly getting this Exception in logs.

[2020-08-25 03:24:59,214] INFO [Consumer 
clientId=appId-StreamThread-1-consumer, groupId=dashavatara] Group coordinator 
ip:9092 (id: 1452096777 rack: null) is *unavailable* or invalid, will attempt 
rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

 

And after sometime it becomes discoverable:

[2020-08-25 03:25:02,218] INFO [Consumer 
clientId=appId-c3d1d186-e487-4993-ae3d-5fed75887e6b-StreamThread-1-consumer, 
groupId=appId] Discovered group coordinator ip:9092 (id: 1452096777 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

 

Now, the doubt I have is why this unavailability doesn't trigger a rebalance in 
the cluster. We have a few hours of retention on the source Kafka topics, and 
sometimes this unavailability lasts for more than a few hours, and since it 
doesn't trigger a rebalance or stop processing on the other nodes (which are 
connected to the GC) we never come to know that some issue has happened, and 
until then we lose events from our source topics. 

 

There are some resolutions mentioned on Stack Overflow, but those configs are 
already set in our Kafka:

default.replication.factor=3

offsets.topic.replication.factor=3

 

It would be great to understand why this issue is happening, why it doesn't 
trigger a rebalance, and whether there is any known solution for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-19 Thread Navinder Brar
>>
>>
>> -Matthias
>>
>>
>>
>>
>> On 8/17/20 7:34 AM, John Roesler wrote:
>>> Hi Navinder,
>>>
>>> I see what you mean about the global consumer being similar
>>> to the restore consumer.
>>>
>>> I also agree that automatically performing the recovery
>>> steps should be strictly an improvement over the current
>>> situation.
>>>
>>> Also, yes, it would be a good idea to make it clear that the
>>> global topic should be compacted in order to ensure correct
>>> semantics. It's the same way with input topics for KTables;
>>> we rely on users to ensure the topics are compacted, and if
>>> they aren't, then the execution semantics will be broken.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Sun, 2020-08-16 at 11:44 +, Navinder Brar wrote:
>>>> Hi John,
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Thanks for your inputs. Since, global topics are in a way their own 
>>>> changelog, wouldn’t the global consumers be more akin to restore consumers 
>>>> than the main consumer? 
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> I am also +1 on catching the exception and setting it to the earliest for 
>>>> now. Whenever an instance starts, currently global stream thread(if 
>>>> available) goes to RUNNING before stream threads are started so that means 
>>>> the global state is available when the processing by stream threads start. 
>>>> So, with the new change of catching the exception, cleaning store and 
>>>> resetting to earlier would probably be “stop the world” as you said John, 
>>>> as I think we will have to pause the stream threads till the whole global 
>>>> state is recovered. I assume it is "stop the world" right now as well, 
>>>> since now also if an InvalidOffsetException comes, we throw streams 
>>>> exception and the user has to clean up and handle all this manually and 
>>>> when that instance will start, it will restore global state first.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> I had an additional thought to this whole problem, would it be helpful to 
>>>> educate the users that global topics should have cleanup policy as 
>>>> compact, so that this invalid offset exception never arises for them. 
>>>> Assume for example, that the cleanup policy in global topic is "delete" 
>>>> and it has deleted k1, k2 keys(via retention.ms) although all the 
>>>> instances had already consumed them so they are in all global stores and 
>>>> all other instances are up to date on the global data(so no 
>>>> InvalidOffsetException). Now, a new instance is added to the cluster, and 
>>>> we have already lost k1, k2 from the global topic so it will start 
>>>> consuming from the earliest point in the global topic. So, wouldn’t this 
>>>> global store on the new instance has 2 keys less than all the other global 
>>>> stores already available in the cluster? Please let me know if I am 
>>>> missing something. Thanks.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Navinder
>>>>
>>>>
>>>>    On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler 
>>>> wrote:  
>>>>  
>>>>  Hi all,
>>>>
>>>> It seems like the main motivation for this proposal is satisfied if we 
>>>> just implement some recovery mechanism instead of crashing. If the 
>>>> mechanism is going to be pausing all the threads until the state is 
>>>> recovered, then it still seems like a big enough behavior change to 
>>>> warrant a KIP still. 
>>>>
>>>> I have to confess I’m a little unclear on why a custom reset policy for a 
>>>> global store, table, or even consumer might be considered wrong. It’s 
>>>> clearly wrong for the restore consumer, but the global consumer seems more 
>>>> semantically akin to the main consumer than the restore consumer. 
>>>>
>>>> In other words, if it’s wrong to reset a GlobalKTable from latest, 
>>>> shouldn’t it also be wrong for a KTable, for exactly the sa

Re: [VOTE] KIP-657: Add Customized Kafka Streams Logo

2020-08-18 Thread Navinder Brar
Hi,
Thanks for the KIP, really like the idea. I am +1 (non-binding) on A, mainly 
because I felt like you have to tilt your head to realize the otter's head in 
B.
Regards,
Navinder

On Tuesday, 18 August, 2020, 11:44:20 pm IST, Guozhang Wang 
 wrote:  
 
 I'm leaning towards design B primarily because it reminds me of the Firefox
logo which I like a lot. But I also share Adam's concern that it should
better not obscure the Kafka logo --- so if we can tweak a bit to fix it my
vote goes to B, otherwise A :)


Guozhang

On Tue, Aug 18, 2020 at 9:48 AM Bruno Cadonna  wrote:

> Thanks for the KIP!
>
> I am +1 (non-binding) for A.
>
> I would also like to hear opinions whether the logo should be colorized
> or just black and white.
>
> Best,
> Bruno
>
>
> On 15.08.20 16:05, Adam Bellemare wrote:
> > I prefer Design B, but given that I missed the discussion thread, I think
> > it would be better without the Otter obscuring any part of the Kafka
> logo.
> >
> > On Thu, Aug 13, 2020 at 6:31 PM Boyang Chen 
> > wrote:
> >
> >> Hello everyone,
> >>
> >> I would like to start a vote thread for KIP-657:
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-657%3A+Add+Customized+Kafka+Streams+Logo
> >>
> >> This KIP is aiming to add a new logo for the Kafka Streams library. And
> we
> >> prepared two candidates with a cute otter. You could look up the KIP to
> >> find those logos.
> >>
> >>
> >> Please post your vote against these two customized logos. For example, I
> >> would vote for *design-A (binding)*.
> >>
> >> This vote thread shall be open for one week to collect enough votes to
> call
> >> for a winner. Still, feel free to post any question you may have
> regarding
> >> this KIP, thanks!
> >>
> >
>


-- 
-- Guozhang
  

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-16 Thread Navinder Brar

Hi John,







Thanks for your inputs. Since global topics are in a way their own changelog, 
wouldn’t the global consumers be more akin to restore consumers than the main 
consumer? 







I am also +1 on catching the exception and setting it to the earliest for now. 
Whenever an instance starts, currently the global stream thread (if available) goes 
to RUNNING before stream threads are started, so that means the global state is 
available when processing by stream threads starts. So, with the new change 
of catching the exception, cleaning the store and resetting to earliest, it would 
probably be "stop the world" as you said, John, as I think we will have to pause 
the stream threads till the whole global state is recovered. I assume it is 
"stop the world" right now as well, since now also if an InvalidOffsetException 
comes, we throw streams exception and the user has to clean up and handle all 
this manually and when that instance will start, it will restore global state 
first.







I had an additional thought to this whole problem, would it be helpful to 
educate the users that global topics should have cleanup policy as compact, so 
that this invalid offset exception never arises for them. Assume for example, 
that the cleanup policy in global topic is "delete" and it has deleted k1, k2 
keys(via retention.ms) although all the instances had already consumed them so 
they are in all global stores and all other instances are up to date on the 
global data(so no InvalidOffsetException). Now, a new instance is added to the 
cluster, and we have already lost k1, k2 from the global topic so it will start 
consuming from the earliest point in the global topic. So, wouldn't this global 
store on the new instance have 2 keys fewer than all the other global stores 
already available in the cluster? Please let me know if I am missing something. 
Thanks.
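To illustrate the point about compaction, creating the topic that backs a
global store with a compact cleanup policy could look roughly like this; the
topic name, partition count, and replication factor are placeholders.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

// Create the topic backing a global store with cleanup.policy=compact so records
// are never dropped by time-based retention and a new instance can always rebuild
// the complete global state from the earliest offset.
final class CreateCompactedGlobalTopic {

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            final NewTopic topic = new NewTopic("global-store-topic", 1, (short) 3)
                .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```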







Regards,

Navinder


On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler 
 wrote:  
 
 Hi all,

It seems like the main motivation for this proposal is satisfied if we just 
implement some recovery mechanism instead of crashing. If the mechanism is 
going to be pausing all the threads until the state is recovered, then it still 
seems like a big enough behavior change to warrant a KIP still. 

I have to confess I’m a little unclear on why a custom reset policy for a 
global store, table, or even consumer might be considered wrong. It’s clearly 
wrong for the restore consumer, but the global consumer seems more semantically 
akin to the main consumer than the restore consumer. 

In other words, if it’s wrong to reset a GlobalKTable from latest, shouldn’t it 
also be wrong for a KTable, for exactly the same reason? It certainly seems 
like it would be an odd choice, but I’ve seen many choices I thought were odd 
turn out to have perfectly reasonable use cases. 

As far as the PAPI global store goes, I could see adding the option to 
configure it, since as Matthias pointed out, there’s really no specific 
semantics for the PAPI. But if automatic recovery is really all Navinder 
wanted, then I could also see deferring this until someone specifically wants it.

So the tl;dr is, if we just want to catch the exception and rebuild the store 
by seeking to earliest with no config or API changes, then I’m +1.

I’m wondering if we can improve on the “stop the world” effect of rebuilding 
the global store, though. It seems like we could put our heads together and 
come up with a more fine-grained approach to maintaining the right semantics 
during recovery while still making some progress.  

Thanks,
John


On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote:
> Hi Matthias,
> 
> IMHO, now as you explained using ‘global.consumer.auto.offset.reset’ is 
> not as straightforward 
> as it seems and it might change the existing behavior for users without 
> them realizing it, I also 
> 
> think that we should change the behavior inside global stream thread to 
> not die on 
> 
> InvalidOffsetException and instead clean and rebuild the state from the 
> earliest. On this, as you 
> 
> mentioned that we would need to pause the stream threads till the 
> global store is completely restored. 
> 
> Without it, there will be incorrect processing results if they are 
> utilizing a global store during processing. 
> 
> 
> 
> So, basically we can divide the use-cases into 4 parts.
>    
>    - PAPI based global stores (will have the earliest hardcoded)
>    - PAPI based state stores (already has auto.reset.config)
>    - DSL based GlobalKTables (will have earliest hardcoded)
>    - DSL based KTables (will continue with auto.reset.config)
> 
> 
> 
> So, this would mean that we are not changing any existing behaviors 
> with this if I am right.
> 
> 
> 
> I guess we could improve the code to actually log a warning for this
> 
> case, similar to wh

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-09 Thread Navinder Brar
ink about the synchronization with the main processing threads?
On startup we bootstrap the global stores before processing happens.
Thus, if an `InvalidOffsetException` happens and the global thread dies,
the main threads cannot access the global stores any longer and also die.
If we re-build the state though, do we need to pause the main thread
during this phase?



-Matthias



On 8/2/20 8:48 AM, Navinder Brar wrote:
> Hi John,
> 
> I have updated the KIP to make the motivation more clear. In a nutshell, we 
> will use the already existing config "global.consumer.auto.offset.reset" for 
> users to set a blanket reset policy for all global topics and add a new 
> interface to set per-topic reset policy for each global topic(for which we 
> specifically need this KIP). There was a point raised from Matthias above to 
> always reset to earliest by cleaning the stores and seekToBeginning in case 
> of InvalidOffsetException. We can go with that route as well and I don't 
> think it would need a KIP as if we are not providing users an option to have 
> blanket reset policy on global topics, then a per-topic override would also 
> not be required(the KIP is required basically for that). Although, I think if 
> users have an option to choose reset policy for StreamThread then the option 
> should be provided for GlobalStreamThread as well and if we don't want to use 
> the "global.consumer.auto.offset.reset" then we would need to deprecate it 
> because currently it's not serving any purpose. For now, I have added it in 
> rejected alternatives but we can discuss this.
> 
> On the query that I had for Guozhang, thanks to Matthias we have fixed it 
> last week as part of KAFKA-10306.
> 
> ~Navinder
>  
> 
>    On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder Brar 
> wrote:  
>  
>  
> Hi,
> 
> 
> 
> Sorry, it took some time to respond back.
> 
> 
> 
> 
> 
> 
> 
> “but I thought we would pass the config through to the client.”
> 
>>> @John, sure we can use the config in GlobalStreamThread, that could be one 
>>> of the way to solve it.
> 
> 
> 
> 
> 
> 
> @Matthias, sure cleaning the store and recreating is one way but since we are 
> giving an option to reset in StreamThread why the implementation should be 
> different in GlobalStreamThread. I think we should use the 
> global.consumer.auto.offset.reset config to accept the reset strategy opted 
> by the user although I would be ok with just cleaning and resetting to the 
> latest as well for now. Currently, we throw a StreamsException in case of 
> InvalidOffsetException in GlobalStreamThread so just resetting would still be 
> better than what happens currently. 
> 
> Matthias, I found this comment in StreamBuilder for GlobalKTable ‘* Note that 
> {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy 
> {@code "earliest"} regardless of the specified value in {@link StreamsConfig} 
> or {@link Consumed}.’ 
> So, I guess we are already cleaning up and recreating for GlobalKTable from 
> earliest offset.
> 
> 
> 
> 
> 
> 
> 
> 
> @Guozhan while looking at the code, I also noticed a TODO: pending in 
> GlobalStateManagerImpl, when InvalidOffsetException is thrown. Earlier, we 
> were directly clearing the store here and recreating from scratch but that 
> code piece is removed now. Are you working on a follow-up PR for this or just 
> handling the reset in GlobalStreamThread should be sufficient?
> 
> Regards,
> Navinder
> 
>     On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax 
>  wrote:  
>  
>  Atm, the config should be ignored and the global-consumer should use
> "none" in a hard-coded way.
> 
However, I am still wondering if we actually want/need to allow users
> to specify the reset policy? It might be worth to consider, to just
> change the behavior: catch the exception, log an ERROR (for information
> purpose), wipe the store, seekToBeginning(), and recreate the store?
> 
> Btw: if we want to allow users to set the reset policy, this should be
> possible via the config, or via overwriting the config in the method
> itself. Thus, we would need to add the new overloaded method to
> `Topology` and `StreamsBuilder`.
> 
> Another question to ask: what about GlobalKTables? Should they behave
> the same? An alternative design could be, to allow users to specify a
> flexible reset policy for global-stores, but not for GlobalKTables and
> use the strategy suggested above for this case.
> 
> Thoughts?
> 
> 
> -Matthias
> 
> 
> On 7/2/20 2:14 PM, John Roesler wrote:
>> Hi Navinder,
>>
>> Thanks for the response. I’m sorr

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-02 Thread Navinder Brar
Hi John,

I have updated the KIP to make the motivation more clear. In a nutshell, we 
will use the already existing config "global.consumer.auto.offset.reset" for 
users to set a blanket reset policy for all global topics and add a new 
interface to set per-topic reset policy for each global topic(for which we 
specifically need this KIP). There was a point raised from Matthias above to 
always reset to earliest by cleaning the stores and seekToBeginning in case of 
InvalidOffsetException. We can go with that route as well and I don't think it 
would need a KIP as if we are not providing users an option to have blanket 
reset policy on global topics, then a per-topic override would also not be 
required(the KIP is required basically for that). Although, I think if users 
have an option to choose reset policy for StreamThread then the option should 
be provided for GlobalStreamThread as well and if we don't want to use the 
"global.consumer.auto.offset.reset" then we would need to deprecate it because 
currently it's not serving any purpose. For now, I have added it in rejected 
alternatives but we can discuss this.

On the query that I had for Guozhang, thanks to Matthias we have fixed it last 
week as part of KAFKA-10306.

~Navinder
 

On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder Brar 
 wrote:  
 
 
Hi,



Sorry, it took some time to respond back.







“but I thought we would pass the config through to the client.”

>> @John, sure we can use the config in GlobalStreamThread, that could be one 
>> of the way to solve it.






@Matthias, sure cleaning the store and recreating is one way but since we are 
giving an option to reset in StreamThread why the implementation should be 
different in GlobalStreamThread. I think we should use the 
global.consumer.auto.offset.reset config to accept the reset strategy opted by 
the user although I would be ok with just cleaning and resetting to the latest 
as well for now. Currently, we throw a StreamsException in case of 
InvalidOffsetException in GlobalStreamThread so just resetting would still be 
better than what happens currently. 

Matthias, I found this comment in StreamBuilder for GlobalKTable ‘* Note that 
{@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code 
"earliest"} regardless of the specified value in {@link StreamsConfig} or 
{@link Consumed}.’ 
So, I guess we are already cleaning up and recreating for GlobalKTable from 
earliest offset.








@Guozhan while looking at the code, I also noticed a TODO: pending in 
GlobalStateManagerImpl, when InvalidOffsetException is thrown. Earlier, we were 
directly clearing the store here and recreating from scratch but that code 
piece is removed now. Are you working on a follow-up PR for this or just 
handling the reset in GlobalStreamThread should be sufficient?

Regards,
Navinder

    On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax 
 wrote:  
 
 Atm, the config should be ignored and the global-consumer should use
"none" in a hard-coded way.

However, I am still wondering if we actually want/need to allow users
to specify the reset policy? It might be worth to consider, to just
change the behavior: catch the exception, log an ERROR (for information
purpose), wipe the store, seekToBeginning(), and recreate the store?

Btw: if we want to allow users to set the reset policy, this should be
possible via the config, or via overwriting the config in the method
itself. Thus, we would need to add the new overloaded method to
`Topology` and `StreamsBuilder`.

Another question to ask: what about GlobalKTables? Should they behave
the same? An alternative design could be, to allow users to specify a
flexible reset policy for global-stores, but not for GlobalKTables and
use the strategy suggested above for this case.

Thoughts?


-Matthias


On 7/2/20 2:14 PM, John Roesler wrote:
> Hi Navinder,
> 
> Thanks for the response. I’m sorry if I’m being dense... You said we are not 
> currently using the config, but I thought we would pass the config through to 
> the client.  Can you confirm whether or not the existing config works for 
> your use case?
> 
> Thanks,
> John
> 
> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote:
>> Sorry my bad. Found it.
>>
>>
>>
>> Prefix used to override {@link KafkaConsumer consumer} configs for the 
>> global consumer client from
>>
>> * the general consumer client configs. The override precedence is the 
>> following (from highest to lowest precedence):
>> * 1. global.consumer.[config-name]..
>> public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
>>
>>
>>
>> So, that's great. We already have a config exposed to reset offsets for 
>> global topics via global.consumer.auto.offset.reset just that we are 
>> not actu

Re: [VOTE] KIP-648: Renaming getter method for Interactive Queries

2020-07-29 Thread Navinder Brar
+1 (non-binding). Thanks John, looks good to me.

~Navinder

On Wednesday, 29 July, 2020, 04:32:25 am IST, John Thomas 
 wrote:  
 
 Hello everyone,

I'd like to kick-off a vote for KIP-648 : Renaming getter method for 
Interactive Queries
https://cwiki.apache.org/confluence/display/KAFKA/KIP-648%3A+Renaming+getter+method+for+Interactive+Queries

It's a straightforward change to include new getters and deprecate the 
existing ones for the class KeyQueryMetadata.
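
For anyone who has not opened the KIP, the pattern is roughly the following (reduced to a single property here; the authoritative method names are in the KIP, not in this sketch):

import org.apache.kafka.streams.state.HostInfo;

public class KeyQueryMetadataSketch {

    private final HostInfo activeHost;

    public KeyQueryMetadataSketch(final HostInfo activeHost) {
        this.activeHost = activeHost;
    }

    @Deprecated
    public HostInfo getActiveHost() {   // existing bean-style getter, to be deprecated
        return activeHost;
    }

    public HostInfo activeHost() {      // new getter following the Kafka naming convention
        return activeHost;
    }
}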

Thanks,
John
  

Re: [VOTE] KIP-607: Add Metrics to Record the Memory Used by RocksDB to Kafka Streams

2020-07-27 Thread Navinder Brar
+1 (non-binding). Thanks for the KIP, Bruno.

~Navinder 

On Friday, 24 July, 2020, 08:41:03 pm IST, John Roesler 
 wrote:  
 
 Thanks, Bruno!

I'm +1 (binding)

-John

On Fri, Jul 24, 2020, at 07:04, Bruno Cadonna wrote:
> Hi,
> 
> After re-opening the discussion about
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB
> 
> I would like to re-open the voting for this KIP.
> 
> The discussion thread can be found here:
>  
> http://mail-archives.apache.org/mod_mbox/kafka-dev/202005.mbox/%3CCADR0NwzJBJa6WihnpmGj0R%2BYPVrojq4Kg_hOArNEytHAG-tZAQ%40mail.gmail.com%3E
> 
> Best,
> Bruno
> 
> On 19.05.20 10:00, Bruno Cadonna wrote:
> > Thank you for voting!
> > 
> > This KIP passes with:
> > 4 binding +1
> > 1 non-binding +1
> > 0 -1
> > 
> > Best,
> > Bruno
> > 
> > On Fri, May 15, 2020 at 11:34 PM Matthias J. Sax  wrote:
> >>
> >> +1 (binding)
> >>
> >> -Matthias
> >>
> >> On 5/15/20 11:48 AM, John Roesler wrote:
> >>> Thanks, Bruno!
> >>>
> >>> I’m +1 (binding)
> >>>
> >>> -John
> >>>
> >>> On Fri, May 15, 2020, at 11:32, Sophie Blee-Goldman wrote:
>  Thanks Bruno! +1 (non-binding)
> 
>  Sophie
> 
>  On Fri, May 15, 2020 at 8:15 AM Bill Bejeck  wrote:
> 
> > Thanks for the KIP!
> >
> > +1 (binding)
> >
> > -Bill
> >
> > On Fri, May 15, 2020 at 11:12 AM Guozhang Wang  
> > wrote:
> >
> >> +1.
> >>
> >> Thanks!
> >>
> >> On Fri, May 15, 2020 at 1:36 AM Bruno Cadonna 
> > wrote:
> >>
> >>> Hi all,
> >>>
> >>> I'd like to call for votes on
> >>>
> >>> KIP-607: Add Metrics to Record the Memory Used by RocksDB to Kafka
> >> Streams
> >>>
> >>> The KIP can be found here
> >>>
> >>>
> >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Record+the+Memory+Used+by+RocksDB
> >>>
> >>> The discussion thread can be found here:
> >>>
> >>>
> >>
> > http://mail-archives.apache.org/mod_mbox/kafka-dev/202005.mbox/%3CCADR0NwzJBJa6WihnpmGj0R%2BYPVrojq4Kg_hOArNEytHAG-tZAQ%40mail.gmail.com%3E
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> 
> >>
>  

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-07-26 Thread Navinder Brar

Hi,



Sorry, it took some time to respond back.







“but I thought we would pass the config through to the client.”

>> @John, sure we can use the config in GlobalStreamThread, that could be one 
>> of the way to solve it.






@Matthias, sure cleaning the store and recreating is one way but since we are 
giving an option to reset in StreamThread why the implementation should be 
different in GlobalStreamThread. I think we should use the 
global.consumer.auto.offset.reset config to accept the reset strategy opted by 
the user although I would be ok with just cleaning and resetting to the latest 
as well for now. Currently, we throw a StreamsException in case of 
InvalidOffsetException in GlobalStreamThread so just resetting would still be 
better than what happens currently. 

Matthias, I found this comment in StreamBuilder for GlobalKTable ‘* Note that 
{@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code 
"earliest"} regardless of the specified value in {@link StreamsConfig} or 
{@link Consumed}.’ 
So, I guess we are already cleaning up and recreating for GlobalKTable from 
earliest offset.








@Guozhan while looking at the code, I also noticed a TODO: pending in 
GlobalStateManagerImpl, when InvalidOffsetException is thrown. Earlier, we were 
directly clearing the store here and recreating from scratch but that code 
piece is removed now. Are you working on a follow-up PR for this or just 
handling the reset in GlobalStreamThread should be sufficient?

Regards,
Navinder

On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax 
 wrote:  
 
 Atm, the config should be ignored and the global-consumer should use
"none" in a hard-coded way.

However, I am still wondering if we actually want/need to allow users
to specify the reset policy? It might be worth to consider, to just
change the behavior: catch the exception, log an ERROR (for information
purpose), wipe the store, seekToBeginning(), and recreate the store?

Btw: if we want to allow users to set the reset policy, this should be
possible via the config, or via overwriting the config in the method
itself. Thus, we would need to add the new overloaded method to
`Topology` and `StreamsBuilder`.

Another question to ask: what about GlobalKTables? Should they behave
the same? An alternative design could be, to allow users to specify a
flexible reset policy for global-stores, but not for GlobalKTables and
use the strategy suggested above for this case.

Thoughts?


-Matthias


On 7/2/20 2:14 PM, John Roesler wrote:
> Hi Navinder,
> 
> Thanks for the response. I’m sorry if I’m being dense... You said we are not 
> currently using the config, but I thought we would pass the config through to 
> the client.  Can you confirm whether or not the existing config works for 
> your use case?
> 
> Thanks,
> John
> 
> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote:
>> Sorry my bad. Found it.
>>
>>
>>
>> Prefix used to override {@link KafkaConsumer consumer} configs for the 
>> global consumer client from
>>
>> * the general consumer client configs. The override precedence is the 
>> following (from highest to lowest precedence):
>> * 1. global.consumer.[config-name]..
>> public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
>>
>>
>>
>> So, that's great. We already have a config exposed to reset offsets for 
>> global topics via global.consumer.auto.offset.reset just that we are 
>> not actually using it inside GlobalStreamThread to reset.
>>
>> -Navinder
>>    On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar 
>>  wrote:  
>>  
>>  Hi John,
>>
>> Thanks for your feedback. 
>> 1. I think there is some confusion on my first point, the enum I am 
>> sure we can use the same one but the external config which controls the 
>> resetting in global stream thread either we can use the same one which 
>> users use for source topics(StreamThread) or we can provide a new one 
>> which specifically controls global topics. For e.g. currently if I get 
>> an InvalidOffsetException in any of my source topics, I can choose 
>> whether to reset from Earliest or Latest(with auto.offset.reset). Now 
>> either we can use the same option and say if I get the same exception 
>> for global topics I will follow same resetting. Or some users might 
>> want to have totally different setting for both source and global 
>> topics, like for source topic I want resetting from Latest but for 
>> global topics I want resetting from Earliest so in that case adding a 
>> new config might be better.
>>
>> 2. I couldn't find this config currently 
>> "global.consumer.auto.offset.reset". Infac

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-06-28 Thread Navinder Brar
Sorry my bad. Found it.



Prefix used to override {@link KafkaConsumer consumer} configs for the global 
consumer client from

* the general consumer client configs. The override precedence is the following 
(from highest to lowest precedence):
* 1. global.consumer.[config-name]..
public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";



So, that's great. We already have a config exposed to reset offsets for global 
topics via global.consumer.auto.offset.reset just that we are not actually 
using it inside GlobalStreamThread to reset.
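
For completeness, setting that prefixed config from Java looks like the following sketch (application id and bootstrap servers are placeholder values):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class GlobalResetConfigExample {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Resolves to "global.consumer.auto.offset.reset":
        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
                  "earliest");
        return props;
    }
}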

-Navinder
On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar 
 wrote:  
 
 Hi John,

Thanks for your feedback. 
1. I think there is some confusion on my first point, the enum I am sure we can 
use the same one but the external config which controls the resetting in global 
stream thread either we can use the same one which users use for source 
topics(StreamThread) or we can provide a new one which specifically controls 
global topics. For e.g. currently if I get an InvalidOffsetException in any of 
my source topics, I can choose whether to reset from Earliest or Latest(with 
auto.offset.reset). Now either we can use the same option and say if I get the 
same exception for global topics I will follow same resetting. Or some users 
might want to have totally different setting for both source and global topics, 
like for source topic I want resetting from Latest but for global topics I want 
resetting from Earliest so in that case adding a new config might be better.

2. I couldn't find this config currently "global.consumer.auto.offset.reset". 
In fact, in GlobalStreamThread.java we are throwing a StreamsException for 
InvalidOffsetException and there is a test as well 
GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so I think this is 
the config we are trying to introduce with this KIP.

-Navinder  On Saturday, 27 June, 2020, 07:03:04 pm IST, John Roesler 
 wrote:  
 
 Hi Navinder,

Thanks for this proposal!

Regarding your question about whether to use the same policy
enum or not, the underlying mechanism is the same, so I think
we can just use the same AutoOffsetReset enum.
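
For reference, this is how that enum is applied to a regular source topic today (topic names and serdes below are illustrative only); the question is essentially whether global stores should get an equivalent knob:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class AutoOffsetResetExample {
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream = builder.stream(
            "input-topic",
            Consumed.with(Serdes.String(), Serdes.String())
                    .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
        stream.to("output-topic");
        return builder.build();
    }
}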

Can you confirm whether setting the reset policy config on the
global consumer currently works or not? Based on my reading
of StreamsConfig, it looks like it would be:
"global.consumer.auto.offset.reset".

If that does work, would you still propose to augment the
Java API?

Thanks,
-John

On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote:
> Hi,
> 
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
> 
> I have taken over this KIP since it has been dormant for a long time 
> and this looks important for use-cases that have large global data, so 
> rebuilding global stores from scratch might seem overkill in case of 
> InvalidOffsetException.
> 
> We want to give users the control to use reset policy(as we do in 
> StreamThread) in case they hit invalid offsets. I have still not 
> decided whether to restrict this option to the same reset policy being 
> used by StreamThread(using auto.offset.reset config) or add another 
> reset config specifically for global stores 
> "global.auto.offset.reset" which gives users more control to choose 
> separate policies for global and stream threads.
> 
> I would like to hear your opinions on the KIP.
> 
> 
> -Navinder    

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-06-28 Thread Navinder Brar
Hi John,

Thanks for your feedback. 
1. I think there is some confusion on my first point, the enum I am sure we can 
use the same one but the external config which controls the resetting in global 
stream thread either we can use the same one which users use for source 
topics(StreamThread) or we can provide a new one which specifically controls 
global topics. For e.g. currently if I get an InvalidOffsetException in any of 
my source topics, I can choose whether to reset from Earliest or Latest(with 
auto.offset.reset). Now either we can use the same option and say if I get the 
same exception for global topics I will follow same resetting. Or some users 
might want to have totally different setting for both source and global topics, 
like for source topic I want resetting from Latest but for global topics I want 
resetting from Earliest so in that case adding a new config might be better.

2. I couldn't find this config currently "global.consumer.auto.offset.reset". 
In fact, in GlobalStreamThread.java we are throwing a StreamsException for 
InvalidOffsetException and there is a test as well 
GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so I think this is 
the config we are trying to introduce with this KIP.

-Navinder   On Saturday, 27 June, 2020, 07:03:04 pm IST, John Roesler 
 wrote:  
 
 Hi Navinder,

Thanks for this proposal!

Regarding your question about whether to use the same policy
enum or not, the underlying mechanism is the same, so I think
we can just use the same AutoOffsetReset enum.

Can you confirm whether setting the reset policy config on the
global consumer currently works or not? Based on my reading
of StreamsConfig, it looks like it would be:
"global.consumer.auto.offset.reset".

If that does work, would you still propose to augment the
Java API?

Thanks,
-John

On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote:
> Hi,
> 
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
> 
> I have taken over this KIP since it has been dormant for a long time 
> and this looks important for use-cases that have large global data, so 
> rebuilding global stores from scratch might seem overkill in case of 
> InvalidOffsetException.
> 
> We want to give users the control to use reset policy(as we do in 
> StreamThread) in case they hit invalid offsets. I have still not 
> decided whether to restrict this option to the same reset policy being 
> used by StreamThread(using auto.offset.reset config) or add another 
> reset config specifically for global stores 
> "global.auto.offset.reset" which gives users more control to choose 
> separate policies for global and stream threads.
> 
> I would like to hear your opinions on the KIP.
> 
> 
> -Navinder  

[DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-06-26 Thread Navinder Brar
Hi,

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy

I have taken over this KIP since it has been dormant for a long time and this 
looks important for use-cases that have large global data, so rebuilding global 
stores from scratch might seem overkill in case of InvalidOffsetException.

We want to give users the control to use reset policy(as we do in StreamThread) 
in case they hit invalid offsets. I have still not decided whether to restrict 
this option to the same reset policy being used by StreamThread(using 
auto.offset.reset config) or add another reset config specifically for global 
stores "global.auto.offset.reset" which gives users more control to choose 
separate policies for global and stream threads.
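
To illustrate the two alternatives in config terms, a quick sketch (note that the second key is only a proposed name from this KIP and does not exist today):

import java.util.Properties;

public class ProposedGlobalResetExample {
    public static Properties props() {
        final Properties props = new Properties();
        // existing config, applied to regular source topics:
        props.put("auto.offset.reset", "latest");
        // proposed, global-store-specific override (name still under discussion):
        props.put("global.auto.offset.reset", "earliest");
        return props;
    }
}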

I would like to hear your opinions on the KIP.


-Navinder 

Re: [ANNOUNCE] New committer: Boyang Chen

2020-06-22 Thread Navinder Brar
Many Congratulations Boyang. Very well deserved.

Regards,
Navinder

On Tuesday, 23 June, 2020, 07:21:23 am IST, Matt Wang  
wrote:  
 
 Congratulations, Boyang!


--

Best,
Matt Wang


On 06/23/2020 07:59,Boyang Chen wrote:
Thanks a lot everyone, I really appreciate the recognition, and hope to
make more solid contributions to the community in the future!

On Mon, Jun 22, 2020 at 4:50 PM Matthias J. Sax  wrote:

Congrats! Well deserved!

-Matthias

On 6/22/20 4:38 PM, Bill Bejeck wrote:
Congratulations Boyang! Well deserved.

-Bill

On Mon, Jun 22, 2020 at 7:35 PM Colin McCabe  wrote:

Congratulations, Boyang!

cheers,
Colin

On Mon, Jun 22, 2020, at 16:26, Guozhang Wang wrote:
The PMC for Apache Kafka has invited Boyang Chen as a committer and we
are
pleased to announce that he has accepted!

Boyang has been active in the Kafka community for more than two years.
Since then he has presented his experience operating with Kafka Streams
at
Pinterest as well as several feature development including rebalance
improvements (KIP-345) and exactly-once scalability improvements
(KIP-447)
in various Kafka Summit and Kafka Meetups. More recently he's also been
participating in Kafka broker development including post-Zookeeper
controller design (KIP-500). Besides all the code contributions, Boyang
has
also helped reviewing even more PRs and KIPs than his own.

Thanks for all the contributions Boyang! And look forward to more
collaborations with you on Apache Kafka.


-- Guozhang, on behalf of the Apache Kafka PMC





  

Re: [VOTE] KIP-626: Rename StreamsConfig config variable name

2020-06-18 Thread Navinder Brar
Hi Matthias,

+1 non-binding. Thanks for the KIP.

Thanks,
Navinder  

On Tuesday, 16 June, 2020, 06:34:19 pm IST, Bruno Cadonna 
 wrote:  
 
 Hi Matthias,

Thank you for the KIP.

I am +1 (non-binding).

Said that, you have a typo in the "Public Interface" section.
TOPLOGY_OPTIMIZATION_CONFIG instead of TOPOLOGY_OPTIMIZATION_CONFIG.

Best,
Bruno

On Tue, Jun 16, 2020 at 5:02 AM Matthias J. Sax  wrote:
>
> Hi,
>
> I found a small inconsistency in our public API and propose a small KIP
> to fix it. As the change is trivial, I skip the discussion and call
> directly for a VOTE.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name
>
>
> -Matthias
>
  

Re: [ANNOUNCE] Apache Kafka 2.5.0

2020-04-15 Thread Navinder Brar
Thanks for running the release David. Congratulations to everyone involved.

-Navinder Pal Singh Brar
 

On Thursday, 16 April, 2020, 07:26:25 am IST, Konstantine Karantasis 
 wrote:  
 
 Thanks for driving the release David and congrats to all the contributors.
Nicely done!

Konstantine

On Wed, Apr 15, 2020 at 3:16 PM Randall Hauch  wrote:

> Thanks, David!
>
> Congratulations to the whole AK community, and thanks to everyone that
> contributed!
>
> On Wed, Apr 15, 2020 at 4:47 PM Sönke Liebau
>  wrote:
>
> > Thanks David!!
> >
> >
> > On Wed, 15 Apr 2020 at 23:07, Bill Bejeck  wrote:
> >
> > > David,
> > >
> > > Thanks for running the release!
> > >
> > > -Bill
> > >
> > > On Wed, Apr 15, 2020 at 4:45 PM Matthias J. Sax 
> > wrote:
> > >
> > > > Thanks for running the release David!
> > > >
> > > > -Matthias
> > > >
> > > > On 4/15/20 1:15 PM, David Arthur wrote:
> > > > > The Apache Kafka community is pleased to announce the release for
> > > Apache
> > > > > Kafka 2.5.0
> > > > >
> > > > > This release includes many new features, including:
> > > > >
> > > > > * TLS 1.3 support (1.2 is now the default)
> > > > > * Co-groups for Kafka Streams
> > > > > * Incremental rebalance for Kafka Consumer
> > > > > * New metrics for better operational insight
> > > > > * Upgrade Zookeeper to 3.5.7
> > > > > * Deprecate support for Scala 2.11
> > > > >
> > > > > All of the changes in this release can be found in the release
> notes:
> > > > > https://www.apache.org/dist/kafka/2.5.0/RELEASE_NOTES.html
> > > > >
> > > > >
> > > > > You can download the source and binary release (Scala 2.12 and
> 2.13)
> > > > from:
> > > > > https://kafka.apache.org/downloads#2.5.0
> > > > >
> > > > >
> > > >
> > >
> >
> ---
> > > > >
> > > > >
> > > > > Apache Kafka is a distributed streaming platform with four core
> APIs:
> > > > >
> > > > >
> > > > > ** The Producer API allows an application to publish a stream
> records
> > > to
> > > > > one or more Kafka topics.
> > > > >
> > > > > ** The Consumer API allows an application to subscribe to one or
> more
> > > > > topics and process the stream of records produced to them.
> > > > >
> > > > > ** The Streams API allows an application to act as a stream
> > processor,
> > > > > consuming an input stream from one or more topics and producing an
> > > > > output stream to one or more output topics, effectively
> transforming
> > > the
> > > > > input streams to output streams.
> > > > >
> > > > > ** The Connector API allows building and running reusable producers
> > or
> > > > > consumers that connect Kafka topics to existing applications or
> data
> > > > > systems. For example, a connector to a relational database might
> > > > > capture every change to a table.
> > > > >
> > > > >
> > > > > With these APIs, Kafka can be used for two broad classes of
> > > application:
> > > > >
> > > > > ** Building real-time streaming data pipelines that reliably get
> data
> > > > > between systems or applications.
> > > > >
> > > > > ** Building real-time streaming applications that transform or
> react
> > > > > to the streams of data.
> > > > >
> > > > >
> > > > > Apache Kafka is in use at large and small companies worldwide,
> > > including
> > > > > Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest,
> > > Rabobank,
> > > > > Target, The New York Times, Uber, Yelp, and Zalando, among others.
> > > > >
> > > > > A big thank you for the following 108 contributors to this release!
> > > > >
> > > > > A. Sophie Blee-Goldman, Adam Bellemare, Alaa Zbair, Alex Kokachev,
> > Alex
> > > > > Leung, Alex Mironov, Alice, Andrew Olson, Andy Coates, Anna
> Povzner,
> > > > Antony
> > > > > Stubbs, Arvind Thirunarayanan, belugabehr, bill, Bill Bejeck, Bob
> > > > Barrett,
> > > > > Boyang Chen, Brian Bushree, Brian Byrne, Bruno Cadonna, Bryan Ji,
> > > > Chia-Ping
> > > > > Tsai, Chris Egerton, Chris Pettitt, Chris Stromberger, Colin P.
> > Mccabe,
> > > > > Colin Patrick McCabe, commandini, Cyrus Vafadari, Dae-Ho Kim, David
> > > > Arthur,
> > > > > David Jacot, David Kim, David Mao, dengziming, Dhruvil Shah,
> Edoardo
> > > > Comar,
> > > > > Eduardo Pinto, Fábio Silva, gkomissarov, Grant Henke, Greg Harris,
> > > Gunnar
> > > > > Morling, Guozhang Wang, Harsha Laxman, high.lee, highluck, Hossein
> > > > Torabi,
> > > > > huxi, huxihx, Ismael Juma, Ivan Yurchenko, Jason Gustafson,
> > jiameixie,
> > > > John
> > > > > Roesler, José Armando García Sancio, Jukka Karvanen, Karan Kumar,
> > Kevin
> > > > Lu,
> > > > > Konstantine Karantasis, Lee Dongjin, Lev Zemlyanov, Levani
> > Kokhreidze,
> > > > > Lucas Bradstreet, Manikumar Reddy, Mathias Kub, Matthew Wong,
> > Matthias
> > > J.
> > > > > Sax, Michael Gyarmathy, Michael Viamari, Mickael Maison, Mitch,
> > > > > mmanna-sapfgl, NanerLee, Narek Karapetian, Navinder Pal Singh Brar,
> > > > > nicolasguyomar, Nigel Liang, NIkhil Bhatia, Nikolay, 

Re: [DISCUSS]KIP-216: IQ should throw different exceptions for different errors

2020-02-21 Thread Navinder Brar
Hi Vito,

I checked the code and I think you are right. If a user provides a wrong 
partition, there will be an NPE at tasks.get(keyTaskId).getStore(storeName), as that 
task is not available on this machine.

I think we can split the line: 
https://github.com/apache/kafka/blob/bbfecaef725456f648f03530d26a5395042966fa/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java#L62
 into 2 parts and check tasks.get(keyTaskId) separately. If it is null, we can 
throw an InvalidPartitionException. WDYS?
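
A rough sketch of what I mean (the task/store types and the String task id below are stand-ins for the real internal classes, and the final change may use a more specific exception such as InvalidStateStorePartitionException):

import java.util.Map;
import org.apache.kafka.streams.errors.InvalidStateStoreException;

final class StoreProviderSketch {

    interface StoreSketch { }

    interface TaskSketch {
        StoreSketch getStore(String storeName);
    }

    static StoreSketch storeFor(final Map<String, TaskSketch> tasks,
                                final String keyTaskId,
                                final String storeName) {
        final TaskSketch task = tasks.get(keyTaskId);
        if (task == null) {
            // Surface a meaningful IQ exception instead of an NPE further down.
            throw new InvalidStateStoreException(
                "The requested partition (task " + keyTaskId + ") is not available on this instance.");
        }
        return task.getStore(storeName);
    }
}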

Thanks,
Navinder


On Saturday, 22 February, 2020, 06:22:14 am IST, Vito Jeng 
 wrote:  
 
 Hi, Matthias and Navinder,

I have a question about the valid partition in
StreamThreadStateStoreProvider.

In the StreamThreadStateStoreProvider#createKeyTaskId(storeName, partition):
https://github.com/apache/kafka/blob/bbfecaef725456f648f03530d26a5395042966fa/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java#L103

We pass an integer as partition and then use this partition to create
TaskId instance in the topic group while loop. How do we make sure the
partition is valid? If we pass a correct storeName and a invalid partition
into createKeyTaskId() , it still looks can be created a new TaskId and
would not throw InvalidStateStorePartitionException.

I guess this would cause a NullPointerException at line #62 because this
keyTaskId cannot be found in the task list.
https://github.com/apache/kafka/blob/bbfecaef725456f648f03530d26a5395042966fa/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java#L62

Is this right? Or is there something wrong in my understanding?

---
Vito


On Wed, Feb 5, 2020 at 2:53 AM Navinder Brar
 wrote:

> Thanks Vito, for incorporating this. Makes sense.
>
> -Navinder
>
>
> On Wednesday, February 5, 2020, 12:17 AM, Matthias J. Sax <
> mj...@apache.org> wrote:
>
> -BEGIN PGP SIGNED MESSAGE-
> Hash: SHA512
>
> Thanks Vito!
>
> That makes sense to me.
>
>
> On 2/1/20 11:29 PM, Vito Jeng wrote:
> > Hi, folks,
> >
> > KIP-562(KAFKA-9445) already merged three days ago.
> >
> > I have updated KIP-216 to reflect the KIP-562. The main change is
> > to introduce a new exception `InvalidStateStorePartitionException`,
> > will be thrown when user requested partition not available.
> >
> > Please take a look and any feedback is welcome. Thanks Matthias for
> > the reminder.
> >
> > --- Vito
> >
> >
> > On Thu, Jan 23, 2020 at 2:13 PM Vito Jeng 
> > wrote:
> >
> >> Got it, thanks Matthias.
> >>
> >> --- Vito
> >>
> >>
> >> On Thu, Jan 23, 2020 at 1:31 PM Matthias J. Sax
> >>  wrote:
> >>
> >>> Thanks Vito.
> >>>
> >>> I am also ok with either name. Just a personal slight
> >>> preference, but not a important.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 1/21/20 6:52 PM, Vito Jeng wrote:
> >>>> Thanks Matthias.
> >>>>
> >>>> The KIP is about InvalidStateStoreException. I pick
> >>>> `StateStoreNotAvailableException` because it may be more
> >>> intuitive
> >>>> than `StreamsNotRunningException`.
> >>>>
> >>>> No matter which one picked, it's good to me.
> >>>>
> >>>> --- Vito
> >>>>
> >>>>
> >>>> On Wed, Jan 22, 2020 at 7:44 AM Matthias J. Sax
> >>>>  wrote:
> >>>>
> >>>>> Thanks for updating the KIP!
> >>>>>
> >>>>> One last comment/question: you kept
> >>>>> `StateStoreNotAvailableException`
> >>> in
> >>>>> favor of `StreamsNotRunningException` (to merge both as
> >>>>> suggested).
> >>>>>
> >>>>> I am wondering, if it might be better to keep
> >>>>> `StreamsNotRunningException` instead of
> >>>>> `StateStoreNotAvailableException`, because this exception
> >>>>> is thrown if Streams is in state PENDING_SHUTDOWN /
> >>>>> NOT_RUNNING / ERROR ?
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 1/17/20 9:56 PM, John Roesler wrote:
> >>>>>> Thanks, Vito. I've just cast my vote. -John
> >>>>>>
> >>>>>> On Fri, Jan 17, 2020, at 21:32, Vito Jeng wrote:
> >>>>>>> Hi, folks,
> >>>>>>>

[jira] [Created] (KAFKA-9588) Add rocksdb event listeners in KS

2020-02-20 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-9588:


 Summary: Add rocksdb event listeners in KS
 Key: KAFKA-9588
 URL: https://issues.apache.org/jira/browse/KAFKA-9588
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar


RocksDB is coming up with support for event listeners (like 
onCompactionCompleted) in JNI, which would be really helpful in KS to trigger 
checkpointing when a flush completes due to memtables filling up, rather than 
doing it periodically, etc. This task is currently blocked on 
https://issues.apache.org/jira/browse/KAFKA-8897.
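
For illustration, a sketch of what such a listener could look like, assuming a rocksdbjni version that exposes AbstractEventListener (which is what the KAFKA-8897 dependency is about); the "trigger a checkpoint" part is a placeholder, not an existing Streams hook:

import java.util.Collections;
import org.rocksdb.AbstractEventListener;
import org.rocksdb.FlushJobInfo;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class FlushListenerSketch extends AbstractEventListener {

    @Override
    public void onFlushCompleted(final RocksDB db, final FlushJobInfo flushJobInfo) {
        // This is where Kafka Streams could write a checkpoint as soon as a
        // memtable flush completes, instead of doing it purely periodically.
        System.out.println("RocksDB flush completed: " + flushJobInfo);
    }

    public static Options optionsWithListener() {
        // Register the listener when opening the store.
        return new Options().setListeners(Collections.singletonList(new FlushListenerSketch()));
    }
}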

 

Linking this task to https://issues.apache.org/jira/browse/KAFKA-9450 as well 
for tracking.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9487) Followup : KAFKA-9445(Allow fetching a key from a single partition); addressing code review comments

2020-02-11 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar resolved KAFKA-9487.
--
Resolution: Fixed

> Followup : KAFKA-9445(Allow fetching a key from a single partition); 
> addressing code review comments
> 
>
> Key: KAFKA-9487
> URL: https://issues.apache.org/jira/browse/KAFKA-9487
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>    Reporter: Navinder Brar
>    Assignee: Navinder Brar
>Priority: Blocker
> Fix For: 2.5.0
>
>
> A few code review comments are left to be addressed from Kafka 9445, which I 
> will be addressing in this PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-568: Explicit rebalance triggering on the Consumer

2020-02-11 Thread Navinder Brar
Thanks Sophie, much required.
+1 non-binding.


Sent from Yahoo Mail for iPhone


On Tuesday, February 11, 2020, 10:33 PM, John Roesler  
wrote:

Thanks Sophie,

I'm +1 (binding)

-John

On Mon, Feb 10, 2020, at 20:54, Sophie Blee-Goldman wrote:
> Hey all,
> 
> I'd like to start the voting on KIP-568. It proposes the new
> Consumer#enforceRebalance API to facilitate triggering efficient rebalances.
> 
> For reference, here is the KIP link again:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer
> 
> Thanks!
> Sophie
>
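
For context, a minimal sketch of how the proposed Consumer#enforceRebalance call might be used from an application (all config values and the trigger condition are illustrative assumptions, not part of the KIP):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EnforceRebalanceSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final boolean redistributeWork = args.length > 0;  // stand-in for an application-specific trigger

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            boolean rebalanceRequested = false;
            while (!Thread.currentThread().isInterrupted()) {
                // ... poll and process records ...
                consumer.poll(Duration.ofMillis(100));
                if (redistributeWork && !rebalanceRequested) {
                    consumer.enforceRebalance();  // the consumer initiates a rebalance on its next poll
                    rebalanceRequested = true;
                }
            }
        }
    }
}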





Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-02-06 Thread Navinder Brar
Hi,

While implementing KIP-562, we have decided to rename StoreQueryParams -> 
StoreQueryParameters. I have updated the PR and confluence. Please share if 
anyone has feedback on it.
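
For anyone following along, usage stays the same apart from the name; a sketch (store name, types and partition below are purely illustrative):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StoreQueryParametersExample {
    public static ReadOnlyKeyValueStore<String, Long> storeForPartition(final KafkaStreams streams) {
        final StoreQueryParameters<ReadOnlyKeyValueStore<String, Long>> params =
            StoreQueryParameters
                .fromNameAndType("counts-store", QueryableStoreTypes.<String, Long>keyValueStore())
                .withPartition(0)        // query only the store backing partition 0
                .enableStaleStores();    // also allow standby/restoring stores to serve the query
        return streams.store(params);
    }
}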

Thanks & Regards,
Navinder Pal Singh Brar 

On Friday, 24 January, 2020, 08:45:15 am IST, Navinder Brar 
 wrote:  
 
 Hi John,

Thanks for the responses. I will make the below changes as I had suggested 
earlier, and then close the vote in a few hours.

includeStaleStores -> staleStores
withIncludeStaleStores() -> enableStaleStores()
includeStaleStores() -> staleStoresEnabled()

Thanks,
Navinder

Sent from Yahoo Mail for iPhone


On Friday, January 24, 2020, 5:36 AM, John Roesler  wrote:

Hi Bruno,

Thanks for your question; it's a very reasonable response to 
what I said before.

I didn't mean "field" as in an instance variable, just as in a specific
property or attribute. It's hard to talk about because all the words
for this abstract concept are also words that people commonly use
for instance variables.

The principle is that these classes should be simple data containers.
It's not so much that the methods match the field name, or that the
field name matches the methods, but that all three bear a simple
and direct relationship to each other. Or maybe I should say that
the getters, setters, and fields are all directly named after a property.

The point is that we should make sure we're not "playing games" in
these classes but simply setting a property and offering a transparent
way to get exactly what you just set.

I actually do think that the instance variable itself should have the
same name as the "field" or "property" that the getters and setters
are named for. This is not a violation of encapsulation because those 
instance variables are required to be private. 

I guess you can think of this rule as more of a style guide than a 
grammar, but whatever. As a maintainer, I think we should discourage 
these particular classes from having instance variables named differently 
than their methods. Otherwise, it's just silly. Either "includeStaleStores" 
or "staleStoresEnabled" is a fine name, but not both. There's no 
reason at all to name all the accessors one of them and give the instance 
variable they access the other name.

Thanks,
-John


On Thu, Jan 23, 2020, at 17:27, Bruno Cadonna wrote:
> Hi John,
> 
> One question: Why must the field name be involved in the naming? It
> somehow contradicts encapsulation. Field name `includeStaleStores` (or
> `staleStoresEnabled`) sounds perfectly fine to me. IMO, we should
> decouple the parameter name from the actual field name.
> 
> Bruno
> 
> On Thu, Jan 23, 2020 at 3:02 PM John Roesler  wrote:
> >
> > Hi all,
> >
> > Thanks for the discussion!
> >
> > The basic idea I used in the original draft of the grammar was to avoid
> > "fancy code" and just write "normal java". That's why I favored java bean
> > spec over Kafka code traditions.
> >
> > According to the bean spec, setters always start with "set" and getters
> > always start with "get" (or "is" for booleans). This often yields absurd
> > or awkward readability. On the other hand, the "kafka" idioms
> > seems to descend from Scala idiom of naming getters and setters
> > exactly the same as the field they get and set. This plays to a language
> > feature of Scala (getter referential transparency) that is not present
> > in Java. My feeling is that we probably keep this idiom around
> > precisely to avoid the absurd phrasing that the bean spec leads to.
> >
> > On the other hand, adopting the Kafka/Scala idiom brings in an
> > additional burden I was trying to avoid: you have to pick a good
> > name. Basically I was trying to avoid exactly this conversation ;)
> > i.e., "X sounds weird, how about Y", "well, Y also sounds weird,
> > what about Z", "Z sounds good, but then the setter sounds weird",
> > etc.
> >
> > Maybe avoiding discussion was too ambitious, and I can't deny that
> > bean spec names probably result in no one being happy, so I'm on
> > board with the current proposal:
> >
> > setters:
> > set{FieldName}(value)
> > {enable/disable}{FieldName}()
> >
> > getters:
> > {fieldName}()
> > {fieldName}{Enabled/Disabled}()
> >
> > Probably, we'll find cases that are silly under that formula too,
> > but we'll cross that bridge when we get to it.
> >
> > I'll update the grammar when I get the chance.
> >
> > Thanks!
> > -John
> >
> > On Thu, Jan 23, 2020, at 12:37, Navinder Brar wrote:
> > > Thanks Bruno, for the comments.
> > > 1) Fixed.
> >

Re: [DISCUSS]KIP-216: IQ should throw different exceptions for different errors

2020-02-04 Thread Navinder Brar
Thanks Vito, for incorporating this. Makes sense.

-Navinder


On Wednesday, February 5, 2020, 12:17 AM, Matthias J. Sax  
wrote:

-BEGIN PGP SIGNED MESSAGE-
Hash: SHA512

Thanks Vito!

That makes sense to me.


On 2/1/20 11:29 PM, Vito Jeng wrote:
> Hi, folks,
>
> KIP-562(KAFKA-9445) already merged three days ago.
>
> I have updated KIP-216 to reflect the KIP-562. The main change is
> to introduce a new exception `InvalidStateStorePartitionException`,
> will be thrown when user requested partition not available.
>
> Please take a look and any feedback is welcome. Thanks Matthias for
> the reminder.
>
> --- Vito
>
>
> On Thu, Jan 23, 2020 at 2:13 PM Vito Jeng 
> wrote:
>
>> Got it, thanks Matthias.
>>
>> --- Vito
>>
>>
>> On Thu, Jan 23, 2020 at 1:31 PM Matthias J. Sax
>>  wrote:
>>
>>> Thanks Vito.
>>>
>>> I am also ok with either name. Just a personal slight
>>> preference, but not an important one.
>>>
>>>
>>> -Matthias
>>>
>>> On 1/21/20 6:52 PM, Vito Jeng wrote:
 Thanks Matthias.

 The KIP is about InvalidStateStoreException. I pick
 `StateStoreNotAvailableException` because it may be more
>>> intuitive
 than `StreamsNotRunningException`.

 No matter which one picked, it's good to me.

 --- Vito


 On Wed, Jan 22, 2020 at 7:44 AM Matthias J. Sax
  wrote:

> Thanks for updating the KIP!
>
> One last comment/question: you kept
> `StateStoreNotAvailableException`
>>> in
> favor of `StreamsNotRunningException` (to merge both as
> suggested).
>
> I am wondering, if it might be better to keep
> `StreamsNotRunningException` instead of
> `StateStoreNotAvailableException`, because this exception
> is thrown if Streams is in state PENDING_SHUTDOWN /
> NOT_RUNNING / ERROR ?
>
>
>
> -Matthias
>
> On 1/17/20 9:56 PM, John Roesler wrote:
>> Thanks, Vito. I've just cast my vote. -John
>>
>> On Fri, Jan 17, 2020, at 21:32, Vito Jeng wrote:
>>> Hi, folks,
>>>
>>> Just update the KIP, please take a look.
>>>
>>> Thanks!
>>>
>>> --- Vito
>>>
>>>
>>> On Fri, Jan 17, 2020 at 9:12 AM Vito Jeng
>>> 
>>> wrote:
>>>
 Thanks Bill, John and Matthias. Glad you guys joined
 this
>>> discussion.
 I got a lot out of the discussion.

 I would like to update KIP-216 base on John's
 suggestion to remove
>>> the
 category.


 --- Vito


 On Fri, Jan 17, 2020 at 2:30 AM Matthias J. Sax <
>>> matth...@confluent.io
>>
 wrote:

>> Nevertheless, if we omit the categorization, it’s
>> moot.
>
> Ack.
>
> I am fine to remove the middle tier. As John
> pointed out, it might
>>> be
> weird to have only one concrete exception type per
> category. We can
> also
> explain in detail how to handle each exception in
> their JavaDocs.
>
>
> -Matthias
>
> On 1/16/20 6:38 AM, Bill Bejeck wrote:
>> Vito,
>>
>> Thanks for the updates, the KIP LGTM.
>>
>> -Bill
>>
>> On Wed, Jan 15, 2020 at 11:31 PM John Roesler <
>>> vvcep...@apache.org>
> wrote:
>>
>>> Hi Vito,
>>>
>>> Haha, your archive game is on point!
>>>
>>> What Matthias said in that email is essentially
>>> what I figured
>>> was
> the
>>> rationale. It makes sense, but the point I was
>>> making is that
>>> this
> really
>>> doesn’t seem like a good way to structure a
>>> production app. On
>>> the
> other
>>> hand, considering the exception fatal has a
>>> good chance of
>>> avoiding
> a
>>> frustrating debug session if you just forgot to
>>> call start.
>>>
>>> Nevertheless, if we omit the categorization,
>>> it’s moot.
>>>
>>> It would be easy to add a categorization layer
>>> later if we want
>>> it,
> but
>>> not very easy to change it if we get it wrong.
>>>
>>> Thanks for your consideration! -John
>>>
>>> On Wed, Jan 15, 2020, at 21:14, Vito Jeng
>>> wrote:
 Hi John,

 About `StreamsNotStartedException is strange`
 -- The original idea came from Matthias, two
 years ago. :) You can reference here:

>>>
>
>
>>> https://mail-archives.apache.org/mod_mbox/kafka-dev/201806.mbox/%3c6
c32083e-b63c-435b-521d-032d45cc5...@confluent.io%3e


>>>
About omitting the categorization --
 It looks reasonable. I'm fine with omitting
 the categorization
>>> but
> not
>>> very
 sure it is a good choice. Does 

[jira] [Created] (KAFKA-9487) Followup : KAFKA-9445

2020-01-31 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-9487:


 Summary: Followup : KAFKA-9445
 Key: KAFKA-9487
 URL: https://issues.apache.org/jira/browse/KAFKA-9487
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar
Assignee: Navinder Brar






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-23 Thread Navinder Brar
Thanks, everyone for voting and discussions. The KIP is passed with 3 binding 
votes from John, Matthias, and Guozhang and 1 non-binding vote from Bruno. 

On Friday, 24 January, 2020, 08:50:21 am IST, Bruno Cadonna 
 wrote:  
 
 Hi Navinder,

+1 (non-binding)

Best,
Bruno

On Thu, Jan 23, 2020 at 9:19 AM John Roesler  wrote:
>
> Thanks, Navinder. It's just to give everyone a chance to object if they 
> wanted to.
> -John
>
> On Thu, Jan 23, 2020, at 00:44, Navinder Brar wrote:
> > Oh sorry, my bad. Will wait for another 12 hours.
> >
> >    On Thursday, 23 January, 2020, 12:09:57 pm IST, Matthias J. Sax
> >  wrote:
> >
> >  Navinder,
> >
> > a KIP vote must be open for 72h and cannot be closed earlier.
> >
> > -Matthias
> >
> > On 1/22/20 10:27 PM, Navinder Brar wrote:
> > > Thanks, everyone for voting. KIP-562 has been accepted with binding votes 
> > > from John, Matthias, and Guozhang.
> > >
> > >    On Thursday, 23 January, 2020, 09:40:07 am IST, Guozhang Wang 
> > > wrote:
> > >
> > >  +1 (binding) from me as well.
> > >
> > > On Wed, Jan 22, 2020 at 5:59 PM Matthias J. Sax 
> > > wrote:
> > >
> > >> I have a few minor comments (compare the DISCUSS thread), but overall
> > >> the KIP looks good.
> > >>
> > >> +1 (binding)
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 1/22/20 10:09 AM, John Roesler wrote:
> > >>> Thanks for updating the KIP, Navinder.
> > >>>
> > >>> I'm +1 (binding) on the current proposal
> > >>>
> > >>> Thanks,
> > >>> -John
> > >>>
> > >>> On Tue, Jan 21, 2020, at 12:50, Navinder Brar wrote:
> > >>>> Thanks, Guozhang. I agree it makes total sense. I will make the
> > >>>> edits.~Navinder
> > >>>>
> > >>>>    On Tuesday, 21 January, 2020, 11:00:32 pm IST, Guozhang Wang
> > >>>>  wrote:
> > >>>>
> > >>>>  Hello Navinder,
> > >>>>
>>>> Thanks for bringing up this proposal. I made a quick pass on that and
> > >>>> overall I think I agree with your ideas. Just a few thoughts about the
> > >>>> public APIs:
> > >>>>
> > >>>> 1) As we are adding a new overload to `KafkaStreams#store`, could we
> > >> just
> > >>>> add the storeName and queryableStoreType as part of StoreQueryParam, 
> > >>>> and
> > >>>> leaving that the only parameter of the function?
> > >>>>
> > >>>> 2) along with 1), for the static constructors, instead of iterating 
> > >>>> over
> > >>>> all possible combos I'd suggest we make constructors with only, say,
> > >>>> storeName, and then adding `withXXX()` setters to set other fields.
> > >> This is
> > >>>> in case we want to add more param fields into the object, that we do 
> > >>>> not
> > >>>> need to exponentially adding and deprecating the static constructors.
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>> On Mon, Jan 20, 2020 at 10:42 AM Navinder Brar
> > >>>>  wrote:
> > >>>>
> > >>>>> Hello all,
> > >>>>>
> > >>>>> I'd like to propose a vote to serve keys from a specific
> > >> partition-store
> > >>>>> instead of iterating over all the local stores of an instance to
> > >> locate the
> > >>>>> key, as which happens currently.
> > >>>>> The full KIP is provided here:
> > >>>>>
> > >>>>>
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> > >>>>>
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Navinder
> > >>>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
> > >>
> > >>
> > >
> >
  

Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-23 Thread Navinder Brar
Hi John,

Thanks for the responses. I will make the below changes as I had suggested 
earlier, and then close the vote in a few hours.

includeStaleStores -> staleStores
withIncludeStaleStores() -> enableStaleStores()
includeStaleStores() -> staleStoresEnabled()

Thanks,
Navinder

Sent from Yahoo Mail for iPhone


On Friday, January 24, 2020, 5:36 AM, John Roesler  wrote:

Hi Bruno,

Thanks for your question; it's a very reasonable response to 
what I said before.

I didn't mean "field" as in an instance variable, just as in a specific
property or attribute. It's hard to talk about because all the words
for this abstract concept are also words that people commonly use
for instance variables.

The principle is that these classes should be simple data containers.
It's not so much that the methods match the field name, or that the
field name matches the methods, but that all three bear a simple
and direct relationship to each other. Or maybe I should say that
the getters, setters, and fields are all directly named after a property.

The point is that we should make sure we're not "playing games" in
these classes but simply setting a property and offering a transparent
way to get exactly what you just set.

I actually do think that the instance variable itself should have the
same name as the "field" or "property" that the getters and setters
are named for. This is not a violation of encapsulation because those 
instance variables are required to be private. 

I guess you can think of this rule as more of a style guide than a 
grammar, but whatever. As a maintainer, I think we should discourage 
these particular classes from having instance variables named differently 
than their methods. Otherwise, it's just silly. Either "includeStaleStores" 
or "staleStoresEnabled" is a fine name, but not both. There's no 
reason at all to name all the accessors one of them and give the instance 
variable they access the other name.

Thanks,
-John


On Thu, Jan 23, 2020, at 17:27, Bruno Cadonna wrote:
> Hi John,
> 
> One question: Why must the field name be involved in the naming? It
> somehow contradicts encapsulation. Field name `includeStaleStores` (or
> `staleStoresEnabled`) sounds perfectly fine to me. IMO, we should
> decouple the parameter name from the actual field name.
> 
> Bruno
> 
> On Thu, Jan 23, 2020 at 3:02 PM John Roesler  wrote:
> >
> > Hi all,
> >
> > Thanks for the discussion!
> >
> > The basic idea I used in the original draft of the grammar was to avoid
> > "fancy code" and just write "normal java". That's why I favored java bean
> > spec over Kafka code traditions.
> >
> > According to the bean spec, setters always start with "set" and getters
> > always start with "get" (or "is" for booleans). This often yields absurd
> > or awkward readability. On the other hand, the "kafka" idioms
> > seems to descend from Scala idiom of naming getters and setters
> > exactly the same as the field they get and set. This plays to a language
> > feature of Scala (getter referential transparency) that is not present
> > in Java. My feeling is that we probably keep this idiom around
> > precisely to avoid the absurd phrasing that the bean spec leads to.
> >
> > On the other hand, adopting the Kafka/Scala idiom brings in an
> > additional burden I was trying to avoid: you have to pick a good
> > name. Basically I was trying to avoid exactly this conversation ;)
> > i.e., "X sounds weird, how about Y", "well, Y also sounds weird,
> > what about Z", "Z sounds good, but then the setter sounds weird",
> > etc.
> >
> > Maybe avoiding discussion was too ambitious, and I can't deny that
> > bean spec names probably result in no one being happy, so I'm on
> > board with the current proposal:
> >
> > setters:
> > set{FieldName}(value)
> > {enable/disable}{FieldName}()
> >
> > getters:
> > {fieldName}()
> > {fieldName}{Enabled/Disabled}()
> >
> > Probably, we'll find cases that are silly under that formula too,
> > but we'll cross that bridge when we get to it.
> >
> > I'll update the grammar when I get the chance.
> >
> > Thanks!
> > -John
> >
> > On Thu, Jan 23, 2020, at 12:37, Navinder Brar wrote:
> > > Thanks Bruno, for the comments.
> > > 1) Fixed.
> > >
> > > 2) I would be okay to call the variable staleStores. Since anyways we
> > > are not using constructor, so the only way the variable is exposed
> > > outside is the getter and the optional builder method. With this
> > > variable name, we can name the builder method

Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-23 Thread Navinder Brar
Thanks Bruno, for the comments.
1) Fixed.

2) I would be okay with calling the variable staleStores. Since we are not 
using a constructor anyway, the only way the variable is exposed outside is the 
getter and the optional builder method. With this variable name, we can name 
the builder method "enableStaleStores", and I feel staleStoresEnabled() is 
more readable for the getter function. So, we can also change the grammar for 
getters of boolean variables to {FlagName}enabled / {FlagName}disabled. WDYT 
@John.
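
A tiny sketch of how the flag would then read (class and field names are illustrative; the real class is in the PR):

public class StaleStoresFlagSketch {

    private boolean staleStores = false;

    public StaleStoresFlagSketch enableStaleStores() {  // builder-style "setter"
        this.staleStores = true;
        return this;
    }

    public boolean staleStoresEnabled() {               // getter reads naturally at the call site
        return staleStores;
    }
}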

Thanks,
Navinder   On Thursday, 23 January, 2020, 11:43:38 pm IST, Bruno Cadonna 
 wrote:  
 
 Hi Navinder,

Thank you for the KIP!

It looks good to me. Here my comments:

1) I agree with John and Matthias that you should remove the
implementation of the methods in the KIP. Just the method signatures
suffice and make the reading easier.

2) According to the grammar `withIncludeStaleStores()` should be
`enableIncludeStaleStores()` but since that reads a bit clumsy I
propose to call the method `enableStaleStores()`.

3) The getter `includeStaleStores()` does not sound right to me. It
does not include stale stores but rather checks if stale stores should
be queried. Thus, I would call it `staleStoresEnabled()` (or
`staleStoresIncluded` but that does not align nicely with
`enableStaleStores()`). No need to change the field name, though.
Maybe, we could also add this special rule for getters of boolean
values to the grammar. WDYT?

I have a final remark about the `StoreQueryParams`. I think it should
be immutable. That is more an implementation detail and we should
discuss it on the PR. Just wanted to mention it in advance. Probably
we should add also a rule for immutability to the grammar.

Best,
Bruno

On Wed, Jan 22, 2020 at 7:38 PM Navinder Brar
 wrote:
>
> +1 on changing to storeName() and includeStaleStores(). We can add this to 
> grammar wiki as Matthias suggested.
>
> I have edited the KIP to remove "Deprecating" in favor of "Changing" and I 
> agree we can deprecate store(final String storeName, final 
> QueryableStoreType queryableStoreType).
>
> Thanks
> Navinder
>    On Thursday, 23 January, 2020, 07:28:38 am IST, Matthias J. Sax 
> wrote:
>
>  Thanks for the clarifications about the getters. I agree that it makes
> sense to move to the new pattern incrementally. Might be useful to
> create a Jira (or multiple?) to track this. It's a straightforward change.
>
> A nit about the KIP: it should only list the signature but not the full
> code of the implementation (ie, only package name and the class + method
> names; we can omit toString(), equals(), and hashCode(), too -- also, no
> license header please ;))
>
>
> nit: `isIncludeStaleStores` -> `includeStaleStores` (the "is"-prefix
> reads clumsy and it's common in Kafka code base to omit the "get"-prefix
> for getters -- we should adopt this)
>
> @John: might be worth to include this in the Grammar wiki page?
>
> nit (similar as above):
>
>  - `getStoreName` -> `storeName`
>  - `getQueryableStoreType` -> `queryableStoreType`
>
>
> The KIP says
>
> > Deprecating the KafkaStreams#store(final String storeName, final 
> > QueryableStoreType queryableStoreType, final boolean includeStaleStores) 
> > in favour of the function mentioned below.
>
> We don't need to deprecate this method but we can remove it directly,
> because it was never released.
>
>
> What is the plan for
>
> > store(final String storeName, final QueryableStoreType 
> > queryableStoreType) {
>
> Given that the new `StoreQueryParams` allows to specify `storeName` and
> `queryableStoreType`, should we deprecate this method in favor of the
> new `store(StoreQueryParams)` overload?
>
>
> -Matthias
>
>
>
> On 1/22/20 10:06 AM, John Roesler wrote:
> > Thanks Navinder! I've also updated the motivation.
> >
> > Thanks,
> > -John
> >
> > On Wed, Jan 22, 2020, at 11:12, Navinder Brar wrote:
> >> I went through the grammar wiki page and since it is already agreed in
> >> principle I will change from constructor to below method and add the
> >> getters back.
> >> public static  StoreQueryParams fromNameAndType(
> >>  final String storeName,
> >>  final QueryableStoreType  queryableStoreType
> >> )
> >>
> >>
> >> Thanks,
> >> Navinder
> >>
> >>    On Wednesday, 22 January, 2020, 09:32:07 pm IST, John Roesler
> >>  wrote:
> >>
> >>  22) I'm specifically proposing to establish a new convention.
> >> The existing convention is fundamentally broken and has
> >> been costly both for users and maintainers. That is the purpose
> >> of the grammar

Re: [VOTE] KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-22 Thread Navinder Brar
Oh sorry, my bad. Will wait for another 12 hours. 

On Thursday, 23 January, 2020, 12:09:57 pm IST, Matthias J. Sax 
 wrote:  
 
 Navinder,

a KIP vote must be open for 72h and cannot be closed earlier.

-Matthias

On 1/22/20 10:27 PM, Navinder Brar wrote:
> Thanks, everyone for voting. KIP-562 has been accepted with binding votes 
> from John, Matthias, and Guozhang. 
> 
>    On Thursday, 23 January, 2020, 09:40:07 am IST, Guozhang Wang 
> wrote:  
>  
>  +1 (binding) from me as well.
> 
> On Wed, Jan 22, 2020 at 5:59 PM Matthias J. Sax 
> wrote:
> 
>> I have a few minor comments (compare the DISCUSS thread), but overall
>> the KIP looks good.
>>
>> +1 (binding)
>>
>>
>> -Matthias
>>
>> On 1/22/20 10:09 AM, John Roesler wrote:
>>> Thanks for updating the KIP, Navinder.
>>>
>>> I'm +1 (binding) on the current proposal
>>>
>>> Thanks,
>>> -John
>>>
>>> On Tue, Jan 21, 2020, at 12:50, Navinder Brar wrote:
>>>> Thanks, Guozhang. I agree it makes total sense. I will make the
>>>> edits.~Navinder
>>>>
>>>>     On Tuesday, 21 January, 2020, 11:00:32 pm IST, Guozhang Wang
>>>>  wrote:
>>>>
>>>>   Hello Navinder,
>>>>
>>>> Thanks for brining up this proposal. I made a quick pass on that and
>>>> overall I think I agree with your ideas. Just a few thoughts about the
>>>> public APIs:
>>>>
>>>> 1) As we are adding a new overload to `KafkaStreams#store`, could we
>> just
>>>> add the storeName and queryableStoreType as part of StoreQueryParam, and
>>>> leaving that the only parameter of the function?
>>>>
>>>> 2) along with 1), for the static constructors, instead of iterating over
>>>> all possible combos I'd suggest we make constructors with only, say,
>>>> storeName, and then adding `withXXX()` setters to set other fields.
>> This is
>>>> in case we want to add more param fields into the object, that we do not
>>>> need to exponentially adding and deprecating the static constructors.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Mon, Jan 20, 2020 at 10:42 AM Navinder Brar
>>>>  wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I'd like to propose a vote to serve keys from a specific
>> partition-store
>>>>> instead of iterating over all the local stores of an instance to
>> locate the
>>>>> key, as which happens currently.
>>>>> The full KIP is provided here:
>>>>>
>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Navinder
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>
>>
> 
  

Re: [VOTE] KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-22 Thread Navinder Brar
Thanks, everyone for voting. KIP-562 has been accepted with binding votes from 
John, Matthias, and Guozhang. 

On Thursday, 23 January, 2020, 09:40:07 am IST, Guozhang Wang 
 wrote:  
 
 +1 (binding) from me as well.

On Wed, Jan 22, 2020 at 5:59 PM Matthias J. Sax 
wrote:

> I have a few minor comments (compare the DISCUSS thread), but overall
> the KIP looks good.
>
> +1 (binding)
>
>
> -Matthias
>
> On 1/22/20 10:09 AM, John Roesler wrote:
> > Thanks for updating the KIP, Navinder.
> >
> > I'm +1 (binding) on the current proposal
> >
> > Thanks,
> > -John
> >
> > On Tue, Jan 21, 2020, at 12:50, Navinder Brar wrote:
> >> Thanks, Guozhang. I agree it makes total sense. I will make the
> >> edits.~Navinder
> >>
> >>    On Tuesday, 21 January, 2020, 11:00:32 pm IST, Guozhang Wang
> >>  wrote:
> >>
> >>  Hello Navinder,
> >>
> >> Thanks for brining up this proposal. I made a quick pass on that and
> >> overall I think I agree with your ideas. Just a few thoughts about the
> >> public APIs:
> >>
> >> 1) As we are adding a new overload to `KafkaStreams#store`, could we
> just
> >> add the storeName and queryableStoreType as part of StoreQueryParam, and
> >> leaving that the only parameter of the function?
> >>
> >> 2) along with 1), for the static constructors, instead of iterating over
> >> all possible combos I'd suggest we make constructors with only, say,
> >> storeName, and then adding `withXXX()` setters to set other fields.
> This is
> >> in case we want to add more param fields into the object, that we do not
> >> need to exponentially adding and deprecating the static constructors.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Jan 20, 2020 at 10:42 AM Navinder Brar
> >>  wrote:
> >>
> >>> Hello all,
> >>>
> >>> I'd like to propose a vote to serve keys from a specific
> partition-store
> >>> instead of iterating over all the local stores of an instance to
> locate the
> >>> key, as which happens currently.
> >>> The full KIP is provided here:
> >>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> >>>
> >>>
> >>> Thanks,
> >>> Navinder
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
>
>

-- 
-- Guozhang
  

Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-22 Thread Navinder Brar
+1 on changing to storeName() and includeStaleStores(). We can add this to 
the grammar wiki as Matthias suggested.

I have edited the KIP to remove "Deprecating" in favor of "Changing" and I 
agree we can deprecate store(final String storeName, final 
QueryableStoreType queryableStoreType). 

Thanks
Navinder
On Thursday, 23 January, 2020, 07:28:38 am IST, Matthias J. Sax 
 wrote:  
 
 Thanks for the clarifications about the getters. I agree that it makes
sense to move to the new pattern incrementally. Might be useful to
create a Jira (or multiple?) to track this. It's a straightforward change.

A nit about the KIP: it should only list the signature but not the full
code of the implementation (ie, only package name and the class + method
names; we can omit toString(), equals(), and hashCode(), too -- also, no
license header please ;))


nit: `isIncludeStaleStores` -> `includeStaleStores` (the "is"-prefix
reads clumsy and it's common in Kafka code base to omit the "get"-prefix
for getters -- we should adopt this)

@John: might be worth to include this in the Grammar wiki page?

nit (similar as above):

 - `getStoreName` -> `storeName`
 - `getQueryableStoreType` -> `queryableStoreType`


The KIP says

> Deprecating the KafkaStreams#store(final String storeName, final 
> QueryableStoreType queryableStoreType, final boolean includeStaleStores) 
> in favour of the function mentioned below.

We don't need to deprecate this method but we can remove it directly,
because it was never released.


What is the plan for

> store(final String storeName, final QueryableStoreType queryableStoreType) 
> {

Given that the new `StoreQueryParams` allows to specify `storeName` and
`queryableStoreType`, should we deprecate this method in favor of the
new `store(StoreQueryParams)` overload?
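
For illustration, a hypothetical before/after of the call site (only a sketch,
assuming an existing KafkaStreams instance `streams` and a store named
"counts"; the exact method names are not a finalized API):

// current style, with the never-released boolean overload from KIP-535:
ReadOnlyKeyValueStore<String, Long> store =
    streams.store("counts", QueryableStoreTypes.<String, Long>keyValueStore(), true);

// proposed style, with everything carried by the params object:
ReadOnlyKeyValueStore<String, Long> store2 =
    streams.store(StoreQueryParams
        .fromNameAndType("counts", QueryableStoreTypes.<String, Long>keyValueStore())
        .enableStaleStores());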


-Matthias



On 1/22/20 10:06 AM, John Roesler wrote:
> Thanks Navinder! I've also updated the motivation.
> 
> Thanks,
> -John
> 
> On Wed, Jan 22, 2020, at 11:12, Navinder Brar wrote:
>> I went through the grammar wiki page and since it is already agreed in 
>> principle I will change from constructor to below method and add the 
>> getters back.
>> public static  StoreQueryParams fromNameAndType(
>>   final String storeName,
>>   final QueryableStoreType  queryableStoreType
>> )
>>
>>
>> Thanks,
>> Navinder
>>
>>    On Wednesday, 22 January, 2020, 09:32:07 pm IST, John Roesler 
>>  wrote:  
>>  
>>  22) I'm specifically proposing to establish a new convention.
>> The existing convention is fundamentally broken and has
>> been costly both for users and maintainers. That is the purpose
>> of the grammar I proposed. The plan is to implement  new APIs
>> following the grammar and gradually to port old APIs to it.
>>
>> The grammar wiki page has plenty of justification, so I won't 
>> recapitulate it here.
>>
>> Thanks,
>> -John
>>
>> On Wed, Jan 22, 2020, at 09:39, Navinder Brar wrote:
>>> 10) Sure John, please go ahead.
>>>
>>> 21) I have no strong opinion on constructor vs static factory. If 
>>> everyone's okay with it, I can make the change.
>>>
>>> 22) I looked at classes suggested by Matthias and I see there are no 
>>> getters there. We are ok with breaking the convention?
>>>
>>> Thanks,Navinder Pal Singh Brar
>>>
>>>   
>>>
>>>     On Wednesday, 22 January, 2020, 08:40:27 pm IST, John Roesler 
>>>  wrote:  
>>>   
>>>   Hi all,
>>>
>>> 10) For the motivation, I have some thoughts for why this KIP is
>>> absolutely essential as designed. If it's ok with you, Navinder,
>>> I'd just edit the motivation section of the wiki? If you're unhappy
>>> with my wording, you're of course welcome to revert or revise it; 
>>> it just seems more efficient than discussing it over email.
>>>
>>> 20) The getters were my fault :) 
>>> I proposed to design this KIP following the grammar proposal:
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
>>> At the risk of delaying the vote on this KIP, I'd humbly suggest we 
>>> keep the getters,
>>> for all the reasons laid out on that grammar.
>>>
>>> I realize this introduces an inconsistency, but my hope is that we 
>>> would close that
>>> gap soon. I can even create tickets for migrating each API, if that 
>>> helps make 
>>> this idea more palatable. IMO, this proposed API is likely to be a bit 
>>> "out of
>>> the way", in that it's not likely to be heavily used by a broad 
>>>

Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-22 Thread Navinder Brar
I went through the grammar wiki page, and since it is already agreed in 
principle, I will change from a constructor to the method below and add the 
getters back.
public static <T> StoreQueryParams<T> fromNameAndType(
  final String storeName,
  final QueryableStoreType<T> queryableStoreType
)
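
For example (purely illustrative, assuming the factory method above and the
grammar-style getters discussed in this thread; the getter names are
assumptions, not a finalized API):

final StoreQueryParams<ReadOnlyKeyValueStore<String, Long>> params =
    StoreQueryParams.fromNameAndType(
        "counts", QueryableStoreTypes.<String, Long>keyValueStore());

// getters added back per the grammar:
final String name = params.storeName();
final QueryableStoreType<ReadOnlyKeyValueStore<String, Long>> type =
    params.queryableStoreType();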


Thanks,
Navinder

On Wednesday, 22 January, 2020, 09:32:07 pm IST, John Roesler 
 wrote:  
 
 22) I'm specifically proposing to establish a new convention.
The existing convention is fundamentally broken and has
been costly both for users and maintainers. That is the purpose
of the grammar I proposed. The plan is to implement new APIs
following the grammar and gradually to port old APIs to it.

The grammar wiki page has plenty of justification, so I won't 
recapitulate it here.

Thanks,
-John

On Wed, Jan 22, 2020, at 09:39, Navinder Brar wrote:
> 10) Sure John, please go ahead.
> 
> 21) I have no strong opinion on constructor vs static factory. If 
> everyone's okay with it, I can make the change.
> 
> 22) I looked at classes suggested by Matthias and I see there are no 
> getters there. We are ok with breaking the convention?
> 
> Thanks,Navinder Pal Singh Brar
> 
>  
> 
>    On Wednesday, 22 January, 2020, 08:40:27 pm IST, John Roesler 
>  wrote:  
>  
>  Hi all,
> 
> 10) For the motivation, I have some thoughts for why this KIP is
> absolutely essential as designed. If it's ok with you, Navinder,
> I'd just edit the motivation section of the wiki? If you're unhappy
> with my wording, you're of course welcome to revert or revise it; 
> it just seems more efficient than discussing it over email.
> 
> 20) The getters were my fault :) 
> I proposed to design this KIP following the grammar proposal:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> At the risk of delaying the vote on this KIP, I'd humbly suggest we 
> keep the getters,
> for all the reasons laid out on that grammar.
> 
> I realize this introduces an inconsistency, but my hope is that we 
> would close that
> gap soon. I can even create tickets for migrating each API, if that 
> helps make 
> this idea more palatable. IMO, this proposed API is likely to be a bit 
> "out of
> the way", in that it's not likely to be heavily used by a broad 
> audience in 2.5, 
> so the API inconsistency wouldn't be too apparent. Plus, it will save 
> us from 
> implementing a config object in the current style, along with an 
> "internal" 
> counterpart, which we would immediately have plans to deprecate.
> 
> Just to clarify (I know this has been a bit thrashy):
> 21. there should be no public constructor, instead (since there are 
> required arguments),
> there should be just one factory method:
> public static  StoreQueryParams fromNameAndType(
>   final String storeName, 
>   final QueryableStoreType  queryableStoreType
> )
> 
> 22. there should be getters corresponding to each argument (required 
> and optional):
> Integer getPartition()
> boolean getIncludeStaleStores()
> 
> Instead of adding the extra getAllPartitions() pseudo-getter, let's 
> follow Ted's advice and
> just document that getPartition() would return `null`, and that it 
> means that a
> specific partition hasn't been requested, so the store would wrap all 
> local partitions.
> 
> With those two changes, this proposal would be 100% in line with the grammar,
> and IMO ready to go.
> 
> Thanks,
> -John
> 
> Thanks,
> -John
> 
> On Wed, Jan 22, 2020, at 03:56, Navinder Brar wrote:
> > Thanks Matthias for the feedback.
> > 
> > 10) As Guozhang suggested above, we thought of adding storeName and 
> > queryableStoreType as well in the StoreQueryParams, which is another 
> > motivation for this KIP as it overloads KafkaStreams#store(). I have 
> > updated the motivation in the KIP as well.
> > 
> > 20) I agree we can probably remove getPartition() and 
> > getIncludeStaleStores() but we would definitely need getStoreName and 
> > getQueryableStoreType() as they would be used in internal classes 
> > QueryableStoreProvider.java and StreamThreadStateStoreProvider.java.
> > 
> >  30) I have edited the KIP to include only the changed KafkaStreams#store().
> > 
> > 40) Removed the internal classes from the KIP.
> > 
> > I have incorporated feedback from Guozhang as well in the KIP. If 
> > nothing else is pending, vote is ongoing.
> > 
> > ~Navinder    On Wednesday, 22 January, 2020, 12:49:51 pm IST, Matthias 
> > J. Sax  wrote:  
> >  
> >  Thanks for the KIP. Overall it makes sense.
> > 
> > Couple of minor comments/questions:
> > 
> > 10) To me, it was initially quite unclear why w

Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-22 Thread Navinder Brar
10) Sure John, please go ahead.

21) I have no strong opinion on constructor vs static factory. If everyone's 
okay with it, I can make the change.

22) I looked at the classes suggested by Matthias and I see there are no getters 
there. Are we OK with breaking the convention?

Thanks, Navinder Pal Singh Brar

 

On Wednesday, 22 January, 2020, 08:40:27 pm IST, John Roesler 
 wrote:  
 
 Hi all,

10) For the motivation, I have some thoughts for why this KIP is
absolutely essential as designed. If it's ok with you, Navinder,
I'd just edit the motivation section of the wiki? If you're unhappy
with my wording, you're of course welcome to revert or revise it; 
it just seems more efficient than discussing it over email.

20) The getters were my fault :) 
I proposed to design this KIP following the grammar proposal:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
At the risk of delaying the vote on this KIP, I'd humbly suggest we keep the 
getters,
for all the reasons laid out on that grammar.

I realize this introduces an inconsistency, but my hope is that we would close 
that
gap soon. I can even create tickets for migrating each API, if that helps make 
this idea more palatable. IMO, this proposed API is likely to be a bit "out of
the way", in that it's not likely to be heavily used by a broad audience in 
2.5, 
so the API inconsistency wouldn't be too apparent. Plus, it will save us from 
implementing a config object in the current style, along with an "internal" 
counterpart, which we would immediately have plans to deprecate.

Just to clarify (I know this has been a bit thrashy):
21. there should be no public constructor, instead (since there are required 
arguments),
there should be just one factory method:
public static <T> StoreQueryParams<T> fromNameAndType(
  final String storeName, 
  final QueryableStoreType<T> queryableStoreType
)

22. there should be getters corresponding to each argument (required and 
optional):
Integer getPartition()
boolean getIncludeStaleStores()

Instead of adding the extra getAllPartitions() pseudo-getter, let's follow 
Ted's advice and
just document that getPartition() would return `null`, and that it means that a
specific partition hasn't been requested, so the store would wrap all local 
partitions.
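
A small sketch of how a caller might interpret that contract (illustrative
only; the getter names follow the proposal above, and the two helper methods
are hypothetical placeholders):

final Integer partition = params.getPartition();
if (partition == null) {
    // no specific partition requested: wrap all local partitions of the store
    return storeWrappingAllLocalPartitions(params);      // hypothetical helper
} else {
    // a specific partition requested: wrap only that partition's store
    return storeForSinglePartition(params, partition);   // hypothetical helper
}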

With those two changes, this proposal would be 100% in line with the grammar,
and IMO ready to go.

Thanks,
-John

Thanks,
-John

On Wed, Jan 22, 2020, at 03:56, Navinder Brar wrote:
> Thanks Matthias for the feedback.
> 
> 10) As Guozhang suggested above, we thought of adding storeName and 
> queryableStoreType as well in the StoreQueryParams, which is another 
> motivation for this KIP as it overloads KafkaStreams#store(). I have 
> updated the motivation in the KIP as well.
> 
> 20) I agree we can probably remove getPartition() and 
> getIncludeStaleStores() but we would definitely need getStoreName and 
> getQueryableStoreType() as they would be used in internal classes 
> QueryableStoreProvider.java and StreamThreadStateStoreProvider.java.
> 
>  30) I have edited the KIP to include only the changed KafkaStreams#store().
> 
> 40) Removed the internal classes from the KIP.
> 
> I have incorporated feedback from Guozhang as well in the KIP. If 
> nothing else is pending, vote is ongoing.
> 
> ~Navinder    On Wednesday, 22 January, 2020, 12:49:51 pm IST, Matthias 
> J. Sax  wrote:  
>  
>  Thanks for the KIP. Overall it makes sense.
> 
> Couple of minor comments/questions:
> 
> 10) To me, it was initially quite unclear why we need this KIP. The
> motivation section does only talk about some performance issues (that
> are motivated by single key look-ups) -- however, all issues mentioned
> in the KIP could be fixed without any public API change. The important
> cases, why the public API changes (and thus this KIP) is useful are
> actually missing in the motivation section. I would be helpful to add
> more details.
> 
> 20) `StoreQueryParams` has a lot of getter methods that we usually don't
> have for config objects (compare `Consumed`, `Produced`, `Materialized`,
> etc). Is there any reason why we need to add those getters to the public
> API?
> 
> 30) The change to remove `KafkaStreams#store(...)` as introduced in
> KIP-535 should be listed in sections Public API changes. Also, existing
> methods should not be listed -- only changes. Hence, in
> `KafkaStreams.java` only one new method and the `store()` method as
> added via KIP-535 should be listed.
> 
> 40) `QueryableStoreProvider` and `StreamThreadStateStoreProvider` are
> internal classes and thus we can remove all changes to it from the KIP.
> 
> 
> Thanks!
> 
> 
> -Matthias
> 
> 
> 
> On 1/21/20 11:46 AM, Vinoth Chandar wrote:
> > Chiming in a bit late here..
> > 
> > +1 This is a very val

Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-22 Thread Navinder Brar
Thanks Matthias for the feedback.

10) As Guozhang suggested above, we thought of adding storeName and 
queryableStoreType to the StoreQueryParams as well, which is another motivation 
for this KIP, as it overloads KafkaStreams#store(). I have updated the 
motivation in the KIP as well.

20) I agree we can probably remove getPartition() and getIncludeStaleStores(), 
but we would definitely need getStoreName() and getQueryableStoreType(), as they 
would be used in the internal classes QueryableStoreProvider.java and 
StreamThreadStateStoreProvider.java.

 30) I have edited the KIP to include only the changed KafkaStreams#store().

40) Removed the internal classes from the KIP.

I have incorporated feedback from Guozhang as well in the KIP. If nothing else 
is pending, vote is ongoing.

~Navinder    On Wednesday, 22 January, 2020, 12:49:51 pm IST, Matthias J. Sax 
 wrote:  
 
 Thanks for the KIP. Overall it makes sense.

Couple of minor comments/questions:

10) To me, it was initially quite unclear why we need this KIP. The
motivation section only talks about some performance issues (that
are motivated by single-key look-ups) -- however, all issues mentioned
in the KIP could be fixed without any public API change. The important
cases for which the public API changes (and thus this KIP) are useful are
actually missing in the motivation section. It would be helpful to add
more details.

20) `StoreQueryParams` has a lot of getter methods that we usually don't
have for config objects (compare `Consumed`, `Produced`, `Materialized`,
etc). Is there any reason why we need to add those getters to the public
API?

30) The change to remove `KafkaStreams#store(...)` as introduced in
KIP-535 should be listed in the Public API Changes section. Also, existing
methods should not be listed -- only changes. Hence, in
`KafkaStreams.java` only the one new method and the `store()` method as
added via KIP-535 should be listed.

40) `QueryableStoreProvider` and `StreamThreadStateStoreProvider` are
internal classes and thus we can remove all changes to it from the KIP.


Thanks!


-Matthias



On 1/21/20 11:46 AM, Vinoth Chandar wrote:
> Chiming in a bit late here..
> 
> +1 This is a very valid improvement. Avoiding doing gets on irrelevant
> partitions will improve performance and efficiency for IQs.
> 
> As an incremental improvement to the current APIs,  adding an option to
> filter out based on partitions makes sense
> 
> 
> 
> 
> 
> 
> 
> On Mon, Jan 20, 2020 at 3:13 AM Navinder Brar
>  wrote:
> 
>> Thanks John. If there are no other comments to be addressed, I will start
>> a vote today so that we are on track for this release.~Navinder
>>
>>
>> On Monday, January 20, 2020, 8:32 AM, John Roesler 
>> wrote:
>>
>> Thanks, Navinder,
>>
>> The Param object looks a bit different than I would have done, but it
>> certainly is explicit. We might have to deprecate those particular factory
>> methods and move to a builder pattern if we need to add any more options in
>> the future, but I’m fine with that possibility.
>>
>> The KIP also discusses some implementation details that aren’t necessary
>> here. We really only need to see the public interfaces. We can discuss the
>> implementation in the PR.
>>
>> That said, the public API part of the current proposal looks good to me! I
>> would be a +1 if you called for a vote.
>>
>> Thanks,
>> John
>>
>> On Sun, Jan 19, 2020, at 20:50, Navinder Brar wrote:
>>> I have made some edits in the KIP, please take another look. It would
>>> be great if we can push it in 2.5.0.
>>> ~Navinder
>>>
>>>
>>> On Sunday, January 19, 2020, 12:59 AM, Navinder Brar
>>>  wrote:
>>>
>>> Sure John, I will update the StoreQueryParams with static factory
>>> methods.
>>> @Ted, we would need to create taskId only in case a user provides one
>>> single partition. In case user wants to query all partitions of an
>>> instance the current code is good enough where we iterate over all
>>> stream threads and go over all taskIds to match the store. But in case
>>> a user requests for a single partition-based store, we need to create a
>>> taskId out of that partition and store name(using
>>> internalTopologyBuilder class) and match with the taskIds belonging to
>>> that instance. I will add the code in the KIP.
>>>
>>>    On Sunday, 19 January, 2020, 12:47:08 am IST, Ted Yu
>>>  wrote:
>>>
>>>  Looking at the current KIP-562:
>>>
>>> bq. Create a taskId from the combination of store name and partition
>>> provided by the user
>>>
>>> I wonder if a single taskId would be used for the “all pa

Re: [VOTE] KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-21 Thread Navinder Brar
Thanks, Guozhang. I agree it makes total sense. I will make the edits. ~Navinder 
 

On Tuesday, 21 January, 2020, 11:00:32 pm IST, Guozhang Wang 
 wrote:  
 
 Hello Navinder,

Thanks for bringing up this proposal. I made a quick pass on that and
overall I think I agree with your ideas. Just a few thoughts about the
public APIs:

1) As we are adding a new overload to `KafkaStreams#store`, could we just
add the storeName and queryableStoreType as part of StoreQueryParam, and
leaving that the only parameter of the function?

2) Along with 1), for the static constructors, instead of iterating over
all possible combos I'd suggest we make constructors with only, say,
storeName, and then add `withXXX()` setters to set the other fields. This is
in case we want to add more param fields to the object, so that we do not
need to keep adding and deprecating static constructors exponentially.
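
A rough sketch of that shape (field and method names are placeholders to
illustrate the idea, not a finalized API):

public class StoreQueryParams<T> {

    private final String storeName;
    private final QueryableStoreType<T> queryableStoreType;
    private Integer partition;          // null would mean "all local partitions"
    private boolean includeStaleStores;

    private StoreQueryParams(final String storeName,
                             final QueryableStoreType<T> queryableStoreType) {
        this.storeName = storeName;
        this.queryableStoreType = queryableStoreType;
    }

    // single static constructor with only the required fields
    public static <T> StoreQueryParams<T> fromNameAndType(
            final String storeName,
            final QueryableStoreType<T> queryableStoreType) {
        return new StoreQueryParams<>(storeName, queryableStoreType);
    }

    // optional fields are set via withXXX()/enableXXX() setters, so adding a
    // new parameter later does not force a new static constructor
    public StoreQueryParams<T> withPartition(final int partition) {
        this.partition = partition;
        return this;
    }

    public StoreQueryParams<T> enableStaleStores() {
        this.includeStaleStores = true;
        return this;
    }
}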


Guozhang


On Mon, Jan 20, 2020 at 10:42 AM Navinder Brar
 wrote:

> Hello all,
>
> I'd like to propose a vote to serve keys from a specific partition-store
> instead of iterating over all the local stores of an instance to locate the
> key, as which happens currently.
> The full KIP is provided here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
>
>
> Thanks,
> Navinder
>


-- 
-- Guozhang
  

[VOTE] KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-20 Thread Navinder Brar
Hello all,

I'd like to propose a vote to serve keys from a specific partition-store 
instead of iterating over all the local stores of an instance to locate the 
key, as happens currently.
The full KIP is provided here:
 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance


Thanks,
Navinder


Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-20 Thread Navinder Brar
Thanks John. If there are no other comments to be addressed, I will start a 
vote today so that we are on track for this release. ~Navinder


On Monday, January 20, 2020, 8:32 AM, John Roesler  wrote:

Thanks, Navinder,

The Param object looks a bit different than I would have done, but it certainly 
is explicit. We might have to deprecate those particular factory methods and 
move to a builder pattern if we need to add any more options in the future, but 
I’m fine with that possibility. 

The KIP also discusses some implementation details that aren’t necessary here. 
We really only need to see the public interfaces. We can discuss the 
implementation in the PR.

That said, the public API part of the current proposal looks good to me! I 
would be a +1 if you called for a vote. 

Thanks,
John

On Sun, Jan 19, 2020, at 20:50, Navinder Brar wrote:
> I have made some edits in the KIP, please take another look. It would 
> be great if we can push it in 2.5.0.
> ~Navinder
> 
> 
> On Sunday, January 19, 2020, 12:59 AM, Navinder Brar 
>  wrote:
> 
> Sure John, I will update the StoreQueryParams with static factory 
> methods.
> @Ted, we would need to create taskId only in case a user provides one 
> single partition. In case user wants to query all partitions of an 
> instance the current code is good enough where we iterate over all 
> stream threads and go over all taskIds to match the store. But in case 
> a user requests for a single partition-based store, we need to create a 
> taskId out of that partition and store name(using 
> internalTopologyBuilder class) and match with the taskIds belonging to 
> that instance. I will add the code in the KIP. 
> 
>     On Sunday, 19 January, 2020, 12:47:08 am IST, Ted Yu 
>  wrote:  
>  
>  Looking at the current KIP-562:
> 
> bq. Create a taskId from the combination of store name and partition
> provided by the user
> 
> I wonder if a single taskId would be used for the “all partitions” case.
> If so, we need to choose a numerical value for the partition portion of the
> taskId.
> 
> On Sat, Jan 18, 2020 at 10:27 AM John Roesler  wrote:
> 
> > Thanks, Ted!
> >
> > This makes sense, but it seems like we should lean towards explicit
> > semantics in the public API. ‘-1’ meaning “all partitions” is reasonable,
> > but not explicit. That’s why I suggested the Boolean for “all partitions”.
> > I guess this also means getPartition() should either throw an exception or
> > return null if the partition is unspecified.
> >
> > Thanks,
> > John
> >
> > On Sat, Jan 18, 2020, at 08:43, Ted Yu wrote:
> > > I wonder if the following two methods can be combined:
> > >
> > > Integer getPartition() // would be null if unset or if "all partitions"
> > > boolean getAllLocalPartitions() // true/false if "all partitions"
> > requested
> > >
> > > into:
> > >
> > > Integer getPartition() // would be null if unset or -1 if "all
> > partitions"
> > >
> > > Cheers
> > >
> > > On Fri, Jan 17, 2020 at 9:56 PM John Roesler 
> > wrote:
> > >
> > > > Thanks, Navinder!
> > > >
> > > > I took a look at the KIP.
> > > >
> > > > We tend to use static factory methods instead of public constructors,
> > and
> > > > also builders for optional parameters.
> > > >
> > > > Given that, I think it would be more typical to have a factory method:
> > > > storeQueryParams()
> > > >
> > > > and also builders for setting the optional parameters, like:
> > > > withPartitions(List partitions)
> > > > withStaleStoresEnabled()
> > > > withStaleStoresDisabled()
> > > >
> > > >
> > > > I was also thinking this over today, and it really seems like there are
> > > > two main cases for specifying partitions,
> > > > 1. you know exactly what partition you want. In this case, you'll only
> > > > pass in a single number.
> > > > 2. you want to get a handle on all the stores for this instance (the
> > > > current behavior). In this case, it's not clear how to use
> > withPartitions
> > > > to achieve the goal, unless you want to apply a-priori knowledge of the
> > > > number of partitions in the store. We could consider an empty list, or
> > a
> > > > null, to indicate "all", but that seems a little complicated.
> > > >
> > > > Thus, maybe it would actually be better to eschew withPartitions for
> > now
> > > > and instead just offer

Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-19 Thread Navinder Brar
I have made some edits to the KIP; please take another look. It would be great 
if we can get it into 2.5.0.
~Navinder


On Sunday, January 19, 2020, 12:59 AM, Navinder Brar 
 wrote:

Sure John, I will update the StoreQueryParams with static factory methods.
@Ted, we would need to create taskId only in case a user provides one single 
partition. In case user wants to query all partitions of an instance the 
current code is good enough where we iterate over all stream threads and go 
over all taskIds to match the store. But in case a user requests for a single 
partition-based store, we need to create a taskId out of that partition and 
store name(using internalTopologyBuilder class) and match with the taskIds 
belonging to that instance. I will add the code in the KIP. 

    On Sunday, 19 January, 2020, 12:47:08 am IST, Ted Yu  
wrote:  
 
 Looking at the current KIP-562:

bq. Create a taskId from the combination of store name and partition
provided by the user

I wonder if a single taskId would be used for the “all partitions” case.
If so, we need to choose a numerical value for the partition portion of the
taskId.

On Sat, Jan 18, 2020 at 10:27 AM John Roesler  wrote:

> Thanks, Ted!
>
> This makes sense, but it seems like we should lean towards explicit
> semantics in the public API. ‘-1’ meaning “all partitions” is reasonable,
> but not explicit. That’s why I suggested the Boolean for “all partitions”.
> I guess this also means getPartition() should either throw an exception or
> return null if the partition is unspecified.
>
> Thanks,
> John
>
> On Sat, Jan 18, 2020, at 08:43, Ted Yu wrote:
> > I wonder if the following two methods can be combined:
> >
> > Integer getPartition() // would be null if unset or if "all partitions"
> > boolean getAllLocalPartitions() // true/false if "all partitions"
> requested
> >
> > into:
> >
> > Integer getPartition() // would be null if unset or -1 if "all
> partitions"
> >
> > Cheers
> >
> > On Fri, Jan 17, 2020 at 9:56 PM John Roesler 
> wrote:
> >
> > > Thanks, Navinder!
> > >
> > > I took a look at the KIP.
> > >
> > > We tend to use static factory methods instead of public constructors,
> and
> > > also builders for optional parameters.
> > >
> > > Given that, I think it would be more typical to have a factory method:
> > > storeQueryParams()
> > >
> > > and also builders for setting the optional parameters, like:
> > > withPartitions(List partitions)
> > > withStaleStoresEnabled()
> > > withStaleStoresDisabled()
> > >
> > >
> > > I was also thinking this over today, and it really seems like there are
> > > two main cases for specifying partitions,
> > > 1. you know exactly what partition you want. In this case, you'll only
> > > pass in a single number.
> > > 2. you want to get a handle on all the stores for this instance (the
> > > current behavior). In this case, it's not clear how to use
> withPartitions
> > > to achieve the goal, unless you want to apply a-priori knowledge of the
> > > number of partitions in the store. We could consider an empty list, or
> a
> > > null, to indicate "all", but that seems a little complicated.
> > >
> > > Thus, maybe it would actually be better to eschew withPartitions for
> now
> > > and instead just offer:
> > > withPartition(int partition)
> > > withAllLocalPartitions()
> > >
> > > and the getters:
> > > Integer getPartition() // would be null if unset or if "all partitions"
> > > boolean getAllLocalPartitions() // true/false if "all partitions"
> requested
> > >
> > > Sorry, I know I'm stirring the pot, but what do you think about this?
> > >
> > > Oh, also, the KIP is missing the method signature for the new
> > > KafkaStreams#store overload.
> > >
> > > Thanks!
> > > -John
> > >
> > > On Fri, Jan 17, 2020, at 08:07, Navinder Brar wrote:
> > > > Hi all,
> > > > I have created a new
> > > > KIP:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> > > > Please take a look if you get a chance.
> > > > ~Navinder
> > >
> >
>  




Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-18 Thread Navinder Brar
Sure John, I will update the StoreQueryParams with static factory methods.
@Ted, we would need to create a taskId only when a user provides a single 
partition. When a user wants to query all partitions of an instance, the 
current code is good enough: we iterate over all stream threads and go over 
all taskIds to match the store. But when a user requests a store for a single 
partition, we need to create a taskId out of that partition and the store name 
(using the InternalTopologyBuilder class) and match it against the taskIds 
belonging to that instance. I will add the code in the KIP. 
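
To convey the idea, a hedged sketch of that lookup (illustrative only: the
topic-group lookup and the task/store accessors are hypothetical placeholders,
not the actual internal API):

StateStore findStore(final String storeName, final int partition) {
    // build a TaskId for the requested partition and match it against the
    // tasks on this instance, instead of iterating over every local store
    final int topicGroupId = topicGroupIdForStore(storeName);   // hypothetical helper
    final TaskId wanted = new TaskId(topicGroupId, partition);

    for (final StreamThread thread : streamThreads) {
        for (final Task task : thread.allTasks()) {              // hypothetical accessor
            if (task.id().equals(wanted)) {
                return task.getStore(storeName);                  // hypothetical accessor
            }
        }
    }
    return null;   // the requested partition is not hosted on this instance
}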

On Sunday, 19 January, 2020, 12:47:08 am IST, Ted Yu  
wrote:  
 
 Looking at the current KIP-562:

bq. Create a taskId from the combination of store name and partition
provided by the user

I wonder if a single taskId would be used for the “all partitions” case.
If so, we need to choose a numerical value for the partition portion of the
taskId.

On Sat, Jan 18, 2020 at 10:27 AM John Roesler  wrote:

> Thanks, Ted!
>
> This makes sense, but it seems like we should lean towards explicit
> semantics in the public API. ‘-1’ meaning “all partitions” is reasonable,
> but not explicit. That’s why I suggested the Boolean for “all partitions”.
> I guess this also means getPartition() should either throw an exception or
> return null if the partition is unspecified.
>
> Thanks,
> John
>
> On Sat, Jan 18, 2020, at 08:43, Ted Yu wrote:
> > I wonder if the following two methods can be combined:
> >
> > Integer getPartition() // would be null if unset or if "all partitions"
> > boolean getAllLocalPartitions() // true/false if "all partitions"
> requested
> >
> > into:
> >
> > Integer getPartition() // would be null if unset or -1 if "all
> partitions"
> >
> > Cheers
> >
> > On Fri, Jan 17, 2020 at 9:56 PM John Roesler 
> wrote:
> >
> > > Thanks, Navinder!
> > >
> > > I took a look at the KIP.
> > >
> > > We tend to use static factory methods instead of public constructors,
> and
> > > also builders for optional parameters.
> > >
> > > Given that, I think it would be more typical to have a factory method:
> > > storeQueryParams()
> > >
> > > and also builders for setting the optional parameters, like:
> > > withPartitions(List partitions)
> > > withStaleStoresEnabled()
> > > withStaleStoresDisabled()
> > >
> > >
> > > I was also thinking this over today, and it really seems like there are
> > > two main cases for specifying partitions,
> > > 1. you know exactly what partition you want. In this case, you'll only
> > > pass in a single number.
> > > 2. you want to get a handle on all the stores for this instance (the
> > > current behavior). In this case, it's not clear how to use
> withPartitions
> > > to achieve the goal, unless you want to apply a-priori knowledge of the
> > > number of partitions in the store. We could consider an empty list, or
> a
> > > null, to indicate "all", but that seems a little complicated.
> > >
> > > Thus, maybe it would actually be better to eschew withPartitions for
> now
> > > and instead just offer:
> > > withPartition(int partition)
> > > withAllLocalPartitions()
> > >
> > > and the getters:
> > > Integer getPartition() // would be null if unset or if "all partitions"
> > > boolean getAllLocalPartitions() // true/false if "all partitions"
> requested
> > >
> > > Sorry, I know I'm stirring the pot, but what do you think about this?
> > >
> > > Oh, also, the KIP is missing the method signature for the new
> > > KafkaStreams#store overload.
> > >
> > > Thanks!
> > > -John
> > >
> > > On Fri, Jan 17, 2020, at 08:07, Navinder Brar wrote:
> > > > Hi all,
> > > > I have created a new
> > > > KIP:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> > > > Please take a look if you get a chance.
> > > > ~Navinder
> > >
> >
>  

Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-17 Thread Navinder Brar
Hi John,
Thanks for looking into it. 
On using constructors rather than static factory methods: I was following the 
convention of the classes currently available to users, such as LagInfo and 
KeyQueryMetadata. Let me know if it's still preferable to change 
StoreQueryParams to a static factory method, and I will update the KIP.
On List<Integer> partitions vs. Integer partition: I agree that sending an 
empty list to return all partitions is cumbersome, so I will change the class 
to hold a single partition for the case where users want to fetch the store for 
one partition; if it's unset, we will return all available partitions.
On the KafkaStreams#store overload - noted. I will update the KIP.
Thanks, Navinder 

On Saturday, 18 January, 2020, 11:26:30 am IST, John Roesler 
 wrote:  
 
 Thanks, Navinder!

I took a look at the KIP.

We tend to use static factory methods instead of public constructors, and also 
builders for optional parameters.

Given that, I think it would be more typical to have a factory method:
storeQueryParams() 

and also builders for setting the optional parameters, like:
withPartitions(List<Integer> partitions)
withStaleStoresEnabled()
withStaleStoresDisabled()


I was also thinking this over today, and it really seems like there are two 
main cases for specifying partitions,
1. you know exactly what partition you want. In this case, you'll only pass in 
a single number.
2. you want to get a handle on all the stores for this instance (the current 
behavior). In this case, it's not clear how to use withPartitions to achieve 
the goal, unless you want to apply a-priori knowledge of the number of 
partitions in the store. We could consider an empty list, or a null, to 
indicate "all", but that seems a little complicated.

Thus, maybe it would actually be better to eschew withPartitions for now and 
instead just offer:
withPartition(int partition)
withAllLocalPartitions()

and the getters:
Integer getPartition() // would be null if unset or if "all partitions"
boolean getAllLocalPartitions() // true/false if "all partitions" requested
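
For example, usage under that shape might look roughly like this (method names
as proposed above; everything else -- the KafkaStreams instance `streams`, the
store name "counts", and passing the params to store() -- is assumed for
illustration):

// case 1: you know exactly which partition you want
streams.store(storeQueryParams("counts", QueryableStoreTypes.keyValueStore())
        .withPartition(2));

// case 2: current behavior, wrap all local partitions
streams.store(storeQueryParams("counts", QueryableStoreTypes.keyValueStore())
        .withAllLocalPartitions());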

Sorry, I know I'm stirring the pot, but what do you think about this?

Oh, also, the KIP is missing the method signature for the new 
KafkaStreams#store overload.

Thanks!
-John

On Fri, Jan 17, 2020, at 08:07, Navinder Brar wrote:
> Hi all,
> I have created a new 
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> Please take a look if you get a chance.
> ~Navinder  

[DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-17 Thread Navinder Brar
Hi all,
I have created a new KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
Please take a look if you get a chance.
~Navinder

[jira] [Created] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-9445:


 Summary: Allow fetching a key from a single partition rather than 
iterating over all the stores on an instance
 Key: KAFKA-9445
 URL: https://issues.apache.org/jira/browse/KAFKA-9445
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar
Assignee: Navinder Brar


Currently, when expanding the KS cluster, the new node's partitions will be 
unavailable during the rebalance, which for large state stores can take a very 
long time; even for small state stores, anything more than a few ms of 
unavailability can be a deal-breaker for microservice use cases.

One workaround is to allow stale data to be read from the state stores when the 
use case allows it. Adding the use case from KAFKA-8994 as it is more descriptive.

"Consider the following scenario in a three node Streams cluster with node A, 
node S and node R, executing a stateful sub-topology/topic group with 1 
partition and `_num.standby.replicas=1_`  
 * *t0*: A is the active instance owning the partition, B is the standby that 
keeps replicating the A's state into its local disk, R just routes streams IQs 
to active instance using StreamsMetadata
 * *t1*: IQs pick node R as router, R forwards query to A, A responds back to R 
which reverse forwards back the results.
 * *t2:* Active A instance is killed and rebalance begins. IQs start failing to 
A
 * *t3*: Rebalance assignment happens and standby B is now promoted as active 
instance. IQs continue to fail
 * *t4*: B fully catches up to changelog tail and rewinds offsets to A's last 
commit position, IQs continue to fail
 * *t5*: IQs to R, get routed to B, which is now ready to serve results. IQs 
start succeeding again

 

Depending on the Kafka consumer group session/heartbeat timeouts, steps t2-t3 
can take a few seconds (~10 seconds based on default values). Depending on how 
laggy the standby B was prior to A being killed, t4 can take a few seconds to 
minutes. 

While this behavior favors consistency over availability at all times, the long 
unavailability window might be undesirable for certain classes of applications 
(e.g. simple caches or dashboards). 

This issue aims to also expose information about standby B to R during each 
rebalance, such that an application can route queries to a standby to serve 
stale reads, choosing availability over consistency."



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Role of CompositeReadOnlyKeyValueStore for point queries

2020-01-16 Thread Navinder Brar
Thanks John for the prompt response. Yeah, store(int partition) is exactly what 
I meant; sorry for the confusion. Since we have already merged PR:7962, I can 
take the new trunk and start working on the KIP in the meantime. I agree that 
we should replace the newly added param. It would be a good way to improve 
latencies for applications that host multiple partitions on a single instance 
and don't have bloom filters enabled internally in RocksDB.

On Thursday, 16 January, 2020, 10:22:45 pm IST, John Roesler 
 wrote:  
 
 Hey Navinder,

I agree this is not ideal. The answer is that until KIP-535, you wouldn't
necessarily know which partition you were querying, just the instance.

Now, though, the information is front-and-center, and we should
update the store() API to favor efficiently routing the query directly
to the right store instance, rather than iterating over all the stores.

I'd been privately planning to propose a new `KafkaStreams#store()`
method that lets you just request a store for a particular partition.
Reading your message, I think maybe this is what you meant, instead
of `get()`?

If you want to take it on, I'd be more than happy to take the lead on
reviewing your work.

One food for thought is that we just added a new overload:
`store(name, type, boolean includeStaleStores)`. Instead of adding
another new overload, we should replace the new overload with
one taking a config object.

Suggestion:
KafkaStreams#store(
  String storeName,
  QueryableStoreType type,
  StoreQueryParams options
)

where:
StoreQueryParams {
  (enable/disable/get)IncludeStaleStores()
  (with/get)partitions(List)
}

Although this isn't a DSL operation, please consider 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
when proposing new interfaces.
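
For instance, a call under that suggestion might look roughly like the
following (purely a sketch: the class shape follows the outline above, and the
store name, types, and partition value are assumed for illustration):

final ReadOnlyKeyValueStore<String, Long> store = streams.store(
    "counts",
    QueryableStoreTypes.<String, Long>keyValueStore(),
    new StoreQueryParams()
        .enableIncludeStaleStores()
        .withPartitions(java.util.Collections.singletonList(2)));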

If you're _really_ fast, we could potentially get this into 2.5:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=143428858
We'd basically have to call for a vote by Friday to have a chance of meeting 
the deadline.
Then, we could just replace the new overload instead of deprecating it.
But it's not such a huge deal to deprecate it, so no pressure.

Thanks for the astute observation!
-John

On Wed, Jan 15, 2020, at 22:50, Navinder Brar wrote:
> Hi all,
> Can someone explain to me the thoughts behind having 
> CompositeReadOnlyKeyValueStore. java class while serving data via APIs 
> in Kafka Streams. It fetches the list of stores for all the running 
> tasks on the machine and then looks for a key one by one in each of the 
> stores. When we already know the key belongs to a particular 
> partition(with the help of partitioner) we can just query that 
> particular partition's store right?
> I am thinking of overloading the get() function as get(int partition) 
> and sending just the store for that single partition from 
> QueryableStoreProvider.java so that all the stores are needed to be 
> iterated through to fetch a key.
> Regards,
> Navinder
>
  

Role of CompositeReadOnlyKeyValueStore for point queries

2020-01-15 Thread Navinder Brar
Hi all,
Can someone explain to me the thinking behind the 
CompositeReadOnlyKeyValueStore.java class when serving data via APIs in Kafka 
Streams? It fetches the list of stores for all the running tasks on the machine 
and then looks for a key one by one in each of the stores. When we already know 
the key belongs to a particular partition (with the help of the partitioner), we 
can just query that particular partition's store, right?
I am thinking of overloading the get() function as get(int partition) and 
serving just the store for that single partition from 
QueryableStoreProvider.java, so that not all the stores need to be iterated 
through to fetch a key.
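
A sketch of the kind of routing being described (illustrative only: the key's
partition is derived with the same default strategy the producer uses, while
storeForPartition() and numPartitions are hypothetical placeholders for the
overload being proposed):

// compute which partition owns the key, then query only that partition's store
final Serializer<String> keySerializer = Serdes.String().serializer();
final byte[] keyBytes = keySerializer.serialize("app-store-changelog", "alice");
final int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

final ReadOnlyKeyValueStore<String, Long> store = storeForPartition(partition);
final Long count = store.get("alice");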
Regards,
Navinder


Re: [VOTE] KIP-535: Allow state stores to serve stale reads during rebalance

2020-01-15 Thread Navinder Brar
 > >>>> harmful behavioral change. I'm fine with adding the new overload you
> > >>>> mentioned, just a simple boolean flag to enable the new behavior.
> > >>>>
> > >>>> I'd actually propose that we call this flag "queryStaleData", with
> a default
> > >>>> of "false". The meaning of this would be to preserve exactly the
> existing
> > >>>> semantics. I.e., that the store must be both active and running to
> be
> > >>>> included.
> > >>>>
> > >>>> It seems less severe to just suddenly start returning queries from
> standbys,
> > >>>> but in the interest of safety, the easiest thing is just flag the
> whole feature.
> > >>>>
> > >>>> If you, Navinder, and Vinoth agree, we can just update the KIP. It
> seems like
> > >>>> a pretty uncontroversial amendment to avoid breaking query
> consistency
> > >>>> semantics.
> > >>>>
> > >>>> Thanks,
> > >>>> -John
> > >>>>
> > >>>>
> > >>>> On Tue, Jan 14, 2020, at 13:21, Matthias J. Sax wrote:
> > >>>>> During the discussion of KIP-216
> > >>>>> (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors
> )
> > >>>>> we encountered that KIP-535 introduces a behavior change that is
> not
> > >>>>> backward compatible, hence, I would like to request a small change.
> > >>>>>
> > >>>>> KIP-535 suggests, that active tasks can be queried during recovery
> and
> > >>>>> no exception would be thrown and longer. This is a change in
> behavior
> > >>>>> and in fact introduces a race condition for users that only want to
> > >>>>> query consistent state. Querying inconsistent state should be an
> opt-in,
> > >>>>> and for StandbyTasks, user can opt-in by querying them or opt-out
> by not
> > >>>>> querying them. However, for active task, if we don't throw an
> exception
> > >>>>> during recovery, users cannot opt-out of querying potentially
> > >>>>> inconsistent state, because they would need to check the state
> (ie, ==
> > >>>>> RUNNING) before they would query the active task -- however, the
> state
> > >>>>> might change at any point in between, and there is a race.
> > >>>>>
> > >>>>> Hence, I would suggest to actually not change the default behavior
> of
> > >>>>> existing methods and we should throw an exception during restore
> if an
> > >>>>> active task is queried. To allow user to opt-in to query an active
> task
> > >>>>> during restore, we would add an overload
> > >>>>>
> > >>>>>> KafkaStream#store(..., boolean allowQueryWhileStateIsRestoring)
> > >>>>>
> > >>>>> (with an hopefully better/short variable name). Developers would
> use
> > >>>>> this new method to opt-into querying active tasks during restore.
> > >>>>>
> > >>>>> Thoughts?
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 11/18/19 10:45 AM, Vinoth Chandar wrote:
> > >>>>>> Thanks, everyone involved!
> > >>>>>>
> > >>>>>> On Mon, Nov 18, 2019 at 7:51 AM John Roesler 
> wrote:
> > >>>>>>
> > >>>>>>> Thanks to you, also, Navinder!
> > >>>>>>>
> > >>>>>>> Looking forward to getting this feature in.
> > >>>>>>> -John
> > >>>>>>>
> > >>>>>>> On Sun, Nov 17, 2019 at 11:34 PM Navinder Brar
> > >>>>>>>  wrote:
> > >>>>>>>>
> > >>>>>>>>  Hello all,
> > >>>>>>>>
> > >>>>>>>> With 4 binding +1 votes from Guozhang Wang, Matthias J. Sax,
> Bill Bejeck,
> > >>>>>>>> and John Roesler, the vote passes.
> > >>>>>>>> Thanks Guozhang, Matthias, Bill, John, Sophie for the healthy
> > >>>>>>> discussions and Vinoth for all the help on this KIP.
> > >>>>>>>> Best,
> > >>>>>>>> Navinder
> > >>>>>>>>
> > >>>>>>>>    On Friday, 15 November, 2019, 11:32:31 pm IST, John Roesler
> <
> > >>>>>>> j...@confluent.io> wrote:
> > >>>>>>>>
> > >>>>>>>>  I'm +1 (binding) as well.
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> -John
> > >>>>>>>>
> > >>>>>>>> On Fri, Nov 15, 2019 at 6:20 AM Bill Bejeck 
> wrote:
> > >>>>>>>>>
> > >>>>>>>>> +1 (binding)
> > >>>>>>>>>
> > >>>>>>>>> On Fri, Nov 15, 2019 at 1:11 AM Matthias J. Sax <
> matth...@confluent.io
> > >>>>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> +1 (binding)
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On 11/14/19 3:48 PM, Guozhang Wang wrote:
> > >>>>>>>>>>> +1 (binding), thanks for the KIP!
> > >>>>>>>>>>>
> > >>>>>>>>>>> Guozhang
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Fri, Nov 15, 2019 at 4:38 AM Navinder Brar
> > >>>>>>>>>>>  wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hello all,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I'd like to propose a vote for serving interactive queries
> during
> > >>>>>>>>>>>> Rebalancing, as it is a big deal for applications looking
> for high
> > >>>>>>>>>>>> availability. With this change, users will have control
> over the
> > >>>>>>>>>> tradeoff
> > >>>>>>>>>>>> between consistency and availability during serving.
> > >>>>>>>>>>>> The full KIP is provided here:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>> Navinder
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>> Attachments:
> > >>>>> * signature.asc
> > >>>>
> > >>
> > >>
> > >> Attachments:
> > >> * signature.asc
> >
> >
> > Attachments:
> > * signature.asc
>


-- 
-- Guozhang  

Re: [VOTE] KIP-216: IQ should throw different exceptions for different errors

2020-01-14 Thread Navinder Brar
+1 (non-binding), with a small comment that was mentioned by Vinoth as well: did 
we settle on the flag for StreamsRebalancingException? I don't see it in the 
KIP.
-Navinder


On Tuesday, 14 January, 2020, 08:00:11 pm IST, Vito Jeng 
 wrote:  
 
 Hi, all,

I would like to start the vote for KIP-216.

Currently, IQ throws InvalidStateStoreException for any types of error.
With this KIP, user can distinguish different types of error.

KIP is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors

Thanks

---
Vito
  

Re: [VOTE] KIP-545 support automated consumer offset sync across clusters in MM 2.0

2020-01-14 Thread Navinder Brar
+1 (non-binding)
Navinder
On Tuesday, 14 January, 2020, 07:24:02 pm IST, Ryanne Dolan 
 wrote:  
 
 Bump. We've got 4 non-binding and one binding vote.

Ryanne

On Fri, Dec 13, 2019, 1:44 AM Tom Bentley  wrote:

> +1 (non-binding)
>
> On Thu, Dec 12, 2019 at 6:33 PM Andrew Schofield <
> andrew_schofi...@live.com>
> wrote:
>
> > +1 (non-binding)
> >
> > On 12/12/2019, 14:20, "Mickael Maison" 
> wrote:
> >
> >    +1 (binding)
> >    Thanks for the KIP!
> >
> >    On Thu, Dec 5, 2019 at 12:56 AM Ryanne Dolan 
> > wrote:
> >    >
> >    > Bump. We've got 2 non-binding votes so far.
> >    >
> >    > On Wed, Nov 13, 2019 at 3:32 PM Ning Zhang  >
> > wrote:
> >    >
> >    > > My current plan is to implement this in "MirrorCheckpointTask"
> >    > >
> >    > > On 2019/11/02 03:30:11, Xu Jianhai  wrote:
> >    > > > I think this kip will implement a task in sinkTask ? right?
> >    > > >
> >    > > > On Sat, Nov 2, 2019 at 1:06 AM Ryanne Dolan <
> > ryannedo...@gmail.com>
> >    > > wrote:
> >    > > >
> >    > > > > Hey y'all, Ning Zhang and I would like to start the vote for
> > the
> >    > > following
> >    > > > > small KIP:
> >    > > > >
> >    > > > >
> >    > > > >
> >    > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
> >    > > > >
> >    > > > > This is an elegant way to automatically write consumer group
> > offsets to
> >    > > > > downstream clusters without breaking existing use cases.
> > Currently, we
> >    > > rely
> >    > > > > on external tooling based on RemoteClusterUtils and
> >    > > kafka-consumer-groups
> >    > > > > command to write offsets. This KIP bakes this functionality
> > into MM2
> >    > > > > itself, reducing the effort required to failover/failback
> > workloads
> >    > > between
> >    > > > > clusters.
> >    > > > >
> >    > > > > Thanks for the votes!
> >    > > > >
> >    > > > > Ryanne
> >    > > > >
> >    > > >
> >    > >
> >
> >
> >
>  

Re: [VOTE] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-17 Thread Navinder Brar
 Hello all,

With 4 binding +1 votes from Guozhang Wang, Matthias J. Sax, Bill Bejeck,
and John Roesler, the vote passes.
Thanks Guozhang, Matthias, Bill, John, Sophie for the healthy discussions and 
Vinoth for all the help on this KIP.
Best,
Navinder 

On Friday, 15 November, 2019, 11:32:31 pm IST, John Roesler 
 wrote:  
 
 I'm +1 (binding) as well.

Thanks,
-John

On Fri, Nov 15, 2019 at 6:20 AM Bill Bejeck  wrote:
>
> +1 (binding)
>
> On Fri, Nov 15, 2019 at 1:11 AM Matthias J. Sax 
> wrote:
>
> > +1 (binding)
> >
> >
> > On 11/14/19 3:48 PM, Guozhang Wang wrote:
> > > +1 (binding), thanks for the KIP!
> > >
> > > Guozhang
> > >
> > > On Fri, Nov 15, 2019 at 4:38 AM Navinder Brar
> > >  wrote:
> > >
> > >> Hello all,
> > >>
> > >> I'd like to propose a vote for serving interactive queries during
> > >> Rebalancing, as it is a big deal for applications looking for high
> > >> availability. With this change, users will have control over the
> > tradeoff
> > >> between consistency and availability during serving.
> > >> The full KIP is provided here:
> > >>
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> > >>
> > >>
> > >> Thanks,
> > >> Navinder
> > >
> > >
> > >
> >
> >
  

Re: Potential Bug in 2.3 version (leading to deletion of state directories)

2019-11-14 Thread Navinder Brar
Hi John,
Thanks for the response. Yeah, by "marked for deletion" I meant the unlocking 
of the store (by which, in a way, it is marked for deletion). From what I have 
seen, the standby task gets stuck in the Created state, doesn't move to Running, 
and is not able to recreate the directory. But that is not the only concern. 
With the new KIP to support serving from replicas we want very little downtime 
on replicas, and in this case we already have a completely built state directory 
which is getting deleted just because the assignment moved to another thread 
(the host is still the same). We have StreamsMetadataState#allMetadata(), which 
is common to all threads of all instances. Can't we add a conditional check 
during unlocking that consults allMetadata and finds out whether the partition 
we are about to unlock is still assigned to this host (we don't care which 
thread of this host)? In that case we would not unlock the task, and Stream 
Thread-2 would take the lock on its own when it moves to Running.
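To make this concrete, here is a rough sketch of the kind of guard I mean 
(illustrative only, not the actual Streams internals; how the local host and 
the task's partition are obtained here are assumptions):

import java.util.Collection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;

final class StateDirUnlockGuard {
    // Returns true if any stream thread on this host is still assigned a
    // partition of the task, in which case we would skip unlocking so the
    // cleaner cannot delete the fully built state directory.
    static boolean hostStillOwnsTaskPartition(final Collection<StreamsMetadata> allMetadata,
                                              final HostInfo localHost,
                                              final int taskPartition) {
        for (final StreamsMetadata metadata : allMetadata) {
            if (!metadata.hostInfo().equals(localHost)) {
                continue;
            }
            for (final TopicPartition tp : metadata.topicPartitions()) {
                if (tp.partition() == taskPartition) {
                    return true;
                }
            }
        }
        return false;
    }
}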
Thanks,
Navinder
On Friday, 15 November, 2019, 02:55:40 am IST, John Roesler 
 wrote:  
 
 Hey Navinder,

I think what's happening is a little different. Let's see if my
worldview also explains your experiences.

There is no such thing as "mark for deletion". When a thread loses a
task, it simply releases its lock on the directory. If no one else on
the instance claims that lock within `state.cleanup.delay.ms` amount
of milliseconds, then the state cleaner will itself grab the lock and
delete the directory. On the other hand, if another thread (or the
same thread) gets the task back and claims the lock before the
cleaner, it will be able to re-open the store and use it.

The default for `state.cleanup.delay.ms` is 10 minutes, which is
actually short enough that it could pass during a single rebalance (if
Streams starts recovering a lot of state). I recommend you increase
`state.cleanup.delay.ms` by a lot, like maybe set it to one hour.
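For reference, bumping that config looks roughly like this (a minimal sketch; 
the application id and bootstrap servers below are just placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Raise state.cleanup.delay.ms so that released task directories survive
// longer than a typical rebalance (one hour, as suggested above).
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");        // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60 * 60 * 1000L); // 1 hour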

One thing I'm curious about... You didn't mention if Thread-2
eventually is able to re-create the state directory (after the cleaner
is done) and transition to RUNNING. This should be the case. If not, I
would consider it a bug.

Thanks,
-John

On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
 wrote:
>
> Hi,
> We are facing a peculiar situation in the 2.3 version of Kafka Streams. First
> of all, I want to clarify whether it is possible that a standby task (say 0_0)
> that was assigned to one Stream Thread (say Stream Thread-1) can move to Stream
> Thread-2 on the same host post rebalancing. This is happening for us: post
> rebalancing, since Stream Thread-1 had 0_0 and it is not assigned back to it,
> it closes that task and marks it for deletion (after the cleanup delay time),
> and meanwhile the task gets assigned to Stream Thread-2. When Stream Thread-2
> tries to transition this task to Running, it gets a LockException which is
> caught in AssignedTasks#initializeNewTasks(). This makes 0_0 stay in the
> Created state on Stream Thread-2, and after the cleanup delay is over the task
> directories for 0_0 get deleted.
> Can someone please comment on this behavior.
> Thanks,
> Navinder

Potential Bug in 2.3 version (leading to deletion of state directories)

2019-11-14 Thread Navinder Brar
Hi,
We are facing a peculiar situation in the 2.3 version of Kafka Streams. First 
of all, I want to clarify whether it is possible that a standby task (say 0_0) 
that was assigned to one Stream Thread (say Stream Thread-1) can move to Stream 
Thread-2 on the same host post rebalancing. This is happening for us: post 
rebalancing, since Stream Thread-1 had 0_0 and it is not assigned back to it, 
it closes that task and marks it for deletion (after the cleanup delay time), 
and meanwhile the task gets assigned to Stream Thread-2. When Stream Thread-2 
tries to transition this task to Running, it gets a LockException which is 
caught in AssignedTasks#initializeNewTasks(). This makes 0_0 stay in the 
Created state on Stream Thread-2, and after the cleanup delay is over the task 
directories for 0_0 get deleted.
Can someone please comment on this behavior.
Thanks,
Navinder

[VOTE] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Navinder Brar
Hello all,

I'd like to propose a vote for serving interactive queries during Rebalancing, 
as it is a big deal for applications looking for high availability. With this 
change, users will have control over the tradeoff between consistency and 
availability during serving.
The full KIP is provided here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance


Thanks,
Navinder

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Navinder Brar
; :  KafkaStreams#metadataForKey(..), to get more users to use the new
> > > queryMetadataForKey APIs
> > > - We will still have to enhance StreamsMetadata with fields for
> > > standbyTopicPartitions and standbyStateStoreNames, since that is a core
> > > object that gets updated upon rebalance.
> > >
> > >
> > > Please let us know if this is agreeable.  We will also work some of this
> > > discussion into the background/proposed changes sections, upon feedback.
> > >
> > >
> > > On Tue, Nov 12, 2019 at 9:17 AM Vinoth Chandar 
> > > wrote:
> > >
> > >> In all, is everyone OK with
> > >>
> > >>  - Dropping KeyQueryMetadata, and the allMetadataForKey() apis
> > >>  - Dropping allLagInfo() from KafkaStreams class, Drop StoreLagInfo
> > class
> > >>  - Add offsetLag(store, key, serializer) -> Optional &
> > >> offsetLag(store, key, partitioner) -> Optional to StreamsMetadata
> > >>  - Duplicate the current methods for standbyMetadata in KafkaStreams :
> > >> allStandbyMetadata(), allStandbyMetadataForStore(), two variants of
> > >> standbyMetadataForKey(),
> > >>
> > >>
> > >> Responses to Guozhang:
> > >>
> > >> 1.1 Like I mentioned before, the allStandbyMetadata() and
> > >> allStandbyMetadataForStore() complement existing allMetadata() and
> > >> allMetadataForStore(), since we don't want to change behavior of
> > existing
> > >> APIs. Based on discussions so far, if we decide to drop
> > KeyQueryMetadata,
> > >> then we will need to introduce 4 equivalents for standby metadata as
> > >> Matthias mentioned.
> > >> 1.2 I am okay with pushing lag information to a method on
> > StreamsMetadata
> > >> (Personally, I won't design it like that, but happy to live with it)
> > like
> > >> what Matthias suggested. But assuming topic name <=> store name
> > equivalency
> > >> for mapping this seems like a broken API to me. If all of Streams code
> > were
> > >> written like this, I can understand. But I don't think its the case? I
> > >> would not be comfortable making such assumptions outside of public APIs.
> > >>>> look into each one's standby partition / stores to tell which one
> > >> StreamsMetadata is corresponding to the instance who holds a specific
> > key
> > >> as standby, yes, but I feel this one extra iteration is worth to avoid
> > >> introducing a new class.
> > >> This sort of thing would lead to non-standardized/potentially buggy
> > client
> > >> implementations, for something I expect the system would hand me
> > directly.
> > >> I don't personally feel introducing a new class is so bad, to warrant
> > the
> > >> user to do all this matching. Given the current APIs are not explicitly
> > >> named to denote active metadata, it gives us a chance to build something
> > >> more direct and clear IMO. If we do allMetadataForKey apis, then we
> > should
> > >> clearly separate active and standby ourselves. Alternate is separate
> > active
> > >> and standby APIs as Matthias suggests, which I can make peace with.
> > >>
> > >> 1.3 Similar as above. In Streams code, we treat topic partitions and
> > store
> > >> names separately?.
> > >> 2.1 I think most databases build replication using logical offsets, not
> > >> time. Time lag can be a nice to have feature, but offset lag is fully
> > >> sufficient for a lot of use-cases.
> > >> 2.2.1 We could support a lagInfoForStores() batch api. makes sense.
> > >>
> > >>
> > >> Responses to Matthias :
> > >> (100) Streams can still keep the upto date version in memory and
> > >> implementation could be for now just reading this already refreshed
> > value.
> > >> Designing the API, with intent  of pushing this to the user keeps doors
> > >> open for supporting time based lag in the future.
> > >> (101) I am not sure what the parameters of evaluating approaches here
> > is.
> > >> Generally, when I am handed a Metadata object, I don't expect to further
> > >> query it for more information semantically. I would not also force user
> > to
> > >> make separate calls for active and standby metadata.
> > >> Well, that may be just me. So sure, we can push this into
> > StreamsMetad

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-12 Thread Navinder Brar
isting methods return all active metadata, there is no
> reason to return `KeyQueryMetadata` as it's more complicated to get the
> standby metadata. With `KeyQueryMetadata` the user needs to make more
> calls to get the metadata:
>
>  KafkaStreams#allMetadataForKey()
>              #getActive()
>
>  KafkaStreams#allMetadataForKey()
>              #getStandby()
>
> vs:
>
>  KafkaStreams#metadataForKey()
>
>  KafkaStreams#standbyMetadataForKey()
>
> The wrapping of both within `KeyQueryMetadata` does not seem to provide
> any benefit but increase our public API surface.
>
>
>
> @Guozhang:
>
> (1.1. + 1.2.) From my understanding `allMetadata()` (and other existing
> methods) will only return the metadata of _active_ tasks for backward
> compatibility reasons. If we would return standby metadata, existing
> code would potentially "break" because the code might pick a standby to
> query a key without noticing.
>
>
>
> -Matthias
>
>
> On 11/8/19 6:07 AM, Navinder Brar wrote:
> > Thanks, Guozhang for going through it again.
> >
> >    - 1.1 & 1.2: The main point of adding topicPartition in
> KeyQueryMetadata is not topicName, but the partition number. I agree
> changelog topicNames and store names will have 1-1 mapping but we also need
> the partition number of the changelog for which are calculating the lag.
> Now we can add partition number in StreamsMetadata but it will be
> orthogonal to the definition of StreamsMetadata i.e.- “Represents the state
> of an instance (process) in a {@link KafkaStreams} application.”  If we add
> partition number in this, it doesn’t stay metadata for an instance, because
> now it is storing the partition information for a key being queried. So,
> having “KeyQueryMetadata” simplifies this as now it contains all the
> metadata and also changelog and partition information for which we need to
> calculate the lag.
> >
> > Another way is having another function in parallel to metadataForKey,
> which returns the partition number for the key being queried. But then we
> would need 2 calls to StreamsMetadataState, once to fetch metadata and
> another to fetch partition number. Let me know if any of these two ways
> seem more intuitive than KeyQueryMetadata then we can try to converge on
> one.
> >    - 1.3:  Again, it is required for the partition number. We can drop
> store name though.
> >    - 2.1: I think this was done in accordance with the opinion from John
> as time lag would be better implemented with a broker level change and
> offset change is readily implementable. @vinoth?
> >    - 2.2.1: Good point.  +1
> >    - 2.2.2: I am not well aware of it, @vinoth any comments?
> >    - 3.1: I think we have already agreed on dropping this, we need to
> KIP. Also, is there any opinion on lagInfoForStore(String storeName) vs
> lagInfoForStore(String storeName, int partition)
> >    - 3.2: But in functions such as onAssignment(),
> onPartitionsAssigned(), for standbyTasks also the topicPartitions we use
> are input topic partitions and not changelog partitions. Would this be
> breaking from that semantics?
> >
> >
> >    On Thursday, 7 November, 2019, 11:33:19 pm IST, Guozhang Wang <
> wangg...@gmail.com> wrote:
> >
> >  Hi Navinder, Vinoth, thanks for the updated KIP!
> >
> > Read through the discussions so far and made another pass on the wiki
> page,
> > and here are some more comments:
> >
> > 1. About the public APIs:
> >
> > 1.1. It is not clear to me how allStandbyMetadataForStore
> > and allStandbyMetadata would be differentiated from the original APIs
> given
> > that we will augment StreamsMetadata to include both active and standby
> > topic-partitions and store names, so I think we can still use allMetadata
> > and allMetadataForStore to get the collection of instance metadata that
> > host the store both as active and standbys. Are there any specific use
> > cases where we ONLY want to get the standby's metadata? And even if there
> > are, we can easily filter it out from the allMetadata /
> allMetadataForStore
> > right?
> >
> > 1.2. Similarly I'm wondering for allMetadataForKey, can we return the
> same
> > type: "Collection" which includes 1 for active, and N-1
> > for standbys, and callers can easily identify them by looking inside the
> > StreamsMetadata objects? In addition I feel the "topicPartition" field
> > inside "KeyQueryMetadata" is not very important since the changelog
> > topic-name is always 1-1 mapping to the store name, so as long as the
> store
> > name matches, the changelog topic nam

Re: Why standby tasks read from the StandbyTasks::checkpointedOffsets in assignStandbyPartitions()

2019-11-10 Thread Navinder Brar
Thanks Guozhang.
The jira is filed: [KAFKA-9169] Standby Tasks point ask for incorrect offsets 
on resuming post suspension - ASF JIRA





On Monday, 11 November, 2019, 03:10:37 am IST, Guozhang Wang 
 wrote:  
 
 Could you file a JIRA report for this so that we can keep track of it and fix?

Guozhang
On Sun, Nov 10, 2019 at 1:39 PM Guozhang Wang  wrote:

If a standby task is suspended, it will write the checkpoint file again after 
flushing its state stores, and when it resumes it does not re-initialize the 
position on the consumer; hence it is still the task manager's 
responsibility to set the right starting offset from the latest checkpoint 
file. If we do not do that, that is still a bug.

Guozhang
On Sat, Nov 9, 2019 at 11:33 AM Navinder Brar  wrote:

Hi Guozhang,
Thanks for the reply.
So, if I understand it correctly: in versions where KIP-429 was not implemented, 
where we were suspending the standby tasks during rebalance and resuming them 
post rebalance, they will be reading the changelog from the beginning again, 
since they will be reading from standbyTask.checkpointedOffsets(), which was 
only updated during the first initialization.
Regards,
Navinder
On Sunday, 10 November, 2019, 12:50:39 am IST, Guozhang Wang 
 wrote:  
 
 Hello Navinder,

Sorry for the late reply and thanks for bringing this up. I think this is
indeed a bug that needs to be fixed.

The rationale behind was the following: for restoring active tasks and
processing standby tasks, we are using the same consumer client within the
thread (the restoreConsumer). And before ALL of the active tasks have
completed restoration, the consumer would not get assigned to any of the
standby tasks at all. So in a timeline it should be looking like this with
a rebalance assuming KIP-429 is already in place:

T0: rebalance triggered, some tasks gets revoked but some others may still
be active;
T0-T1: a subset of active tasks (via the main consumer) and all standby
tasks (via the restore consumer) are still processing;
T1: rebalance finished, some new tasks gets assigned, and now needs to be
restored. Restore consumer re-assign to fetch from those restoring consumer
only.
T1-T2: the main consumer paused all partitions, hence no active tasks
processing; also restore consumer only fetching for restoring tasks, and
hence no standby tasks processing;
T2: restoration completed, restore consumer reassigned to those standby
tasks.

Note in T1, the standby tasks are all still "running" but they just do not
proceed any more since the consumer has switched to fetch other partitions;
so at T2 when the consumer switch back it should just resume from where it
has switched off.


Guozhang


On Mon, Nov 4, 2019 at 4:47 AM Navinder Brar
 wrote:

> Hi,
> Please let me know if this is not the correct forum to ask this. But I
> have a doubt, I was hoping someone can clear it for me.
> In TaskManager:: updateNewAndRestoringTasks(), the
> function assignStandbyPartitions() gets called for all the running standby
> tasks where it populates the Map: checkpointedOffsets from the
> standbyTask.checkpointedOffsets() which is only updated at the time of
> initialization of a StandbyTask(i.e. in it's constructor). I have checked
> and this goes way to 1.1 version when the rebalance protocol was old and
> standby tasks were suspended during rebalance and then resumed on
> assignment.
> I want to know, why post resumption we were/are reading
> standbyTask.checkpointedOffsets() to know the offset from where the standby
> task should start running and not from stateMgr.checkpointed() which gets
> updated on every commit to the checkpoint file. In the former case it's
> always reading from the same offset, even those which it had already read
> earlier and in cases where changelog topic has a retention time, it gives
> offsetOutOfRange exception.
> Regards,
> Navinder



-- 
-- Guozhang
  


-- 
-- Guozhang



-- 
-- Guozhang
  

[jira] [Created] (KAFKA-9169) Standby Tasks point ask for incorrect offsets on resuming post suspension

2019-11-10 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-9169:


 Summary: Standby Tasks point ask for incorrect offsets on resuming 
post suspension
 Key: KAFKA-9169
 URL: https://issues.apache.org/jira/browse/KAFKA-9169
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar


In versions (check 2.0) where standby tasks are suspended on each rebalance, the 
checkpoint file is updated post the flush, and the expected behaviour is that if 
post assignment the same standby task gets assigned back to the machine, it will 
start reading data from the changelog from the same offset where it left off.

 

But there looks to be a bug in the code: every time post rebalance it starts 
reading from the offset from which it read the first time the task was assigned 
on this machine. This has 2 repercussions:
 # After every rebalance the standby tasks start restoring a huge amount of data 
which they have already restored earlier (verified via a 300x increase in 
network IO on all streams instances post rebalance, even with no change in 
assignment).
 # If the changelog has time-based retention, those offsets will not be 
available in the changelog, which leads to offsetOutOfRange exceptions and the 
stores get deleted and recreated again.

 

I have gone through the code and I think I know the issue.

In TaskManager#updateNewAndRestoringTasks(), the function 
assignStandbyPartitions() gets called for all the running standby tasks, where 
it populates the Map checkpointedOffsets from 
standbyTask.checkpointedOffsets(), which is only updated at the time of 
initialization of a StandbyTask (i.e. in its constructor). 

 

This has an easy fix.

Post resumption we are reading standbyTask.checkpointedOffsets() to know the 
offset from where the standby task should start running, instead of 
stateMgr.checkpointed(), which gets updated on every commit to the checkpoint 
file. In the former case it is always reading from the same offsets, even those 
which it had already read earlier, and in cases where the changelog topic has a 
retention time, it gives an offsetOutOfRange exception. So, 
standbyTask.checkpointedOffsets() is quite useless here and we should use 
stateMgr.checkpointed() instead to return offsets to the task manager.
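A rough sketch of what the fix could look like (simplified; the accessors marked 
as assumed below are illustrative, only standbyTask.checkpointedOffsets() and 
stateMgr.checkpointed() come from the description above):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Simplified sketch of assigning standby partitions from the latest checkpoint.
final Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
for (final StandbyTask task : runningStandbyTasks) {           // assumed collection
    // before: task.checkpointedOffsets()   -> frozen at StandbyTask construction
    // after:  stateMgr.checkpointed()      -> refreshed on every checkpoint commit
    checkpointedOffsets.putAll(task.stateMgr().checkpointed()); // assumed accessor
}
restoreConsumer.assign(checkpointedOffsets.keySet());
for (final Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
    restoreConsumer.seek(entry.getKey(), entry.getValue());
}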



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-08 Thread Navinder Brar
ching effects.

3. Misc.:

3.1 There's a typo in the pseudo code, "globalLagInforation". Also it doesn't
seem to describe how that information is collected (personally I also feel
one "lagInfoForStores" is sufficient).
3.2 Note there's a slight semantic difference between the active and
standby "partitions" inside StreamsMetadata: for active tasks the
partitions are actually the input topic partitions of the task (some of them
may also act as changelog topics, but these are exceptional cases); for
standby tasks the "standbyTopicPartitions" are actually the changelog
topics of the task. So maybe rename it to "standbyChangelogPartitions" to
differentiate it?


Overall I think this would be a really good KIP to add to Streams, thank
you so much!


Guozhang

On Wed, Nov 6, 2019 at 8:47 PM Navinder Brar
 wrote:

> +1 on implementing offset based lag for now and push time-based lag to a
> later point in time when broker changes are done. Although time-based lag
> enhances the readability, it would not be a make or break change for
> implementing this KIP.
>
> Vinoth has explained the role of KeyQueryMetadata; let me add in my 2
> cents as well.
>    - There are basically 2 reasons. One is that instead of having two
> functions, one to get StreamsMetadata for active and one for replicas. We
> are fetching both in a single call and we have a way to get only active or
> only replicas from the KeyQueryMetadata object(just like isStandby() and
> isActive() discussion we had earlier)
>    - Since even after fetching the metadata now we have a requirement of
> fetching the topicPartition for which the query came:- to fetch lag for
> that specific topicPartition. Instead of having another call to fetch the
> partition from StreamsMetadataState we thought using one single call and
> fetching partition and all metadata would be better.
>    - Another option was to change StreamsMetadata object and add
> topicPartition in that for which the query came but it doesn’t make sense
> in terms of semantics as it StreamsMetadata. Also, KeyQueryMetadata
> represents all the metadata for the Key being queried, i.e. the partition
> it belongs to and the list of StreamsMetadata(hosts) active or replica
> where the key could be found.
>
>
>
>
>
>    On Thursday, 7 November, 2019, 01:53:36 am IST, Vinoth Chandar <
> vchan...@confluent.io> wrote:
>
>  +1 to John, suggestion on Duration/Instant and dropping the API to fetch
> all store's lags. However, I do think we need to return lags per topic
> partition. So not sure if single return value would work? We need some new
> class that holds a TopicPartition and Duration/Instant variables together?
>
> 10) Because we needed to return the topicPartition the key belongs to, in
> order to correlate with the lag information from the other set of APIs.
> Otherwise, we don't know which topic partition's lag estimate to use. We
> tried to illustrate this on the example code. StreamsMetadata is simply
> capturing state of a streams host/instance, where as TopicPartition depends
> on the key passed in. This is a side effect of our decision to decouple lag
> based filtering on the metadata apis.
>
> 20) Goes back to the previous point. We needed to return information that
> is key specific, at which point it seemed natural for the KeyQueryMetadata
> to contain active, standby, topic partition for that key. If we merely
> returned a standbyMetadataForKey() -> Collection standby,
> an active metadataForKey() -> StreamsMetadata, and new
> getTopicPartition(key) -> topicPartition object back to the caller, then
> arguably you could do the same kind of correlation. IMO having a the
> KeyQueryMetadata class to encapsulate all this is a friendlier API.
>  allStandbyMetadata() and allStandbyMetadataForStore() are just counter
> parts for metadataForStore() and allMetadata() that we introduce mostly for
> consistent API semantics. (their presence implicitly could help denote
> metadataForStore() is for active instances. Happy to drop them if their
> utility is not clear)
>
> 30) This would assume we refresh all the standby lag information every
> time we query for that StreamsMetadata for a specific store? For time based
> lag, this will involve fetching the tail kafka record at once from multiple
> kafka topic partitions? I would prefer not to couple them like this and
> have the ability to make granular store (or even topic partition level)
> fetches for lag information.
>
> 32) I actually prefer John's suggestion to let the application drive the
> lag fetches/updation and not have flags as the KIP current points to. Are
> you reexamining that position?
>
> On fetching lag information, +1 we could do this much more efficiently with
> a broker c

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-06 Thread Navinder Brar
 By explicitly sending
> >>> the
> >>>> global assignment maps as actual topic partitions,  group coordinator
> >>> (i.e
> >>>> the leader that computes the assignment's ) is able to consistently
> >>> enforce
> >>>> its view of the topic metadata. Still don't think doing such a change
> >>> that
> >>>> forces you to reconsider semantics, is not needed to save bits on
> wire.
> >>> May
> >>>> be we can discuss this separately from this KIP?
> >>>>
> >>>> 4. There needs to be some caching/interval somewhere though since we
> >>> don't
> >>>> want to make 1 kafka read per 1 IQ potentially. But I think its a
> valid
> >>>> suggestion, to make this call just synchronous and leave the caching
> or
> >>> how
> >>>> often you want to call to the application. Would it be good to then
> >> break
> >>>> up the APIs for time and offset based lag?  We can obtain offset based
> >>> lag
> >>>> for free? Only incur the overhead of reading kafka if we want time
> >>>> based lags?
> >>>>
> >>>> On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman <
> >> sop...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Adding on to John's response to 3), can you clarify when and why
> >>> exactly we
> >>>>> cannot
> >>>>> convert between taskIds and partitions? If that's really the case I
> >>> don't
> >>>>> feel confident
> >>>>> that the StreamsPartitionAssignor is not full of bugs...
> >>>>>
> >>>>> It seems like it currently just encodes a list of all partitions (the
> >>>>> assignment) and also
> >>>>> a list of the corresponding task ids, duplicated to ensure each
> >>> partition
> >>>>> has the corresponding
> >>>>> taskId at the same offset into the list. Why is that problematic?
> >>>>>
> >>>>>
> >>>>> On Fri, Nov 1, 2019 at 12:39 PM John Roesler 
> >>> wrote:
> >>>>>
> >>>>>> Thanks, all, for considering the points!
> >>>>>>
> >>>>>> 3. Interesting. I have a vague recollection of that... Still,
> >> though,
> >>>>>> it seems a little fishy. After all, we return the assignments
> >>>>>> themselves as task ids, and the members have to map these to topic
> >>>>>> partitions in order to configure themselves properly. If it's too
> >>>>>> complicated to get this right, then how do we know that Streams is
> >>>>>> computing the correct partitions at all?
> >>>>>>
> >>>>>> 4. How about just checking the log-end timestamp when you call the
> >>>>>> method? Then, when you get an answer, it's as fresh as it could
> >>>>>> possibly be. And as a user you have just one, obvious, "knob" to
> >>>>>> configure how much overhead you want to devote to checking... If
> >> you
> >>>>>> want to call the broker API less frequently, you just call the
> >>> Streams
> >>>>>> API less frequently. And you don't have to worry about the
> >>>>>> relationship between your invocations of that method and the config
> >>>>>> setting (e.g., you'll never get a negative number, which you could
> >> if
> >>>>>> you check the log-end timestamp less frequently than you check the
> >>>>>> lag).
> >>>>>>
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>>
> >>>>>> On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
> >>>>>>  wrote:
> >>>>>>>
> >>>>>>> Thanks John for going through this.
> >>>>>>>
> >>>>>>>    - +1, makes sense
> >>>>>>>    - +1, no issues there
> >>>>>>>    - Yeah the initial patch I had submitted for K-7149(
> >>>>>> https://github.com/apache/kafka/pull/6935) to reduce
> >> assignmentInfo
> >>>>>> object had taskIds but the merged PR had similar size according to
> >>> Vinoth
> >>>>>> and it 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread Navinder Brar
 time-lag, and another for offset-lag,
> >> > so are they not already broken up? FWIW, yes, I agree, the offset lag
> >> > is already locally known, so we don't need to build in an extra
> >> > synchronous broker API call, just one for the time-lag.
> >> >
> >> > Thanks again for the discussion,
> >> > -John
> >> >
> >> > On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar 
> >> > wrote:
> >> > >
> >> > > 3. Right now, we still get the topic partitions assigned as a part of
> >> the
> >> > > top level Assignment object (the one that wraps AssignmentInfo) and
> >> use
> >> > > that to convert taskIds back. This list of only contains assignments
> >> for
> >> > > that particular instance. Attempting to also reverse map for "all" the
> >> > > tasksIds in the streams cluster i.e all the topic partitions in these
> >> > > global assignment maps was what was problematic. By explicitly sending
> >> > the
> >> > > global assignment maps as actual topic partitions,  group coordinator
> >> > (i.e
> >> > > the leader that computes the assignment's ) is able to consistently
> >> > enforce
> >> > > its view of the topic metadata. Still don't think doing such a change
> >> > that
> >> > > forces you to reconsider semantics, is not needed to save bits on
> >> wire.
> >> > May
> >> > > be we can discuss this separately from this KIP?
> >> > >
> >> > > 4. There needs to be some caching/interval somewhere though since we
> >> > don't
> >> > > want to make 1 kafka read per 1 IQ potentially. But I think its a
> >> valid
> >> > > suggestion, to make this call just synchronous and leave the caching
> >> or
> >> > how
> >> > > often you want to call to the application. Would it be good to then
> >> break
> >> > > up the APIs for time and offset based lag?  We can obtain offset based
> >> > lag
> >> > > for free? Only incur the overhead of reading kafka if we want time
> >> > > based lags?
> >> > >
> >> > > On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman <
> >> sop...@confluent.io>
> >> > > wrote:
> >> > >
> >> > > > Adding on to John's response to 3), can you clarify when and why
> >> > exactly we
> >> > > > cannot
> >> > > > convert between taskIds and partitions? If that's really the case I
> >> > don't
> >> > > > feel confident
> >> > > > that the StreamsPartitionAssignor is not full of bugs...
> >> > > >
> >> > > > It seems like it currently just encodes a list of all partitions
> >> (the
> >> > > > assignment) and also
> >> > > > a list of the corresponding task ids, duplicated to ensure each
> >> > partition
> >> > > > has the corresponding
> >> > > > taskId at the same offset into the list. Why is that problematic?
> >> > > >
> >> > > >
> >> > > > On Fri, Nov 1, 2019 at 12:39 PM John Roesler 
> >> > wrote:
> >> > > >
> >> > > > > Thanks, all, for considering the points!
> >> > > > >
> >> > > > > 3. Interesting. I have a vague recollection of that... Still,
> >> though,
> >> > > > > it seems a little fishy. After all, we return the assignments
> >> > > > > themselves as task ids, and the members have to map these to topic
> >> > > > > partitions in order to configure themselves properly. If it's too
> >> > > > > complicated to get this right, then how do we know that Streams is
> >> > > > > computing the correct partitions at all?
> >> > > > >
> >> > > > > 4. How about just checking the log-end timestamp when you call the
> >> > > > > method? Then, when you get an answer, it's as fresh as it could
> >> > > > > possibly be. And as a user you have just one, obvious, "knob" to
> >> > > > > configure how much overhead you want to devote to checking... If
> >> you
> >> > > > > want to call the broker API less frequently, you just call the
> >

Why standby tasks read from the StandbyTasks::checkpointedOffsets in assignStandbyPartitions()

2019-11-04 Thread Navinder Brar
Hi,
Please let me know if this is not the correct forum to ask this, but I have a 
doubt I was hoping someone could clear up for me.
In TaskManager#updateNewAndRestoringTasks(), the function 
assignStandbyPartitions() gets called for all the running standby tasks, where 
it populates the Map checkpointedOffsets from 
standbyTask.checkpointedOffsets(), which is only updated at the time of 
initialization of a StandbyTask (i.e. in its constructor). I have checked and 
this goes back to the 1.1 version, when the rebalance protocol was old and 
standby tasks were suspended during rebalance and then resumed on assignment.
I want to know why, post resumption, we were/are reading 
standbyTask.checkpointedOffsets() to know the offset from where the standby 
task should start running, and not stateMgr.checkpointed(), which gets updated 
on every commit to the checkpoint file. In the former case it is always reading 
from the same offsets, even those which it had already read earlier, and in 
cases where the changelog topic has a retention time, it gives an 
offsetOutOfRange exception.
Regards,
Navinder

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread Navinder Brar
Thanks John for going through this.
   
   - +1, makes sense
   - +1, no issues there
   - Yeah, the initial patch I had submitted for 
K-7149 (https://github.com/apache/kafka/pull/6935) to reduce the AssignmentInfo 
object used taskIds, but the merged PR had a similar size according to Vinoth 
and was simpler; if the end result is of the same size, it would not make sense 
to pivot from the dictionary approach and move back to taskIds.
   - Not sure what a good default would be if we don't have a configurable 
setting. The setting gives users the flexibility to serve their requirements, 
as at the end of the day it would take CPU cycles. I am ok with starting with a 
default and seeing how it goes based upon feedback.

Thanks,
Navinder
On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar 
 wrote:  
 
1. I was trying to spell them out separately, but it makes sense for
readability. Done.

2. No, I immediately agree :) .. makes sense. @navinder?

3. I actually attempted only sending taskIds while working on KAFKA-7149.
It's non-trivial to handle edge cases resulting from newly added topic
partitions and wildcarded topic entries. I ended up simplifying it to just
dictionary-encoding the topic names to reduce size. We can apply the same
technique here for this map. Additionally, we could also dictionary-encode
HostInfo, given it's now repeated twice. I think this would save more space
than having a flag per topic partition entry. Let me know if you are okay with
this.
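For illustration, a toy sketch of the dictionary-encoding idea (hosts are 
simplified to plain strings; this is not the actual AssignmentInfo wire format):

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Each distinct topic name is written once; every partition then references
// the topic by its index in the dictionary instead of the full name.
final class TopicDictionaryEncoder {
    static void encode(final Map<String, Set<TopicPartition>> partitionsByHost,
                       final DataOutputStream out) throws IOException {
        final List<String> dictionary = new ArrayList<>();
        final Map<String, Integer> indexByTopic = new HashMap<>();
        for (final Set<TopicPartition> partitions : partitionsByHost.values()) {
            for (final TopicPartition tp : partitions) {
                if (!indexByTopic.containsKey(tp.topic())) {
                    indexByTopic.put(tp.topic(), dictionary.size());
                    dictionary.add(tp.topic());
                }
            }
        }
        out.writeInt(dictionary.size());
        for (final String topic : dictionary) {
            out.writeUTF(topic);                      // topic name written exactly once
        }
        out.writeInt(partitionsByHost.size());
        for (final Map.Entry<String, Set<TopicPartition>> e : partitionsByHost.entrySet()) {
            out.writeUTF(e.getKey());                 // host, simplified to a string
            out.writeInt(e.getValue().size());
            for (final TopicPartition tp : e.getValue()) {
                out.writeInt(indexByTopic.get(tp.topic()));  // index instead of full name
                out.writeInt(tp.partition());
            }
        }
    }
}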

4. This opens up a good discussion. Given we support time lag estimates
as well, we need to read the tail record of the changelog periodically (unlike
offset lag, which we can potentially piggyback on metadata in the
ConsumerRecord, IIUC), so we thought we should have a config that controls how
often this read happens. Let me know if there is a simple way to get the
timestamp value of the tail record that we are missing.
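For reference, one way to read the tail record's timestamp with a plain consumer 
(a sketch; the dedicated consumer and the 500 ms poll timeout are assumptions, 
not what Streams does internally):

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final class ChangelogTailReader {
    // Returns the timestamp of the last record in the changelog partition,
    // or -1 if the partition is empty or the poll returned nothing.
    static long tailRecordTimestamp(final Consumer<byte[], byte[]> consumer,
                                    final TopicPartition changelogPartition) {
        consumer.assign(Collections.singletonList(changelogPartition));
        consumer.seekToEnd(Collections.singletonList(changelogPartition));
        final long endOffset = consumer.position(changelogPartition);
        if (endOffset == 0) {
            return -1L;
        }
        consumer.seek(changelogPartition, endOffset - 1);
        long timestamp = -1L;
        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
        for (final ConsumerRecord<byte[], byte[]> record : records.records(changelogPartition)) {
            timestamp = record.timestamp();
        }
        return timestamp;
    }
}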

On Thu, Oct 31, 2019 at 12:58 PM John Roesler  wrote:

> Hey Navinder,
>
> Thanks for updating the KIP, it's a lot easier to see the current
> state of the proposal now.
>
> A few remarks:
> 1. I'm sure it was just an artifact of revisions, but you have two
> separate sections where you list additions to the KafkaStreams
> interface. Can you consolidate those so we can see all the additions
> at once?
>
> 2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead,
> to be clearer that we're specifically measuring a number of offsets?
> If you don't immediately agree, then I'd at least point out that we
> usually refer to elements of Kafka topics as "records", not
> "messages", so "recordLagEstimate" might be more appropriate.
>
> 3. The proposal mentions adding a map of the standby _partitions_ for
> each host to AssignmentInfo. I assume this is designed to mirror the
> existing "partitionsByHost" map. To keep the size of these metadata
> messages down, maybe we can consider making two changes:
> (a) for both actives and standbys, encode the _task ids_ instead of
> _partitions_. Every member of the cluster has a copy of the topology,
> so they can convert task ids into specific partitions on their own,
> and task ids are only (usually) three characters.
> (b) instead of encoding two maps (hostinfo -> actives AND hostinfo ->
> standbys), which requires serializing all the hostinfos twice, maybe
> we can pack them together in one map with a structured value (hostinfo
> -> [actives,standbys]).
> Both of these ideas still require bumping the protocol version to 6,
> and they basically mean we drop the existing `PartitionsByHost` field
> and add a new `TasksByHost` field with the structured value I
> mentioned.
>
> 4. Can we avoid adding the new "lag refresh" config? The lags would
> necessarily be approximate anyway, so adding the config seems to
> increase the operational complexity of the system for little actual
> benefit.
>
> Thanks for the pseudocode, by the way, it really helps visualize how
> these new interfaces would play together. And thanks again for the
> update!
> -John
>
> On Thu, Oct 31, 2019 at 2:41 PM John Roesler  wrote:
> >
> > Hey Vinoth,
> >
> > I started going over the KIP again yesterday. There are a lot of
> > updates, and I didn't finish my feedback in one day. I'm working on it
> > now.
> >
> > Thanks,
> > John
> >
> > On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar 
> wrote:
> > >
> > > Wondering if anyone has thoughts on these changes? I liked that the new
> > > metadata fetch APIs provide all the information at once with consistent
> > > naming..
> > >
> > > Any guidance on what you would like to be discussed or fleshed out more
> > > before we call a VOTE?
> > >
> &g

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-30 Thread Navinder Brar
Hi,
We have made some edits in the 
KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
after due deliberation on the agreed design, to support the new query approach. 
This includes the new public API to query offset/time lag information, and other 
details related to querying standby tasks which have come up after thinking 
through the details. 


   
   - Addition of a new config, “lag.fetch.interval.ms”, to configure the refresh 
interval for the time/offset lag
   - Addition of a new class, StoreLagInfo, to store the periodically obtained 
time/offset lag 
   - Addition of two new functions in KafkaStreams, List<StoreLagInfo> 
allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName), to return 
the lag information for an instance and a store respectively (a rough sketch of 
these follows this list)
   - Addition of a new class, KeyQueryMetadata. We need the topicPartition for 
each key so it can be matched with the lag API for that topic partition. One way 
is to add new functions and fetch the topicPartition from StreamsMetadataState, 
but we thought having one call that fetches both the StreamsMetadata and the 
topicPartition is cleaner.
   - Renaming partitionsForHost to activePartitionsForHost in 
StreamsMetadataState and partitionsByHostState to activePartitionsByHostState 
in StreamsPartitionAssignor
   - We have also added the pseudo code of how all the changes will exist 
together and support the new querying APIs
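As a rough illustration of the lag-related pieces above (the method names come 
from the list; the fields and accessors of StoreLagInfo are assumptions and may 
differ from the KIP page):

import org.apache.kafka.common.TopicPartition;

// Sketch of the proposed lag-information holder (field names are illustrative).
public class StoreLagInfo {
    private final String storeName;
    private final TopicPartition changelogPartition;
    private final long offsetLagEstimate;   // how many records the store is behind
    private final long timeLagEstimateMs;   // how old the latest restored record is

    public StoreLagInfo(final String storeName,
                        final TopicPartition changelogPartition,
                        final long offsetLagEstimate,
                        final long timeLagEstimateMs) {
        this.storeName = storeName;
        this.changelogPartition = changelogPartition;
        this.offsetLagEstimate = offsetLagEstimate;
        this.timeLagEstimateMs = timeLagEstimateMs;
    }

    public String storeName() { return storeName; }
    public TopicPartition changelogPartition() { return changelogPartition; }
    public long offsetLagEstimate() { return offsetLagEstimate; }
    public long timeLagEstimateMs() { return timeLagEstimateMs; }
}

// Proposed additions on KafkaStreams, as listed above:
//   List<StoreLagInfo> allLagInfo();
//   List<StoreLagInfo> lagInfoForStore(String storeName);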

Please let me know if anything is pending now, before a vote can be started on 
this.

On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder Brar 
 wrote:  
 
 >> Since there are two soft votes for separate active/standby API methods, I 
 >>also change my position on that. Fine with 2 separate methods. Once we 
 >>remove the lag information from these APIs, returning a List is less 
 >>attractive, since the ordering has no special meaning now.
Agreed, now that we are not returning lag, I am also sold on having two 
separate functions. We already have one which returns streamsMetadata for 
active tasks, and now we can add another one for standbys.



    On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar 
 wrote:  
 
 +1 to Sophie's suggestion. Having both lag in terms of time and offsets is
good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I
also change my position on that. Fine with 2 separate methods.
Once we remove the lag information from these APIs, returning a List is
less attractive, since the ordering has no special meaning now.

>> lag in offsets vs time: Having both, as suggested by Sophie would of
course be best. What is a little unclear to me is, how in details are we
going to compute both?
@navinder may be next step is to flesh out these details and surface any
larger changes we need to make if need be.

Any other details we need to cover, before a VOTE can be called on this?


On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck  wrote:

> I am jumping in a little late here.
>
> Overall I agree with the proposal to push decision making on what/how to
> query in the query layer.
>
> For point 5 from above, I'm slightly in favor of having a new method,
> "standbyMetadataForKey()" or something similar.
> Because even if we return all tasks in one list, the user will still have
> to perform some filtering to separate the different tasks, so I don't feel
> making two calls is a burden, and IMHO makes things more transparent for
> the user.
> If the final vote is for using an "isActive" field, I'm good with that as
> well.
>
> Just my 2 cents.
>
> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
>  wrote:
>
> > I think now we are aligned on almost all the design parts. Summarising
> > below what has been discussed above and we have a general consensus on.
> >
> >
> >    - Rather than broadcasting lag across all nodes at rebalancing/with
> the
> > heartbeat, we will just return a list of all available standby’s in the
> > system and the user can make IQ query any of those nodes which will
> return
> > the response, and the lag and offset time. Based on which user can decide
> > if he wants to return the response back or call another standby.
> >    -  The current metadata query frequency will not change. It will be
> the
> > same as it does now, i.e. before each query.
> >
> >    -  For fetching list in StreamsMetadataState.java and
> > List in StreamThreadStateStoreProvider.java
> (which
> > will return all active stores which are running/restoring and replica
> > stores which are running), we will add new functions and not disturb the
> > existing functions
> >
> >    - There is no need to add new StreamsConfig for implementing this KIP
> >
> >    - We will add standbyPartitionsByHost in AssignmentInfo 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-26 Thread Navinder Brar
>> Since there are two soft votes for separate active/standby API methods, I 
>>also change my position on that. Fine with 2 separate methods. Once we remove 
>>the lag information from these APIs, returning a List is less attractive, 
>>since the ordering has no special meaning now.
Agreed, now that we are not returning lag, I am also sold on having two 
separate functions. We already have one which returns streamsMetadata for 
active tasks, and now we can add another one for standbys.



On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar 
 wrote:  
 
 +1 to Sophie's suggestion. Having both lag in terms of time and offsets is
good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I
also change my position on that. Fine with 2 separate methods.
Once we remove the lag information from these APIs, returning a List is
less attractive, since the ordering has no special meaning now.

>> lag in offsets vs time: Having both, as suggested by Sophie would of
course be best. What is a little unclear to me is, how in details are we
going to compute both?
@navinder may be next step is to flesh out these details and surface any
larger changes we need to make if need be.

Any other details we need to cover, before a VOTE can be called on this?


On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck  wrote:

> I am jumping in a little late here.
>
> Overall I agree with the proposal to push decision making on what/how to
> query in the query layer.
>
> For point 5 from above, I'm slightly in favor of having a new method,
> "standbyMetadataForKey()" or something similar.
> Because even if we return all tasks in one list, the user will still have
> to perform some filtering to separate the different tasks, so I don't feel
> making two calls is a burden, and IMHO makes things more transparent for
> the user.
> If the final vote is for using an "isActive" field, I'm good with that as
> well.
>
> Just my 2 cents.
>
> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
>  wrote:
>
> > I think now we are aligned on almost all the design parts. Summarising
> > below what has been discussed above and we have a general consensus on.
> >
> >
> >    - Rather than broadcasting lag across all nodes at rebalancing/with
> the
> > heartbeat, we will just return a list of all available standby’s in the
> > system and the user can make IQ query any of those nodes which will
> return
> > the response, and the lag and offset time. Based on which user can decide
> > if he wants to return the response back or call another standby.
> >    -  The current metadata query frequency will not change. It will be
> the
> > same as it does now, i.e. before each query.
> >
> >    -  For fetching list in StreamsMetadataState.java and
> > List in StreamThreadStateStoreProvider.java
> (which
> > will return all active stores which are running/restoring and replica
> > stores which are running), we will add new functions and not disturb the
> > existing functions
> >
> >    - There is no need to add new StreamsConfig for implementing this KIP
> >
> >    - We will add standbyPartitionsByHost in AssignmentInfo and
> > StreamsMetadataState which would change the existing rebuildMetadata()
> and
> > setPartitionsByHostState()
> >
> >
> >
> > If anyone has any more concerns please feel free to add. Post this I will
> > be initiating a vote.
> > ~Navinder
> >
> >    On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <
> > matth...@confluent.io> wrote:
> >
> >  Just to close the loop @Vinoth:
> >
> > > 1. IIUC John intends to add (or we can do this in this KIP) lag
> > information
> > > to AssignmentInfo, which gets sent to every participant.
> >
> > As explained by John, currently KIP-441 plans to only report the
> > information to the leader. But I guess, with the new proposal to not
> > broadcast this information anyway, this concern is invalidated anyway
> >
> > > 2. At-least I was under the assumption that it can be called per query,
> > > since the API docs don't seem to suggest otherwise. Do you see any
> > > potential issues if we call this every query? (we should benchmark this
> > > nonetheless)
> >
> > I did not see a real issue if people refresh the metadata frequently,
> > because it would be a local call. My main point was, that this would
> > change the current usage pattern of the API, and we would clearly need
> > to communicate this change. Similar to (1), this concern in invalidated
> > anyway.
> >
> >
>

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Navinder Brar
t;>>>> to AssignmentInfo, which gets sent to every participant.
>>>>>
>>>>> 2. At-least I was under the assumption that it can be called per
>> query,
>>>>> since the API docs don't seem to suggest otherwise. Do you see any
>>>>> potential issues if we call this every query? (we should benchmark
>> this
>>>>> nonetheless)
>>>>>
>>>>> 4. Agree. metadataForKey() implicitly would return the active host
>>>> metadata
>>>>> (as it was before). We should also document this in that APIs
>> javadoc,
>>>>> given we have another method(s) that returns more host metadata now.
>>>>>
>>>>> 5.  While I see the point, the app/caller has to make two different
>>> APIs
>>>>> calls to obtain active/standby and potentially do the same set of
>>>> operation
>>>>> to query the state. I personally still like a method like isActive()
>>>>> better, but don't have strong opinions.
>>>>>
>>>>> 9. If we do expose the lag information, could we just leave it upto
>> to
>>>> the
>>>>> caller to decide whether it errors out or not and not make the
>> decision
>>>>> within Streams? i.e we don't need a new config
>>>>>
>>>>> 14. +1 . If it's easier to do right away. We started with number of
>>>>> records, following the lead from KIP-441
>>>>>
>>>>> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
>>>>>  wrote:
>>>>>
>>>>>>
>>>>>> Thanks, everyone for taking a look. Some very cool ideas have flown
>>> in.
>>>>>>
>>>>>>>> There was a follow-on idea I POCed to continuously share lag
>>>>>> information in the heartbeat protocol+1 that would be great, I will
>>>> update
>>>>>> the KIP assuming this work will finish soon
>>>>>>>> I think that adding a new method to StreamsMetadataState and
>>>>>> deprecating the existing method isthe best way to go; we just can't
>>>> change
>>>>>> the return types of any existing methods.+1 on this, we will add
>> new
>>>>>> methods for users who would be interested in querying back a list
>> of
>>>>>> possible options to query from and leave the current function
>>>>>> getStreamsMetadataForKey() untouched for users who want absolute
>>>>>> consistency.
>>>>>>>> why not just always return all available metadata (including
>>>>>> active/standby or lag) and let the caller decide to which node they
>>>> want to
>>>>>> route the query+1. I think this makes sense as from a user
>> standpoint
>>>> there
>>>>>> is no difference b/w an active and a standby if both have same lag,
>>>> Infact
>>>>>> users would be able to use this API to reduce query load on
>> actives,
>>> so
>>>>>> returning all available options along with the current lag in each
>>>> would
>>>>>> make sense and leave it to user how they want to use this data.
>> This
>>>> has
>>>>>> another added advantage. If a user queries any random machine for a
>>>> key and
>>>>>> that machine has a replica for the partition(where key belongs)
>> user
>>>> might
>>>>>> choose to serve the data from there itself(if it doesn’t lag much)
>>>> rather
>>>>>> than finding the active and making an IQ to that. This would save
>>> some
>>>>>> critical time in serving for some applications.
>>>>>>>> Adding the lag in terms of timestamp diff comparing the
>> committed
>>>>>> offset.+1 on this, I think it’s more readable. But as John said the
>>>>>> function allMetadataForKey() is just returning the possible options
>>>> from
>>>>>> where users can query a key, so we can even drop the parameter
>>>>>> enableReplicaServing/tolerableDataStaleness and just return all the
>>>>>> streamsMetadata containing that key along with the offset limit.
>>>>>> Answering the questions posted by Matthias in sequence.
>>>>>> 1. @John can you please comment on this one.2. Yeah the usage
>> pattern
>>>>>> would i

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Navinder Brar
 which gets sent to every participant.
>>>>>
>>>>> 2. At-least I was under the assumption that it can be called per
>> query,
>>>>> since the API docs don't seem to suggest otherwise. Do you see any
>>>>> potential issues if we call this every query? (we should benchmark
>> this
>>>>> nonetheless)
>>>>>
>>>>> 4. Agree. metadataForKey() implicitly would return the active host
>>>> metadata
>>>>> (as it was before). We should also document this in that APIs
>> javadoc,
>>>>> given we have another method(s) that returns more host metadata now.
>>>>>
>>>>> 5.  While I see the point, the app/caller has to make two different
>>> APIs
>>>>> calls to obtain active/standby and potentially do the same set of
>>>> operation
>>>>> to query the state. I personally still like a method like isActive()
>>>>> better, but don't have strong opinions.
>>>>>
>>>>> 9. If we do expose the lag information, could we just leave it upto
>> to
>>>> the
>>>>> caller to decide whether it errors out or not and not make the
>> decision
>>>>> within Streams? i.e we don't need a new config
>>>>>
>>>>> 14. +1 . If it's easier to do right away. We started with number of
>>>>> records, following the lead from KIP-441
>>>>>
>>>>> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
>>>>>  wrote:
>>>>>
>>>>>>
>>>>>> Thanks, everyone for taking a look. Some very cool ideas have flown
>>> in.
>>>>>>
>>>>>>>> There was a follow-on idea I POCed to continuously share lag
>>>>>> information in the heartbeat protocol+1 that would be great, I will
>>>> update
>>>>>> the KIP assuming this work will finish soon
>>>>>>>> I think that adding a new method to StreamsMetadataState and
>>>>>> deprecating the existing method isthe best way to go; we just can't
>>>> change
>>>>>> the return types of any existing methods.
>>>>>> +1 on this, we will add
>> new
>>>>>> methods for users who would be interested in querying back a list
>> of
>>>>>> possible options to query from and leave the current function
>>>>>> getStreamsMetadataForKey() untouched for users who want absolute
>>>>>> consistency.
>>>>>>>> why not just always return all available metadata (including
>>>>>> active/standby or lag) and let the caller decide to which node they
>>>> want to
>>>>>> route the query?
>>>>>> +1. I think this makes sense as from a user
>> standpoint
>>>> there
>>>>>> is no difference between an active and a standby if both have the same lag.
>>>> In fact,
>>>>>> users would be able to use this API to reduce query load on
>> actives,
>>> so
>>>>>> returning all available options along with the current lag in each
>>>> would
>>>>>> make sense and leave it to the user how they want to use this data.
>> This
>>>> has
>>>>>> another added advantage. If a user queries any random machine for a
>>>> key and
>>>>>> that machine has a replica for the partition (where the key belongs), the
>> user
>>>> might
>>>>>> choose to serve the data from there itself (if it doesn’t lag much)
>>>> rather
>>>>>> than finding the active and making an IQ to that. This would save
>>> some
>>>>>> critical time in serving for some applications.
>>>>>>>> Adding the lag in terms of timestamp diff comparing the
>> committed
>>>>>> offset.
>>>>>> +1 on this, I think it’s more readable. But as John said the
>>>>>> function allMetadataForKey() is just returning the possible options
>>>> from
>>>>>> where users can query a key, so we can even drop the parameter
>>>>>> enableReplicaServing/tolerableDataStaleness and just return all the
>>>>>> streamsMetadata containing that key along with the offset limit.
>>>>>> Answering the questions posted by Matthias in sequence.
>>>>>> 1. @John, can you please comment on this one.
>>>>>> 2. Yeah, the usage pattern would include querying this prior to every request

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Navinder Brar
er method(s) that returns more host metadata now.
> > >
> > > 5.  While I see the point, the app/caller has to make two different
> API
> > > calls to obtain active/standby and potentially do the same set of
> > operations
> > > to query the state. I personally still like a method like isActive()
> > > better, but don't have strong opinions.
> > >
> > > 9. If we do expose the lag information, could we just leave it up to
> > the
> > > caller to decide whether it errors out or not and not make the decision
> > > within Streams? i.e. we don't need a new config
> > >
> > > 14. +1 . If it's easier to do right away. We started with number of
> > > records, following the lead from KIP-441
> > >
> > > On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
> > >  wrote:
> > >
> > > >
> > > > Thanks, everyone for taking a look. Some very cool ideas have flown
> in.
> > > >
> > > > >> There was a follow-on idea I POCed to continuously share lag
> > > > information in the heartbeat protocol
> > > > +1, that would be great, I will
> > update
> > > > the KIP assuming this work will finish soon
> > > > >> I think that adding a new method to StreamsMetadataState and
> > > > deprecating the existing method is the best way to go; we just can't
> > change
> > > > the return types of any existing methods.
> > > > +1 on this, we will add new
> > > > methods for users who would be interested in querying back a list of
> > > > possible options to query from and leave the current function
> > > > getStreamsMetadataForKey() untouched for users who want absolute
> > > > consistency.
> > > > >> why not just always return all available metadata (including
> > > > active/standby or lag) and let the caller decide to which node they
> > want to
> > > > route the query?
> > > > +1. I think this makes sense as from a user standpoint
> > there
> > > > is no difference between an active and a standby if both have the same lag.
> > In fact,
> > > > users would be able to use this API to reduce query load on actives,
> so
> > > > returning all available options along with the current lag in each
> > would
> > > > make sense and leave it to the user how they want to use this data. This
> > has
> > > > another added advantage. If a user queries any random machine for a
> > key and
> > > > that machine has a replica for the partition (where the key belongs), the user
> > might
> > > > choose to serve the data from there itself (if it doesn’t lag much)
> > rather
> > > > than finding the active and making an IQ to that. This would save
> some
> > > > critical time in serving for some applications.
> > > > >> Adding the lag in terms of timestamp diff comparing the committed
> > > > offset.
> > > > +1 on this, I think it’s more readable. But as John said the
> > > > function allMetadataForKey() is just returning the possible options
> > from
> > > > where users can query a key, so we can even drop the parameter
> > > > enableReplicaServing/tolerableDataStaleness and just return all the
> > > > streamsMetadata containing that key along with the offset limit.
> > > > Answering the questions posted by Matthias in sequence.
> > > > 1. @John, can you please comment on this one.
> > > > 2. Yeah, the usage pattern would include querying this prior to every
> > > > request.
> > > > 3. Will add the changes to StreamsMetadata in the KIP; would include
> > > > changes in rebuildMetadata() etc.
> > > > 4. Makes sense, already addressed above.
> > > > 5. Is it important from a user perspective whether they are querying an
> > > > active(processing), active(restoring), or a standby task, if we have a way
> > > > of denoting lag in a readable manner which signifies to the user that this
> > > > is the best node to query the fresh data?
> > > > 6. Yes, I intend to return the actives and replicas in the same return
> > > > list in allMetadataForKey().
> > > > 7. Tricky.
> > > > 8. Yes, we need new functions to return activeRestoring and standbyRunning
> > > > tasks.
> > > > 9. StreamsConfig doesn’t look of much use to me since we are giving all
> > > > possible options via this function, or they can use the existing function
> > > > getStreamsMetadataForKey() and get just the active.
> > > > 10. I think tre
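
For illustration of the "return everything and let the caller decide" direction discussed
in the thread above, here is a minimal caller-side sketch in Java. It assumes a hypothetical
list-returning allMetadataForKey() and a hypothetical per-host lag accessor; none of these
names are final in the KIP, so treat it purely as a sketch of the routing decision, not as
the proposed API.

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical per-host view that a list-returning allMetadataForKey() could expose.
// Field names are placeholders, not the KIP's final API.
class CandidateHost {
    final String host;
    final int port;
    final boolean active;   // host running the active task for this key's partition
    final long offsetLag;   // records this copy is behind the committed offset

    CandidateHost(String host, int port, boolean active, long offsetLag) {
        this.host = host;
        this.port = port;
        this.active = active;
        this.offsetLag = offsetLag;
    }
}

class QueryRouter {
    // Route to the least-lagging host the caller is willing to tolerate,
    // falling back to the active copy when no standby is fresh enough.
    static Optional<CandidateHost> pick(List<CandidateHost> candidates, long tolerableLag) {
        Optional<CandidateHost> freshEnough = candidates.stream()
                .filter(c -> c.active || c.offsetLag <= tolerableLag)
                .min(Comparator.comparingLong(c -> c.offsetLag));
        return freshEnough.isPresent()
                ? freshEnough
                : candidates.stream().filter(c -> c.active).findFirst();
    }
}

A caller that wants today's semantics can simply ignore everything except the active entry,
which is what getStreamsMetadataForKey() effectively returns now.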

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-23 Thread Navinder Brar
t;>>> more general to refactor it to "allMetadataForKey(long
>>> tolerableDataStaleness, ...)", and when it's set to 0 it means "active task
>>> only".
>>> +1 IMO if we plan on having `enableReplicaServing`, it makes sense to
>>> generalize based on dataStaleness. This seems complementary to exposing the
>>> lag information itself.
>>>
>>>>> This is actually not a public api change at all, and I'm planning to
>>> implement it asap as a precursor to the rest of KIP-441
>>> +1 again. Do we have a concrete timeline for when this change will land on
>>> master? I would like to get the implementation wrapped up (as much as
>>> possible) by the end of the month :). But I agree this sequencing makes
>>> sense.
>>>
>>>
>>> On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang  wrote:
>>>
>>>> Hi Navinder,
>>>>
>>>> Thanks for the KIP, I have a high level question about the proposed API
>>>> regarding:
>>>>
>>>> "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"
>>>>
>>>> I'm wondering if it's more general to refactor it to
>>>> "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to
>>>> 0 it means "active task only". Behind the scenes, we can have the committed
>>>> offsets encode the stream time as well, so that when processing standby
>>>> tasks the stream process knows not only the lag in terms of offsets
>>>> compared to the committed offset (internally we call it the offset limit), but
>>>> also the lag in terms of timestamp diff compared to the committed offset.
>>>>
>>>> Also, encoding the timestamp as part of the offset has other benefits for
>>>> improving Kafka Streams time semantics as well, but for KIP-535 itself I
>>>> think it can help give users a more intuitive interface to reason about.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>> On Mon, Oct 21, 2019 at 12:30 PM John Roesler  wrote:
>>>>
>>>>> Hey Navinder,
>>>>>
>>>>> Thanks for the KIP! I've been reading over the discussion thus far,
>>>>> and I have a couple of thoughts to pile on as well:
>>>>>
>>>>> It seems confusing to propose the API in terms of the current system
>>>>> state, but also propose how the API would look if/when KIP-441 is
>>>>> implemented. It occurs to me that the only part of KIP-441 that would
>>>>> affect you is the availability of the lag information in the
>>>>> SubscriptionInfo message. This is actually not a public api change at
>>>>> all, and I'm planning to implement it asap as a precursor to the rest
>>>>> of KIP-441, so maybe you can just build on top of KIP-441 and assume
>>>>> the lag information will be available. Then you could have a more
>>>>> straightforward proposal (e.g., mention that you'd return the lag
>>>>> information in AssignmentInfo as well as in the StreamsMetadata in
>>>>> some form, or make use of it in the API somehow).
>>>>>
>>>>> I'm partially motivated in that former point because it seems like
>>>>> understanding how callers would bound the staleness for their use case
>>>>> is _the_ key point for this KIP. FWIW, I think that adding a new
>>>>> method to StreamsMetadataState and deprecating the existing method is
>>>>> the best way to go; we just can't change the return types of any
>>>>> existing methods.
>>>>>
>>>>> I'm wondering, rather than putting "acceptable lag" into the
>>>>> configuration at all, or even making it a parameter on
>>>>> `allMetadataForKey`, why not just _always_ return all available
>>>>> metadata (including active/standby or lag) and let the caller decide
>>>>> to which node they want to route the query? This method isn't making
>>>>> any queries itself; it's merely telling you where the local Streams
>>>>> instance _thinks_ the key in question is located. Just returning all
>>>>> available information lets the caller implement any semantics they
>>>>> desire around querying only active stores, or standbys, or recovering
>>>>> stores, or whatever.
>>>>>
>>>>> One fly in the ointment, which you may wish to consider if propos
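
To make the "lag as a timestamp diff" idea quoted above concrete, here is a small hedged
sketch. It assumes the committed offset metadata could carry the stream time, as Guozhang
suggests; no such field exists in the released protocol, so every name below is a
placeholder rather than an actual Streams API.

// Sketch only: express a standby's staleness both as an offset lag ("offset limit"
// style) and as a time lag, assuming the active's committed offset and committed
// stream time were made visible to the standby.
class StalenessSketch {

    static long offsetLag(long committedOffset, long standbyPosition) {
        // How many records the standby still has to replay to reach the committed offset.
        return Math.max(0L, committedOffset - standbyPosition);
    }

    static long timeLagMs(long committedStreamTimeMs, long standbyStreamTimeMs) {
        // How old the standby's view is, if commit metadata encoded the stream time.
        return Math.max(0L, committedStreamTimeMs - standbyStreamTimeMs);
    }

    public static void main(String[] args) {
        System.out.println("lag: " + offsetLag(1_000_000L, 999_200L) + " records, "
                + timeLagMs(1_697_040_000_000L, 1_697_039_940_000L) + " ms");
    }
}

A tolerableDataStaleness of 0 then reads naturally as "no staleness at all, i.e. the active
task only", which is the degenerate case described in the quoted proposal.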

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-19 Thread Navinder Brar
Thanks, Vinoth. Looks like we are on the same page. I will add some of these 
explanations to the KIP as well. Have assigned the KAFKA-6144 to myself and 
KAFKA-8994 is closed(by you). As suggested, we will replace "replica" with 
"standby".

In the new API, "StreamsMetadataState::allMetadataForKey(boolean 
enableReplicaServing, String storeName, K key, Serializer keySerializer)" Do 
we really need a per-key configuration? or a new StreamsConfig is good 
enough?>> Coming from experience, when teams are building a platform with Kafka 
Streams and these APIs serve data to multiple teams, we can't have a 
generalized config that says as a platform we will support stale reads or not. 
It should be the choice of whoever is calling the APIs to decide whether 
they are ok with stale reads or not. Makes sense?
On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar 
 wrote:  
 
 Looks like we are covering ground :)

>>Only if it is within a permissible range (say 1) we will serve from the
Restoring state of the active.
+1 on having a knob like this.. My reasoning is as follows.

Looking at the Streams state as a read-only distributed kv store. With
num_standby = f, we should be able to tolerate f failures, and if there is
an (f+1)th failure, the system should be unavailable.

A) So with num_standby=0, the system should be unavailable even if there is
1 failure, and that's my argument for not allowing querying in restoration
state, especially since in this case it will be a total rebuild of the state (which IMO
cannot be considered a normal fault free operational state).

B) Even when there are standbys, say num_standby=2, if the user decides to shut
down all 3 instances, then the only outcome should be unavailability until all
of them come back or state is rebuilt on other nodes in the cluster. In
normal operations, f <= 2 and when a failure does happen we can then either
choose to be C over A and fail IQs until replication is fully caught up or
choose A over C by serving in restoring state as long as lag is minimal. If
even with f=1 say, all the standbys are lagging a lot due to some issue,
then that should be considered a failure since that is different from
normal/expected operational mode. Serving reads with unbounded replication
lag and calling it "available" may not be very usable or even desirable :)
IMHO, since it gives the user no way to reason about the app that is going
to query this store.

So there is definitely a need to distinguish between: replication catchup
while being in a fault-free state vs restoration of state when we lose more
than f standbys. This knob is a great starting point towards this.

If you agree with some of the explanation above, please feel free to
include it in the KIP as well since this is sort of our design principle
here..

Small nits :

- let's standardize on "standby" instead of "replica", KIP or code,  to be
consistent with rest of Streams code/docs?
- Can we merge KAFKA-8994 into KAFKA-6144 now and close the former?
Eventually need to consolidate KAFKA-6555 as well
- In the new API, "StreamsMetadataState::allMetadataForKey(boolean
enableReplicaServing, String storeName, K key, Serializer keySerializer)" Do
we really need a per key configuration? or a new StreamsConfig is good
enough?

On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar
 wrote:

> @Vinoth, I have incorporated a few of the discussions we have had in the
> KIP.
>
> In the current code, t0 and t1 serve queries from Active(Running)
> partition. For case t2, we are planning to return List
> such that it returns  so that if IQ
> fails on A, the replica on B can serve the data by enabling serving from
> replicas. This still does not solve case t3 and t4 since B has been
> promoted to active but it is in Restoring state to catchup till A’s last
> committed position as we don’t serve from Restoring state in Active and new
> Replica on R is building itself from scratch. Both these cases can be
> solved if we start serving from Restoring state of active as well since it
> is almost equivalent to previous Active.
>
> There could be a case where all replicas of a partition become unavailable
> and active and all replicas of that partition are building themselves from
> scratch, in this case, the state in Active is far behind even though it is
> in Restoring state. To cater to such cases that we don’t serve from this
> state we can either add another state before Restoring or check the
> difference between last committed offset and current position. Only if it
> is within a permissible range (say 1) we will serve from the Restoring
> state of the Active.
>
>
>    On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <
> vchan...@confluent.io> wrote:
>
>  Thanks for the updates on the KIP, Navinder!
>
> Few comments
>
> - AssignmentInfo is 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-16 Thread Navinder Brar
@Vinoth, I have incorporated a few of the discussions we have had in the KIP. 

In the current code, t0 and t1 serve queries from Active(Running) partition. 
For case t2, we are planning to return List such that it 
returns  so that if IQ fails on A, the 
replica on B can serve the data by enabling serving from replicas. This still 
does not solve case t3 and t4 since B has been promoted to active but it is in 
Restoring state to catchup till A’s last committed position as we don’t serve 
from Restoring state in Active and new Replica on R is building itself from 
scratch. Both these cases can be solved if we start serving from Restoring 
state of active as well since it is almost equivalent to previous Active.

There could be a case where all replicas of a partition become unavailable and 
active and all replicas of that partition are building themselves from scratch, 
in this case, the state in Active is far behind even though it is in Restoring 
state. To cater to such cases that we don’t serve from this state we can either 
add another state before Restoring or check the difference between last 
committed offset and current position. Only if it is within a permissible range 
(say 1) we will serve from the Restoring state of the Active.
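
A rough sketch of that permissible-range check, assuming we can read the restoring task's
current position and the corresponding committed/end offset of its changelog; the threshold
value and the names below are illustrative only, not an API being proposed here.

// Illustrative only: gate interactive queries on a restoring active behind a lag threshold.
class RestoringReadGate {
    private final long permissibleLag; // e.g. 1 as in the example above; 0 disallows it entirely

    RestoringReadGate(long permissibleLag) {
        this.permissibleLag = permissibleLag;
    }

    boolean mayServe(long committedEndOffset, long currentRestorePosition) {
        long lag = committedEndOffset - currentRestorePosition;
        return lag >= 0 && lag <= permissibleLag;
    }
}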


On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar 
 wrote:  
 
 Thanks for the updates on the KIP, Navinder!

Few comments

- AssignmentInfo is not public API? But we will change it and thus need to
increment the version and test for version_probing etc. Good to separate
that from StreamsMetadata changes (which is public API)
- From what I see, there is going to be choice between the following

  A) introducing a new *KafkaStreams::allMetadataForKey()* API that
potentially returns a List ordered from most up to date to
least up to date replicas. Today we cannot fully implement this ordering,
since all we know is which hosts are active and which are standbys.
However, this aligns well with the future. KIP-441 adds the lag information
to the rebalancing protocol. We could also sort replicas based on the
reported lags eventually. This is fully backwards compatible with existing
clients. Only drawback I see is the naming of the existing method
KafkaStreams::metadataForKey, not conveying the distinction that it simply
returns the active replica, i.e. allMetadataForKey.get(0).
 B) Change KafkaStreams::metadataForKey() to return a List. It's a breaking
change.

I prefer A, since none of the semantics/behavior changes for existing
users. Love to hear more thoughts. Can we also work this into the KIP?
I already implemented A to unblock myself for now. Seems feasible to do.


On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar 
wrote:

> >>I get your point. But suppose there is a replica which has just become
> active, so in that case replica will still be building itself from scratch
> and this active will go to restoring state till it catches up with previous
> active, wouldn't serving from a restoring active make more sense than a
> replica in such case.
>
> KIP-441 will change this behavior such that promotion to active happens
> based on how caught up a replica is. So, once we have that (work underway
> already for 2.5 IIUC) and user sets num.standby.replicas > 0, then the
> staleness window should not be that long as you describe. IMO if user wants
> availability for state, then should configure num.standby.replicas > 0. If
> not, then on a node loss, few partitions would be unavailable for a while
> (there are other ways to bring this window down, which I won't bring in
> here). We could argue for querying a restoring active (say a new node added
> to replace a faulty old node) based on AP vs CP principles. But not sure
> reading really really old values for the sake of availability is useful. No
> AP data system would be inconsistent for such a long time in practice.
>
> So, I still feel just limiting this to standby reads provides best
> semantics.
>
> Just my 2c. Would love to see what others think as well.
>
> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
>  wrote:
>
>> Hi Vinoth,
>> Thanks for the feedback.
>>  Can we link the JIRA, discussion thread also to the KIP.>> Added.
>> Based on the discussion on KAFKA-6144, I was under the impression that
>> this KIP is also going to cover exposing of the standby information in
>> StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
>> API change?>> Sure, I can add changes for 8994 in this KIP and link
>> KAFKA-6144 to KAFKA-8994 as well.
>>  KIP seems to be focussing on restoration when a new node is added.
>> KIP-441 is underway and has some major changes proposed for this. It would
>> be good to clarify dependencies if any. Without KIP-441, I am not very sure
>> if we should allow reads from nodes in RESTORING state, which
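
To visualise option A from the quoted thread, here is a hedged sketch of a call site:
metadataForKey() below is the real pre-2.5 API, while the list-returning
allMetadataForKey() shown only in comments is the proposed shape and may still change.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

class OptionASketch {

    static StreamsMetadata whereIs(KafkaStreams streams, String store, String key) {
        // Today: only the active host for the key's partition is discoverable.
        return streams.metadataForKey(store, key, Serdes.String().serializer());
    }

    // Under option A the caller would instead receive every hosting instance,
    // active first (and eventually ordered by reported lag once KIP-441 lands),
    // and could fall back to index 1, 2, ... if the active is unreachable:
    //
    //   List<StreamsMetadata> all =
    //       streams.allMetadataForKey(store, key, Serdes.String().serializer());
    //   StreamsMetadata target = all.isEmpty() ? null : all.get(0);
}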

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-15 Thread Navinder Brar
Hi Vinoth,
Thanks for the feedback. 
 Can we link the JIRA, discussion thread also to the KIP.>> Added.
Based on the discussion on KAFKA-6144, I was under the impression that this KIP 
is also going to cover exposing of the standby information in StreamsMetadata 
and thus subsume KAFKA-8994. That would require a public API change?>> Sure, I 
can add changes for 8994 in this KIP and link KAFKA-6144 to KAFKA-8994 as well.
  KIP seems to be focussing on restoration when a new node is added. KIP-441 is 
underway and has some major changes proposed for this. It would be good to 
clarify dependencies if any. Without KIP-441, I am not very sure if we should 
allow reads from nodes in RESTORING state, which could amount to many 
minutes/few hours of stale reads?  This is different from allowing querying 
standby replicas, which could be mostly caught up and the staleness window 
could be much smaller/tolerable. (once again the focus on KAFKA-8994).>> I get 
your point. But suppose there is a replica which has just become active, so in 
that case the replica will still be building itself from scratch and this active 
will go to restoring state till it catches up with the previous active, wouldn't 
serving from a restoring active make more sense than a replica in such a case.

Finally, we may need to introduce a configuration to control this. Some users 
may prefer errors to stale data. Can we also add it to the KIP?>> Will add this.

Regards,
Navinder


On 2019/10/14 16:56:49, Vinoth Chandar wrote:

> Hi Navinder,
>
> Thanks for sharing the KIP! Few thoughts
>
> - Can we link the JIRA, discussion thread also to the KIP
> - Based on the discussion on KAFKA-6144, I was under the impression that
> this KIP is also going to cover exposing of the standby information in
> StreamsMetadata and thus subsume KAFKA-8994. That would require a public
> API change?
> - KIP seems to be focussing on restoration when a new node is added.
> KIP-441 is underway and has some major changes proposed for this. It would
> be good to clarify dependencies if any. Without KIP-441, I am not very sure
> if we should allow reads from nodes in RESTORING state, which could amount
> to many minutes/few hours of stale reads? This is different from allowing
> querying standby replicas, which could be mostly caught up and the
> staleness window could be much smaller/tolerable. (once again the focus on
> KAFKA-8994)
> - Finally, we may need to introduce a configuration to control this. Some
> users may prefer errors to stale data. Can we also add it to the KIP?
>
> Thanks
> Vinoth
>
> On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar
> wrote:
>
> > Hi,
> > Starting a discussion on the KIP to Allow state stores to serve stale
> > reads during rebalance(
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> > ).
> > Thanks & Regards, Navinder
> > LinkedIn
> >

[DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-13 Thread Navinder Brar
Hi,
Starting a discussion on the KIP to Allow state stores to serve stale reads 
during 
rebalance(https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance).
Thanks & Regards, Navinder 
LinkedIn


[jira] [Created] (KAFKA-6924) Making state store queryable on replicas

2018-05-20 Thread Navinder Brar (JIRA)
Navinder Brar created KAFKA-6924:


 Summary: Making state store queryable on replicas
 Key: KAFKA-6924
 URL: https://issues.apache.org/jira/browse/KAFKA-6924
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar


State stores in Kafka Streams are currently only queryable when the StreamTask is in 
RUNNING state. The idea is to make them queryable even for StandbyTasks to 
decrease the downtime if a client is not able to fetch data from the Active machine.

Suppose the coordinator is not able to connect to the machine which has the active 
partition for some reason. Rather than failing that request, we could serve the 
request from a replica, which could be on some other machine. Although the state 
on the replica might be a little behind the active, it could still be beneficial 
in some cases to serve the request from the replica rather than failing it.

It's a very important improvement as it could significantly improve the availability of 
microservices developed using Kafka Streams.

I am working on a patch for this change. Any feedback or comments are welcome.
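
For context, a small hedged sketch of how applications typically cope with this limitation
today: KafkaStreams.store() throws InvalidStateStoreException while the store is not
queryable (e.g. during rebalance or restoration), so callers retry against the same
instance instead of being able to fall back to a standby. The retry policy below is just
an illustration; store() and QueryableStoreTypes are the pre-2.5 public API.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

class StoreLookup {
    // Block until the local store becomes queryable again. With standby serving
    // (this ticket), a caller could instead route the read to another instance.
    static ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams, String name)
            throws InterruptedException {
        while (true) {
            try {
                return streams.store(name, QueryableStoreTypes.<String, Long>keyValueStore());
            } catch (InvalidStateStoreException storeNotReady) {
                Thread.sleep(100); // store is migrating or restoring; back off and retry
            }
        }
    }
}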



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has retention time

2018-03-12 Thread Navinder Brar (JIRA)
Navinder Brar created KAFKA-6643:


 Summary: Warm up new replicas from scratch when changelog topic 
has retention time
 Key: KAFKA-6643
 URL: https://issues.apache.org/jira/browse/KAFKA-6643
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar


In the current scenario, Kafka Streams has changelog Kafka topics (internal 
topics holding all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas as 1, we still have 
more availability for persistent state stores as changelog Kafka topics are 
also replicated depending upon the broker replication policy, but that also means 
we are using at least 4 times the space (1 master store, 1 replica store, 1 
changelog, 1 changelog replica).

Now if we have a year's data in persistent stores (RocksDB), we don't want the 
changelog topics to hold a year's data as it will put an unnecessary burden on 
brokers (in terms of space). If we have to scale our Kafka Streams 
application (having 200-300 TB of data) we have to scale the Kafka brokers as 
well. We want to reduce this dependency and find ways to just use the changelog 
topic as a queue, holding just 2 or 3 days of data, and warm up the replicas from 
scratch in some other way.
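
As a hedged illustration of the "changelog as a short queue" idea: changelog retention can
already be capped when the store is materialized, e.g. roughly 3 days below. Note the
defaults differ (a plain key-value changelog is compacted), and bounding retention alone
does not solve the warm-up-from-scratch problem this ticket is about; it only bounds
broker-side space.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

class ChangelogRetentionSketch {
    // Materialize a store whose changelog keeps ~3 days of data instead of the full history.
    static Materialized<String, Long, KeyValueStore<Bytes, byte[]>> threeDayChangelog() {
        Map<String, String> changelogConfig = new HashMap<>();
        changelogConfig.put("cleanup.policy", "delete");   // default for a KV-store changelog is "compact"
        changelogConfig.put("retention.ms", String.valueOf(3L * 24 * 60 * 60 * 1000));

        return Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long())
                .withLoggingEnabled(changelogConfig);
    }
}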



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)