Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2019-06-14 Thread Mike Freyberger
Hi Boyang,

Thanks for the update. To what extent will KIP-345 be available in 2.3.0?

Mike

On 6/13/19, 5:36 PM, "Boyang Chen"  wrote:

Hey all,

We decided to push two minor changes to improve usability for static 
membership.

  1.  Add a `group.instance.id` field to the `DescribeGroupResponse` API. 
This gives client users visibility into which static members are registered 
in the current group.
  2.  Add a getGroupInstanceId() function to the `Subscription` class. This 
helps assignors utilize static member information to generate more stable 
resource assignments when the group membership remains the same during a 
rolling bounce (see the sketch below).
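
For illustration, here is a minimal sketch of how a custom assignor could use 
that accessor to key its sticky bookkeeping on the static instance id rather 
than the ephemeral member id. The method name follows the KIP text and the 
imports/types are assumptions; the released API may differ slightly:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Derive a stable identity per member, preferring the static
    // group.instance.id (when present) over the transient member id.
    static Map<String, String> stableIdentities(Map<String, Subscription> subscriptions) {
        Map<String, String> memberToStableId = new HashMap<>();
        for (Map.Entry<String, Subscription> e : subscriptions.entrySet()) {
            String memberId = e.getKey();
            Optional<String> instanceId = e.getValue().getGroupInstanceId();
            memberToStableId.put(memberId, instanceId.orElse(memberId));
        }
        return memberToStableId;
    }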

The KIP is updated with these changes. No compatibility issue is 
anticipated. Let me know if you have any concerns.

Best,
Boyang


From: Boyang Chen 
Sent: Friday, April 26, 2019 11:16 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hey all,

There is a minor change to the Streams-side logic for static 
membership<https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances>.
 Originally we chose to piggy-back on a user-supplied unique `client.id` 
config, which we would use to construct a per-thread consumer 
`group.instance.id`. This approach has several drawbacks:

  1.  We already have functionality relying on `client.id`, and users do not 
always want to configure it differently for individual instances. For example, 
users can currently throttle requests under the same `client.id`, which is a 
solid use case for duplicating the `client.id` across instances.
  2.  Existing Streams users may unintentionally trigger static membership if 
they already set `client.id` in their Streams apps, leading to unexpected 
fatal errors due to the `group.instance.id` fencing we are going to introduce.

In conclusion, it is not good practice to overload an existing config that 
users rely on unless there are no side effects. To provide a more 
fault-tolerant upgrade path, we decided to let Streams users explicitly set 
`group.instance.id` if they want to enable static membership.
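
As a rough sketch, opting in on a plain consumer would look like the 
following (assuming the `ConsumerConfig.GROUP_INSTANCE_ID_CONFIG` constant 
that ships with the KIP-345 client changes; the id must be unique per 
instance and stable across restarts):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    // Omit this line to keep plain dynamic membership.
    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-host-1");
    KafkaConsumer<String, String> consumer =
            new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());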

Thank you Guozhang and Matthias for the great discussions and enhancements 
for the KIP!

Best,
Boyang



From: Boyang Chen 
Sent: Saturday, March 9, 2019 2:28 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hi Mike,

Yes that's the plan!

____
From: Mike Freyberger 
Sent: Saturday, March 9, 2019 10:04 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hi Boyang,

Is this work targeted for Kafka 2.3? I am eager to use this new feature.

Thanks,

    Mike Freyberger

On 12/21/18, 1:21 PM, "Mayuresh Gharat"  wrote:

Hi Boyang,

Regarding "However, we shall still attempt to remove the member static 
info
if the given `member.id` points to an existing `group.instance.id` upon
LeaveGroupRequest, because I could think of the possibility that in long
term we could want to add static membership leave group logic for more
fine-grained use cases."

> I think there is some confusion here. I am probably not putting it
> right.
>
I agree, if a static member sends LeaveGroupRequest, it should be removed
> from the group.
>
Now getting back to downgrade of static membership to Dynamic membership,
> with the example described earlier (copying it again for ease of reading):
>

>>1. Let's say we have 4 consumers: c1, c2, c3, c4 in the static group.
>>2. The group.instance.id for each of these is as follows:
>>   - c1 -> gc1, c2 -> gc2, c3 -> gc3, c4 -> gc4
>>3. The mapping on the GroupCoordinator would be:
>>   - gc1 -> mc1, gc2 -> mc2, gc3 -> mc3, gc4 -> mc4, where mc1, mc2,
>>   mc3, mc4 are the randomly generated memberIds for c1, c2, c3, c4
>>   respectively, by the GroupCoordinator.
>>4. Now we do a restart to move the group to dynamic membership.
>>5. We bounce c1 first and it rejoins with UNKNOWN_MEMBERID (since we
>>don't persist the previously assigned memberId mc1 anywhere on c1).
>>
> - We agree that there is no way to recognize that c1 was a part of the
> group, *earlier*.  If

Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

2019-05-06 Thread Mike Freyberger
Hi Matthias,

Once KIP-429 is released, all non-sticky assignors will not be as useful. Any 
user that wants to take advantage of KIP-429 needs to use a sticky assignor. 
Currently there is only one sticky assignor in the Kafka project, which is 
similar to the RoundRobinAssignor, but a sticky version. I imagine there will 
be users who currently use the RangeAssignor but want to take advantage of 
KIP-429. So, having more directly accessible sticky assignors will allow more 
users to take advantage of KIP-429 without being forced to use Kafka Streams. 
Maybe I should reframe the KIP as essentially a sticky version of the 
RangeAssignor?

Regarding how I am using a KV store instead of a Kafka compacted topic: I 
simply prepend my keys with the incoming Kafka partition, so on partition 
assignment I can scan the KV store for all keys within the assigned partition. 
A sketch of this scheme follows below.
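
The following is a minimal, illustrative sketch of that partition-prefixed key 
scheme; a TreeMap stands in for the real sorted KV store (e.g. RocksDB), and 
all names are hypothetical:

    import java.util.NavigableMap;
    import java.util.SortedMap;
    import java.util.TreeMap;

    class PartitionPrefixedStore {
        private final NavigableMap<String, byte[]> store = new TreeMap<>();

        private static String key(int partition, String rawKey) {
            // The zero-padded prefix keeps all keys of one partition
            // contiguous in sort order.
            return String.format("%05d|%s", partition, rawKey);
        }

        void put(int partition, String rawKey, byte[] value) {
            store.put(key(partition, rawKey), value);
        }

        // On partition assignment, scan exactly the keys of that partition.
        // "\uffff" is a sentinel upper bound for this sketch.
        SortedMap<String, byte[]> scanPartition(int partition) {
            return store.subMap(key(partition, ""), key(partition, "\uffff"));
        }
    }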

Mike

On 4/30/19, 6:49 AM, "Matthias J. Sax"  wrote:

Mike,

I am still not sure why we need to add this assignor to the project.
Even after you pointed out that you cannot use Kafka Streams, the idea
behind making the consumer's `PartitionAssignor` interface public and
pluggable is that the project does not need to add strategies for all
kinds of use cases; instead, people can customize the assignors to their
needs.

My main question is: how generic is this use case (especially with Kafka
Streams offering joins out-of-the-box) and do we really need to add it?
So far, it seems ok to me if you just write a custom assignor and plug
it into the consumer. I don't see a strong need to add it to the Kafka
code base. Basically, it breaks down to

- How many people use joins?
- How many people can or can't use Kafka Streams joins?
- To what extent can Kafka Streams be improved to increase the use-case
coverage?
- How many people would benefit? (i.e., even with adding this assignor,
there might still be users who need to customize their own assignors
because their join use case is still different from yours.)


Also note that in Kafka Streams you could still provide a custom state
store implementation (with or without using a compacted changelog) and a
`Processor` or `Transformer` to implement a custom join. Even if this
might not work for your specific case, it might work for many other
people who want to customize a join instead of using Kafka Streams'
out-of-the-box join.
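
For illustration, a minimal sketch of that pattern (assuming the Kafka
Streams 2.x `Transformer` API; the store name, topics, and the toy
one-sided join logic here are hypothetical):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    StreamsBuilder builder = new StreamsBuilder();
    builder.addStateStore(
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("join-store"),
                Serdes.String(), Serdes.String())
            .withLoggingDisabled()); // opt out of the compacted changelog topic

    builder.<String, String>stream("left-topic")
        .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
            private KeyValueStore<String, String> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(ProcessorContext context) {
                store = (KeyValueStore<String, String>) context.getStateStore("join-store");
            }

            @Override
            public KeyValue<String, String> transform(String key, String value) {
                String other = store.get(key); // look up the other side
                store.put(key, value);         // remember this side
                // Returning null drops the record when there is no match yet.
                return other == null ? null : KeyValue.pair(key, value + "," + other);
            }

            @Override
            public void close() {}
        }, "join-store")
        .to("joined-topic");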


Can you elaborate why you think it needs to be part of Kafka directly?


One last question:

> - Our state has a high eviction rate, so Kafka compacted topics are not 
ideal for storing the changelog. The compaction cannot keep up and the topic 
will be majority tombstones when it is read on partition reassignment. We are 
using a KV store as the "change log" instead.

What do you mean by 'We are using a KV store as the "change log" instead.'?
How do you handle reassignment and state movement? Curious to see if we
could improve Kafka Streams :)


-Matthias
    
    
On 4/30/19 3:09 AM, Mike Freyberger wrote:
> In light of KIP-429, I think there will be an increased demand for sticky 
assignors. So, I'd like to restart the conversation about adding the sticky 
streams assignor, 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor.
 
> 
> It’d be great to get feedback on the overall idea and the proposed 
implementation.
> 
> Thanks,
> 
    > Mike
> 
> 
> On 6/20/18, 5:47 PM, "Mike Freyberger"  wrote:
> 
> Matthias, 
> 
> Thanks for the feedback. For our use case, we have some complexities 
that make using the existing Streams API more complicated than using the Kafka 
Consumer directly. 
> 
> - We are doing async processing, which I don't think is currently 
available (KIP-311 is handling this). 
> 
> - Our state has a high eviction rate, so Kafka compacted topics are 
not ideal for storing the changelog. The compaction cannot keep up and the 
topic will be majority tombstones when it is read on partition reassignment. We 
are using a KV store as the "change log" instead.
> 
> - We wanted to separate consumer threads from worker threads to 
maximize parallelization while keeping consumer TCP connections down.
> 
> Ultimately, it was much simpler to use the KafkaConsumer directly 
rather than peel away a lot of what the Streams API does for you. I think we should 
continue to add support for more complex use cases and processing to the 
Streams API. However, I think there will remain streaming join use cases that 
can benefit from the flexibility that comes from using the KafkaConsumer 
directly. 
> 
> Mike
> 
 

Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

2019-04-29 Thread Mike Freyberger
In light of KIP-429, I think there will be an increased demand for sticky 
assignors. So, I'd like to restart the conversation about adding the sticky 
streams assignor, 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor.
 

It’d be great to get feedback on the overall idea and the proposed 
implementation.

Thanks,

Mike


On 6/20/18, 5:47 PM, "Mike Freyberger"  wrote:

Matthias, 

Thanks for the feedback. For our use case, we have some complexities that 
make using the existing Streams API more complicated than using the Kafka 
Consumer directly. 

- We are doing async processing, which I don't think is currently available 
(KIP-311 is handling this). 

- Our state has a high eviction rate, so Kafka compacted topics are not 
ideal for storing the changelog. The compaction cannot keep up and the topic 
will be majority tombstones when it is read on partition reassignment. We are 
using a KV store as the "change log" instead.

- We wanted to separate consumer threads from worker threads to maximize 
parallelization while keeping consumer TCP connections down (see the sketch 
after this list).
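
A minimal sketch of that consumer/worker split (illustrative only; 
buildConsumer() and process() are hypothetical, and offset management and 
shutdown handling are elided):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // One consumer (one set of broker TCP connections) feeding a worker pool.
    ExecutorService workers = Executors.newFixedThreadPool(8);
    try (KafkaConsumer<String, String> consumer = buildConsumer()) {
        consumer.subscribe(Collections.singletonList("input-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                workers.submit(() -> process(record)); // async hand-off per record
            }
        }
    }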

Ultimately, it was much simpler to use the KafkaConsumer directly rather 
than peel away a lot of what the Streams API does for you. I think we should 
continue to add support for more complex use cases and processing to the 
Streams API. However, I think there will remain streaming join use cases that 
can benefit from the flexibility that comes from using the KafkaConsumer 
directly. 

Mike

On 6/20/18, 5:08 PM, "Matthias J. Sax"  wrote:

Mike,

thanks a lot for the KIP. I am wondering why the Streams API cannot be used
to perform the join. It would be good to understand the advantage of
adding a `StickyStreamJoinAssignor` compared to using the Streams API. Atm,
it seems to be a redundant feature to me.

-Matthias

    
    On 6/20/18 1:07 PM, Mike Freyberger wrote:
> Hi everybody,
> 
> I’ve created a proposal document for KIP-315 which outlines the 
motivation of adding a new partition assignment strategy that can be used for 
streaming join use cases.
> 
> It’d be great to get feedback on the overall idea and the proposed 
implementation.
> 
> KIP Link: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor
> 
> Thanks,
> 
> Mike
> 







Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2019-03-08 Thread Mike Freyberger
Hi Boyang,

Is this work targeted for Kafka 2.3? I am eager to use this new feature.

Thanks,

Mike Freyberger

On 12/21/18, 1:21 PM, "Mayuresh Gharat"  wrote:

Hi Boyang,

Regarding "However, we shall still attempt to remove the member static info
if the given `member.id` points to an existing `group.instance.id` upon
LeaveGroupRequest, because I could think of the possibility that in long
term we could want to add static membership leave group logic for more
fine-grained use cases."

> I think there is some confusion here. I am probably not putting it
> right.
>
I agree, if a static member sends LeaveGroupRequest, it should be removed
> from the group.
>
Now getting back to downgrade of static membership to Dynamic membership,
> with the example described earlier (copying it again for ease of reading):
>

>>1. Let's say we have 4 consumers: c1, c2, c3, c4 in the static group.
>>2. The group.instance.id for each of these is as follows:
>>   - c1 -> gc1, c2 -> gc2, c3 -> gc3, c4 -> gc4
>>3. The mapping on the GroupCoordinator would be:
>>   - gc1 -> mc1, gc2 -> mc2, gc3 -> mc3, gc4 -> mc4, where mc1, mc2,
>>   mc3, mc4 are the randomly generated memberIds for c1, c2, c3, c4
>>   respectively, by the GroupCoordinator.
>>4. Now we do a restart to move the group to dynamic membership.
>>5. We bounce c1 first and it rejoins with UNKNOWN_MEMBERID (since we
>>don't persist the previously assigned memberId mc1 anywhere on c1).
>>
> - We agree that there is no way to recognize that c1 was a part of the
> group, *earlier*. If yes, the statement: "The dynamic member rejoins
> the group without `group.instance.id`. It will be accepted since it is a
> known member." is not necessarily true, right?
>


> - Now I *agree* with "However, we shall still attempt to remove the
> member static info if the given `member.id` points to an existing `
> group.instance.id` upon LeaveGroupRequest, because I could think of the
> possibility that in long term we could want to add static membership leave
> group logic for more fine-grained use cases."
>
But that would only happen if the GroupCoordinator allocates the same
> member.id (mc1) to the consumer c1, when it rejoins the group in step 5
> above as a dynamic member, which is very rare as it is randomly generated,
> but possible.
>


> - This raises another question, if the GroupCoordinator assigns a
> member.id (mc1~) to consumer c1 after step 5. It will join the group and
> rebalance and the group will become stable, eventually. Now the
> GroupCoordinator still maintains a mapping of  "group.instance.id ->
> member.id" (c1 -> gc1, c2 -> gc2, c3 -> gc3, c4 -> gc4) internally and
> after some time, it realizes that it has not received heartbeat from the
> consumer with "group.instance.id" = gc1. In that case, it will trigger
> another rebalance assuming that a static member has left the group (when
> actually it (c1) has not left the group but moved to dynamic membership).
> This can result in multiple rebalances as the same will happen for c2, c3,
> c4.
>

Thoughts?
One thing I can think of right now is to run:
removeMemberFromGroup(String groupId, List<String>
groupInstanceIdsToRemove, RemoveMemberFromGroupOptions options)
with groupInstanceIdsToRemove = <gc1, gc2, gc3, gc4> once we have bounced
all the members in the group. This assumes that we will be able to complete
the bounces before the GroupCoordinator realizes that it has not received a
heartbeat for any of <gc1, gc2, gc3, gc4>. This is tricky and error-prone.
Will have to think more on this. A sketch of the call follows below.
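
For concreteness, this is how the proposed call might look (a hypothetical 
API as sketched in this thread, not a released one; the group name and the 
ids come from the example above):

    import java.util.Arrays;

    // Hypothetical admin call -- signature as proposed in this thread.
    adminClient.removeMemberFromGroup(
        "my-group",
        Arrays.asList("gc1", "gc2", "gc3", "gc4"), // group.instance.ids to remove
        new RemoveMemberFromGroupOptions());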

Thanks,

Mayuresh




Re: [VOTE] KIP-345: Introduce static membership protocol to reduce consumer rebalances

2018-12-04 Thread Mike Freyberger
+1 (non binding)

On 12/4/18, 9:43 AM, "Patrick Williams"  wrote:

Pls take me off this VOTE list

Best,
 
Patrick Williams
 
Sales Manager, UK & Ireland, Nordics & Israel
StorageOS
+44 (0)7549 676279
patrick.willi...@storageos.com
 
20 Midtown
20 Proctor Street
Holborn
London WC1V 6NX
 
Twitter: @patch37
LinkedIn: linkedin.com/in/patrickwilliams4 

 
https://slack.storageos.com/
 
 

On 03/12/2018, 17:34, "Guozhang Wang"  wrote:

Hello Boyang,

I've browsed through the new wiki and there are still a couple of minor
things to notice:

1. RemoveMemberFromGroupOptions does not seem to be defined anywhere.

2. LeaveGroupRequest added a list of group instance ids, but still keeps the
member id as a singleton; is that intentional? I think to make the protocol
consistent, both member ids and instance ids could be plural.

3. About the *kafka-remove-member-from-group.sh* tool, I'm wondering if we
can defer adding this and just add the corresponding calls of the
LeaveGroupRequest inside Streams until we have used it in production and
hence have a better understanding of how flexible or extensible any cmd
tools would need to be. The rationale is that if we do not necessarily need
it now, we can always add it later with a more thought-through API design,
but if we add the tool in a rush, we may need to extend or modify it soon
after we realize its limits in operations.

Otherwise, I'm +1 on the proposal.

Guozhang


On Mon, Dec 3, 2018 at 9:14 AM Boyang Chen  wrote:

> Hey community friends,
>
> after another month of polishing, KIP-345<
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances>
> design is ready for vote. Feel free to add your comment on the 
discussion
> thread or here.
>
> Thanks for your time!
>
> Boyang
> 
> From: Boyang Chen 
> Sent: Friday, November 9, 2018 6:35 AM
> To: dev@kafka.apache.org
> Subject: [VOTE] KIP-345: Introduce static membership protocol to 
reduce
> consumer rebalances
>
> Hey all,
>
>
> thanks so much for all the inputs on KIP-345 so far. The original proposal
> has been enhanced a lot with your help. To make sure the implementation goes
> smoothly without back and forth, I would like to start a vote on the final
> design agreement now:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
>
>
> Let me know if you have any questions.
>
>
> Best,
>
> Boyang
>
>

-- 
-- Guozhang






Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-05 Thread Mike Freyberger
Boyang,

Thanks for updating the KIP. It's shaping up well. Two things:

1) I am a little confused about the distinction for the leader. If the consumer 
node that was assigned leader does a bounce (goes down and quickly comes up) to 
update application code, will a rebalance be triggered? I do not think a bounce 
of the leader should trigger a rebalance.

2) The timeout for scale up makes a lot of sense and allows us to gracefully 
increase the number of nodes in the cluster. I think we need to support 
graceful shrink down as well. If I set the registration timeout to 5 minutes to 
handle rolling restarts or intermittent failures without shuffling state, I 
don't want to wait 5 minutes for the group to rebalance if I am intentionally 
removing a node from the cluster. I am not sure of the best way to do this. 
One idea I had was adding the ability for a CLI or Admin API to force a 
rebalance of the group. This would allow an admin to trigger the rebalance 
manually without waiting the entire registration timeout on shrink down. What 
do you think?

Mike

On 10/30/18, 1:55 AM, "Boyang Chen"  wrote:

Btw, I updated KIP-345 based on my understanding. Feel free to take another 
look:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances




From: Boyang Chen 
Sent: Monday, October 29, 2018 12:34 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Thanks everyone for the input on this thread! (Sorry it's been a while) I 
feel that we are very close to the final solution.


Hey Jason and Mike, I have two quick questions on the new features here:

  1.  So our proposal is that unless we add a new static member into the 
group (scale up), we will not trigger a rebalance until the "registration 
timeout" fires (i.e., the member has been offline for too long)? How about the 
leader's rejoin request? I think we should still trigger a rebalance when that 
happens, since the consumer group may have new topics to consume.
  2.  I'm not very clear on the scale up scenario in static membership 
here. Should we fall back to dynamic membership while adding/removing hosts (by 
setting member.name = null), or do we still want to add instances with 
`member.name` so that we eventually expand/shrink the static membership? I 
personally feel the easier solution is to spin up new members and wait until 
either the same "registration timeout" or a "scale up timeout" expires before 
starting the rebalance. What do you think?

Meanwhile I will go ahead and make changes to the KIP with our newly 
discussed items and details. Really excited to see that the design has become 
more solid.

Best,
Boyang


From: Jason Gustafson 
Sent: Saturday, August 25, 2018 6:04 AM
To: dev
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hey Mike,

Yeah, that's a good point. A long "registration timeout" may not be a great
idea. Perhaps in practice you'd set it long enough to be able to detect a
failure and provision a new instance. Maybe on the order of 10 minutes is
more reasonable.

In any case, it's probably a good idea to have an administrative way to
force deregistration. One option is to extend the DeleteGroups API with a
list of member names.
    
    -Jason
    


On Fri, Aug 24, 2018 at 2:21 PM, Mike Freyberger 
wrote:

> Jason,
>
> Regarding step 4 in your proposal which suggests beginning a long timer
> (30 minutes) when a static member leaves the group, would there also be the
> ability for an admin to force a static membership expiration?
>
> I’m thinking that during particular types of outages or upgrades users
> would want to forcefully remove a static member from the group.
>
> So the user would shut the consumer down normally, which wouldn’t trigger
> a rebalance. Then the user could use an admin CLI tool to force remove that
> consumer from the group, so the TopicPartitions that were previously owned
> by that consumer can be releas

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-08-24 Thread Mike Freyberger
Jason,

Regarding step 4 in your proposal which suggests beginning a long timer (30 
minutes) when a static member leaves the group, would there also be the ability 
for an admin to force a static membership expiration?

I’m thinking that during particular types of outages or upgrades users would 
want to forcefully remove a static member from the group. 

So the user would shut the consumer down normally, which wouldn’t trigger a 
rebalance. Then the user could use an admin CLI tool to force remove that 
consumer from the group, so the TopicPartitions that were previously owned by 
that consumer can be released.

At a high level, we need consumer groups to gracefully handle intermittent 
failures and permanent failures. Currently, the consumer group protocol handles 
permanent failures well, but does not handle intermittent failures well (it 
creates unnecessary rebalances). I want to make sure the overall solution here 
handles both intermittent failures and permanent failures, rather than 
sacrificing support for permanent failures in order to provide support for 
intermittent failures. 

Mike

Sent from my iPhone

> On Aug 24, 2018, at 3:03 PM, Jason Gustafson  wrote:
> 
> Hey Guozhang,
> 
> Responses below:
> 
> Originally I was trying to kill more birds with one stone with KIP-345,
>> e.g. to fix the multi-rebalance issue on starting up / shutting down a
>> multi-instance client (mentioned as case 1)/2) in my early email), and
>> hence proposing to have a pure static-membership protocol. But thinking
>> twice about it I now feel it may be too ambitious and worth fixing in
>> another KIP.
> 
> 
> I was considering an extension to support pre-initialization of the static
> members of the group, but I agree we should probably leave this problem for
> future work.
> 
> 1. How is this longish static member expiration timeout defined? Is it via a
>> broker config, hence global, or via a client config which can be
>> communicated to the broker via the JoinGroupRequest?
> 
> 
> I am not too sure. I tend to lean toward server-side configs because they
> are easier to evolve. If we have to add something to the protocol, then
> we'll be stuck with it forever.
> 
> 2. Assuming that for static members, LEAVE_GROUP request will not trigger a
>> rebalance immediately either, similar to session timeout, but only the
>> longer member expiration timeout, can we remove the internal "
>> internal.leave.group.on.close" config, which is a quick workaround then?
> 
> 
> Yeah, I hope we can ultimately get rid of it, but we may need it for
> compatibility with older brokers. A related question is what should be the
> behavior of the consumer if `member.name` is provided but the broker does
> not support it? We could either fail or silently downgrade to dynamic
> membership.
> 
> -Jason
> 
> 
>> On Fri, Aug 24, 2018 at 11:44 AM, Guozhang Wang  wrote:
>> 
>> Hey Jason,
>> 
>> I like your idea to simplify the upgrade protocol to allow static and
>> dynamic members to co-exist. Admittedly it may make the coordinator-side
>> logic a bit more complex, but I think it is worth doing.
>> 
>> Originally I was trying to kill more birds with one stone with KIP-345,
>> e.g. to fix the multi-rebalance issue on starting up / shutting down a
>> multi-instance client (mentioned as case 1)/2) in my early email), and
>> hence proposing to have a pure static-membership protocol. But thinking
>> twice about it I now feel it may be too ambitious and worth fixing in
>> another KIP. With that, I think what you've proposed here is a good way to
>> go for KIP-345 itself.
>> 
>> Note there are a few details in your proposal we'd still need to figure
>> out:
>> 
>> 1. How is this longish static member expiration timeout defined? Is it via a
>> broker config, hence global, or via a client config which can be
>> communicated to the broker via the JoinGroupRequest?
>> 
>> 2. Assuming that for static members, LEAVE_GROUP request will not trigger a
>> rebalance immediately either, similar to session timeout, but only the
>> longer member expiration timeout, can we remove the internal "
>> internal.leave.group.on.close" config, which is a quick workaround then?
>> 
>> 
>> 
>> Guozhang
>> 
>> 
>> On Fri, Aug 24, 2018 at 11:14 AM, Jason Gustafson 
>> wrote:
>> 
>>> Hey All,
>>> 
>>> Nice to see some solid progress on this. It sounds like one of the
>>> complications is allowing static and dynamic registration to coexist. I'm
>>> wondering if we can do something like the following:
>>> 
>>> 1. Statically registered members (those joining the group with a
>>> non-null `member.name`) maintain a session with the coordinator just like
>>> dynamic members.
>>> 2. If a session is active for a static member when a rebalance begins,
>>> then basically we'll keep the current behavior. The rebalance will await
>>> the static member joining the group.
>>> 3. If a static member does not have an active session, then the
>>> coordinator will not wait for it to join, but will still 

Re: [VOTE] KIP-341: Update Sticky Assignor's User Data Protocol

2018-08-03 Thread Mike Freyberger
+1

On 8/3/18, 2:13 PM, "Ted Yu"  wrote:

+1

On Fri, Aug 3, 2018 at 10:40 AM Jason Gustafson  wrote:

> +1 Thanks Vahid.
>
> On Thu, Aug 2, 2018 at 1:27 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com
> > wrote:
>
> > Hi everyone,
> >
> > I believe the feedback on this KIP has been addressed so far. So I'd 
like
> > to start a vote.
> > The KIP:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 341%3A+Update+Sticky+Assignor%27s+User+Data+Protocol
> > Discussion thread:
> > https://www.mail-archive.com/dev@kafka.apache.org/msg89733.html
> >
> > Thanks!
> > --Vahid
> >
> >
>




Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-07-30 Thread Mike Freyberger
ll to reduce
the cost of each unnecessary rebalance, but ideally we want NO rebalances
at all for these cases, which will be more true with k8s / etc integrations
or static memberships.

--

So just to throw in a sketchy idea following this route for 1/2/5/6 for
brainstorming kick-off:


1. We bump up the JoinGroupRequest with additional fields:

  1.a) a flag indicating "static" or "dynamic" membership protocols.
  1.b) with "static" membership, we also add the pre-defined member id.
  1.c) with "static" membership, we also add an optional
"group-change-timeout" value.

2. On the broker side, we enforce only one of the two protocols for all
group members: we accept the protocol of the first joined member of the
group, and if later joining members indicate a different membership
protocol, we reject it. If the group-change-timeout value is different from
the first joined member's, we reject it as well.

3. With dynamic membership, nothing is changed; with static membership, we
do the following:

  3.a) never assign member ids, instead always expect the joining members
to come with their own member id; we could do the fencing based on host /
port here.
  3.b) upon receiving the first join group request, use the
"group-change-timeout" instead of the session-timeout as rebalance timeout
to wait for other members to join. This is for 1) above.
  3.c) upon receiving a leave-group request, use the "group-change-timeout"
to wait for more members to leave the group as well, or for the left members
to re-join. After the timeout we trigger a rebalance with whatever is left
in the members list. This covers case 2 (expecting other members to send
leave-group) and cases 5/6 (expecting the left member to re-join).

4. As a result, we will deprecate KIP-134 and disable-on-leave-group as
well.


The key idea is that, with "static" membership, groups should be created or
terminated as a whole, and dynamic member changes are not expected often.
Hence we would not react to those membership-changing events immediately,
but wait for a longer specified time, expecting some other system like k8s
to resume the group members. A sketch of the broker-side check from step 2
follows below. WDYT?
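
A minimal pseudo-Java sketch of the step 2 enforcement (illustrative only;
the group/request accessors are hypothetical, not actual coordinator code):

    // Accept the membership protocol of the first joiner; reject later
    // joiners whose protocol or group-change-timeout disagrees.
    if (group.members().isEmpty()) {
        group.setMembershipProtocol(request.membershipProtocol()); // "static" or "dynamic"
        group.setGroupChangeTimeout(request.groupChangeTimeout());
    } else if (!group.membershipProtocol().equals(request.membershipProtocol())
            || group.groupChangeTimeout() != request.groupChangeTimeout()) {
        return errorResponse(Errors.INCONSISTENT_GROUP_PROTOCOL);
    }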


Guozhang


On Mon, Jul 30, 2018 at 3:05 PM, Mike Freyberger 
wrote:

> Jason,
>
> I really appreciate the broader conversation that you are bringing up here.
>
> I've been working on an application that does streaming joins for a while
> now, and we face a similar issue with group membership being dynamic. We
> are currently using our own StickyAssignor and take special care during
> rolling restarts to make sure consumer assignments do not change.
>
> I think a feature that allows for group membership to be fixed, along with
> a CLI for adding or removing a node from the group, would be ideal. This
> reminds me of some of the work by the DynamoDB team about 10 years back
> when they differentiated transient failures from permanent failures to deal
> with problems like this.
>
> Best,
>
> Mike
>
> On 7/30/18, 5:36 PM, "Jason Gustafson"  wrote:
>
> Hi Boyang,
>
> Thanks for the response. I think the main point I was trying to make is
> the need for fencing. I am not too concerned about how to generate a
> unique id on the client side. The approach you suggested for streams seems
> reasonable. However, any time you reuse an id, you need to be careful that
> there is only one instance that can use it at any time. We are always
> running into problems where a previous instance of an application comes
> back to life unexpectedly after we had already presumed it was dead.
> Fencing ensures that even if this happens, it cannot do any damage. I
> would say that some protection from zombies is a requirement here.
>
> The second point was more abstract and mainly meant to initiate some
> discussion. We have gone through several iterations of improvements to try
> and reduce the rebalancing in consumer applications. We started out trying
> to tune the session timeout. We have added an internal config to skip
> leaving the group when streams shuts down. The broker now has a config to
> delay rebalances in case all consumers join at about the same time. The
> approach in this KIP is a step in a more principled direction, but it
> still feels like we are mak

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-07-30 Thread Mike Freyberger
Jason,

I really appreciate the broader conversation that you are bringing up here.

I've been working on an application that does streaming joins for a while now, 
and we face a similar issue with group membership being dynamic. We are 
currently using our own StickyAssignor and take special care during rolling 
restarts to make sure consumer assignments do not change. 

I think a feature that allows for group membership to be fixed, along with a 
CLI for adding or removing a node from the group, would be ideal. This reminds 
me of some of the work by the DynamoDB team about 10 years back when they 
differentiated transient failures from permanent failures to deal with 
problems like this. 

Best,

Mike

On 7/30/18, 5:36 PM, "Jason Gustafson"  wrote:

Hi Boyang,

Thanks for the response. I think the main point I was trying to make is the
need for fencing. I am not too concerned about how to generate a unique id
on the client side. The approach you suggested for streams seems
reasonable. However, any time you reuse an id, you need to be careful that
there is only one instance that can use it at any time. We are always
running into problems where a previous instance of an application comes
back to life unexpectedly after we had already presumed it was dead.
Fencing ensures that even if this happens, it cannot do any damage. I would
say that some protection from zombies is a requirement here.

The second point was more abstract and mainly meant to initiate some
discussion. We have gone through several iterations of improvements to try
and reduce the rebalancing in consumer applications. We started out trying
to tune the session timeout. We have added an internal config to skip
leaving the group when streams shuts down. The broker now has a config to
delay rebalances in case all consumers join at about the same time. The
approach in this KIP is a step in a more principled direction, but it still
feels like we are making this unnecessarily hard on ourselves by insisting
that group membership is a dynamic concept. In practice, the number of
nodes dedicated to an application tends to remain fixed for long periods of
time and only scales up or down when needed. And these days you've got
frameworks like kubernetes which can automatically provision new nodes if
one fails. So the argument for dynamic membership is becoming weaker in my
opinion. This KIP is basically trying to impose a small degree of static
membership anyway so that rolling restarts do not change membership.
Anyway, curious to hear some thoughts about this from you and the others
who work on streams.

Thanks,
Jason


On Sat, Jul 28, 2018 at 4:44 PM, Boyang Chen  wrote:

> Thanks for the replies, James and Jason. Let me try to summarize your
> concerns.
>
>
> I think James' question is primarily about the severity of a user using this
> config wrongly. The impact would be that the same member id is used by
> multiple or even all of the consumers. The assignment protocol couldn't
> distinguish any of the overlapping consumers, thus assigning the exact same
> partitions multiple times to different consumers. I would say the processed
> result would include a lot of duplicates and unnecessary heavy load on
> the client side. The correctness will depend on the user logic; however, I'm
> pessimistic.
>
>
> Although the impact is very high, the symptom is not hard to triage,
> because users could visualize consumer identity overlap fairly easily via
> exported consumer metrics. From the user's standpoint, they would be fully
> aware of the potential erratic status before enabling the "member.id"
> configuration, IMO. Let me know your thoughts, James!
>
>
> Next is Jason's suggestion. Jason shared a higher-level viewpoint and
> pointed out that the problem we need to solve is maintaining "a strong bias
> towards being able to reuse previous state". The proposed approach is to
> separate the notion of consumer membership from consumer identity.
>
>
> The original idea of this KIP was based on the Streams application, so I
> understand that the identities of multiple consumers belong to one
> instance, where each Streams thread will be using one dedicated main
> consumer. So in a Streams use case, we could internally generate the member
> id as USER_DEFINED_ID + STREAM_THREAD_ID.
>
>
> In the pure consumer use case, this could be a little bit challenging since
> users could arbitrarily initiate multiple consumers on the same instance,
> which is outside our library's control. This could increase the possibility
> of member id collision. So instead of making developers' lives easier,
> introducing a member id config could break existing code logic and take a
> long time to understand and fix. 

Re: [DISCUSS] KIP-341: Update Sticky Assignor's User Data Protocol

2018-07-13 Thread Mike Freyberger
This is great!

For the client side implementation, I think it's still possible for there to 
be duplication. I'll try to walk through an example here. 

Let’s say there are 2 consumers, 1 topic with 2 partitions. 

After the initial rebalance, generation 0:
Consumer A has partition 0
Consumer B has partition 1

Let’s say consumer B leaves the group (long debug, GC pause). This leads to 
another rebalance. This rebalance will be considered generation 1 and will 
result in:

Generation 1, Consumer A owns partition 0,1

Now let’s say Consumer B is still out of the group and then Consumer A leaves 
as well. While Consumer A is out of the group, Consumer B rejoins the group. 
During this rebalance, the only previous state would be the initial generation 
0 assignment. So this assignment would be considered generation 1 as well and 
would result in:

Generation 1, Consumer B owns partition 0,1

When A rejoins the group, both consumers would claim ownership of both 
partitions and they would report the assignment was from generation 1. This 
gets us back into the same issue as before because the generation number cannot 
help at all. You could add a timestamp in addition to the generation marker, 
but that’d still be vulnerable to clock skew.

Would hooking into the existing generation marker protect the assignor from 
this kind of situation? We need to make sure the selected implementation is 
protected against the kind of failure mentioned above. 

Also, I have been working on KIP-315, which is another Sticky Assignor, which 
also requires some kind of epoch/generation marker to be protected against 
zombies. So, I’d be in favor of a generic solution here that other assignors 
can leverage. 

Best,

Mike Freyberger

> On Jul 13, 2018, at 6:15 PM, Vahid S Hashemian  
> wrote:
> 
> Hi all,
> 
> I created a short KIP to address an issue in the Sticky Assignor's assignment 
> logic: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-341%3A+Update+Sticky+Assignor%27s+User+Data+Protocol
> Please take a look and share your feedback / comments.
> 
> In particular, there is a Generation Marker section (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-341%3A+Update+Sticky+Assignor%27s+User+Data+Protocol#KIP-341:UpdateStickyAssignor'sUserDataProtocol-GenerationMarker
> ) that provides two methods for implementing the improvement to the 
> protocol. I'd like to know which option is more popular.
> 
> Thanks!
> --Vahid
> 
> 


Re: [DISCUSS] KIP-315: Stream Join Sticky Assignor

2018-06-20 Thread Mike Freyberger
Matthias, 

Thanks for the feedback. For our use case, we have some complexities that make 
using the existing Streams API more complicated than using the Kafka Consumer 
directly. 

- We are doing async processing, which I don't think is currently available 
(KIP-311 is handling this). 

- Our state has a high eviction rate, so Kafka compacted topics are not ideal 
for storing the changelog. The compaction cannot keep up and the topic will be 
majority tombstones when it is read on partition reassignment. We are using a 
KV store as the "change log" instead.

- We wanted to separate consumer threads from worker threads to maximize 
parallelization while keeping consumer TCP connections down.

Ultimately, it was much simpler to use the KafkaConsumer directly rather than 
peel away a lot of what the Streams API does for you. I think we should continue to 
add support for more complex use cases and processing to the Streams API. 
However, I think there will remain streaming join use cases that can benefit 
from the flexibility that comes from using the KafkaConsumer directly. 

Mike

On 6/20/18, 5:08 PM, "Matthias J. Sax"  wrote:

Mike,

thanks a lot for the KIP. I am wondering why the Streams API cannot be used
to perform the join. It would be good to understand the advantage of
adding a `StickyStreamJoinAssignor` compared to using the Streams API. Atm,
it seems to be a redundant feature to me.

-Matthias


On 6/20/18 1:07 PM, Mike Freyberger wrote:
> Hi everybody,
> 
> I’ve created a proposal document for KIP-315 which outlines the 
motivation of adding a new partition assignment strategy that can be used for 
streaming join use cases.
> 
> It’d be great to get feedback on the overall idea and the proposed 
implementation.
> 
> KIP Link: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor
> 
> Thanks,
> 
> Mike
> 





[DISCUSS] KIP-315: Stream Join Sticky Assignor

2018-06-20 Thread Mike Freyberger
Hi everybody,

I’ve created a proposal document for KIP-315 which outlines the motivation of 
adding a new partition assignment strategy that can be used for streaming join use 
cases.

It’d be great to get feedback on the overall idea and the proposed 
implementation.

KIP Link: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-315%3A+Stream+Join+Sticky+Assignor

Thanks,

Mike


Perms to create KIP

2018-06-13 Thread Mike Freyberger
Can I please have permission to create a KIP?
WikiID: mfreyberger
Email: mfreyber...@appnexus.com