Hi Nick,

NT4.
I agree with you that it is more correct to use the most recent offsets, although, as you already pointed out, a difference between the most recent and the maximum offsets should be rare.

Best,
Bruno

On 8/21/24 7:02 PM, Nick Telford wrote:
Hi Lucas,

NT4.
Sounds good, although should it take the maximum offsets? Wouldn't it be
more correct to take the *most recent* offsets? (i.e. the offsets from the
more recently received heartbeat)
My thinking is that it might be possible (albeit exceptionally rare) for
the on-disk offsets to revert to a previous number, and taking the max
would incorrectly assume the older offsets are correct.

Regards,
Nick

On Mon, 19 Aug 2024 at 15:00, Lucas Brutschy <lbruts...@confluent.io.invalid>
wrote:

Hi Nick,

NT4: As discussed, we will still require locking in the new protocol
to avoid concurrent read/write access on the checkpoint file, at least
as long as KIP-1035 hasn't landed. However, as you correctly pointed
out, the assignor will have to accept offsets for overlapping sets of
dormant tasks. I updated the KIP to make this explicit. If the
corresponding offset information for one task conflicts between
clients (which can happen), the conflict is resolved by taking the
maximum of the offsets.
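
To make that rule concrete, here is a minimal sketch of the merge (the task IDs and types are simplified placeholders, not the KIP's actual data model):

import java.util.HashMap;
import java.util.Map;

public class OffsetConflictResolution {

    // Merge the dormant-task offsets reported by two clients; when both report
    // the same task, the conflict is resolved by keeping the maximum offset.
    static Map<String, Long> mergeByMax(Map<String, Long> a, Map<String, Long> b) {
        Map<String, Long> merged = new HashMap<>(a);
        b.forEach((task, offset) -> merged.merge(task, offset, Math::max));
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> clientA = Map.of("0_0", 100L, "0_1", 50L);
        Map<String, Long> clientB = Map.of("0_1", 70L, "1_0", 10L);
        // prints {0_0=100, 0_1=70, 1_0=10} (map order may vary)
        System.out.println(mergeByMax(clientA, clientB));
    }
}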

Cheers,
Lucas

On Fri, Aug 16, 2024 at 7:14 PM Guozhang Wang
<guozhang.wang...@gmail.com> wrote:

Hello Lucas,

Thanks for the great KIP. I've read it through and it looks good to me. As we've discussed, most of my thoughts fall outside the scope of this very well scoped and defined KIP, so I will omit them for now.

The only one I had related to this KIP is about topology updating. I understand the motivation of the proposal: since a group forming a (new) generation may not include all of the joining members, because of the timing of the RPCs, the group's topology ID may not reflect the "actual" most recent topology if some zombie members holding an old topology form a group generation quickly enough. That would effectively mean zombie members block other real members from getting tasks assigned. On the other hand, as you've already mentioned in the doc, requesting some sort of ID ordering and pushing that burden onto the user's side would be too much for users, increasing the risk of human error in operations.

I'm wondering if, instead of trying to be smart programmatically, we just let the protocol act dumbly (details below). The main reasons I have in mind are:

1) Upon topology changes, some tasks may no longer exist in the new
topology, so still letting them execute on the clients which do not
yet have the new topology would waste resources.

2) As we discussed, trying to act smart introduces more complexity in the coordinator, which has to balance different assignment goals: stickiness, balance, and now topology mismatches between clients.

3) Scenarios in which mismatching topologies may be observed within a group generation:
    a. Zombie / old clients that do not have the new topology, and never will.
    b. During a rolling bounce upgrade, where not-yet-bounced clients do not yet have the new topology.
    c. Let's assume we will never have scenarios where users intentionally want a subset of clients within a group to run only a partial subset of the full sub-topologies, since such cases can well be covered by a custom assignor that takes those considerations into account by never assigning some tasks to some clients, etc. That means the only scenarios we need to consider are a) and b).

For b), I think it's actually okay to temporarily block the progress of the group until everyone is bounced with the updated topology. As for a), originally I thought that having one or a few clients blocking the whole group would be a big problem, but now that I think about it more, from an operations point of view just letting the app be blocked, with an informational log entry to quickly pin down the zombie clients, may actually be acceptable. All in all, it keeps the code simpler by not trying to abstract away scenario a) from the users (or operators), but letting them know as soon as possible.

----------

Other than that, everything else looks good to me.


Guozhang


On Fri, Aug 16, 2024 at 7:38 AM Nick Telford <nick.telf...@gmail.com>
wrote:

Hi Lucas,

NT4.
Given that the new assignment procedure guarantees that a Task has been closed before it is assigned to a different client, I don't think there should be a problem with concurrent access? I don't think we should worry too much about 1035 here, as it's orthogonal to 1071. I don't think that 1035 *requires* the locking, and indeed once 1071 is the only assignment mechanism, we should be able to do away with the locking completely (I think).

Anyway, given your point about it not being possible to guarantee disjoint sets, does it make sense to require clients to continue to supply the lags for only a subset of the dormant Tasks on-disk? Wouldn't it be simpler to just have them supply everything, since the assignor has to handle overlapping sets anyway?

Cheers,
Nick

On Fri, 16 Aug 2024 at 13:51, Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:

Hi Nick,

NT4. I think it will be hard anyway to ensure that the assignor always gets disjoint sets (there is no synchronized rebalance point anymore, so locks wouldn't prevent two clients from reporting the same dormant task). So I think we'll have to lift this restriction. I was thinking more that locking is required to prevent concurrent access. In particular, I was expecting that the lock would prevent two threads from opening the same RocksDB in KIP-1035. Wouldn't this cause problems?

Cheers,
Lucas

On Fri, Aug 16, 2024 at 11:34 AM Nick Telford <nick.telf...@gmail.com> wrote:

Hi Lucas,

NT4.
The reason I mentioned this was because, while implementing 1035, I stumbled across a problem: initially I had changed it so that threads always reported the lag for *all* dormant Tasks on-disk, even if it meant multiple threads reporting lag for the same Tasks. I found that this didn't work, apparently because the assignor assumes that multiple threads on the same instance always report disjoint sets.

From reading through 1071, it sounded like this assumption is no longer being made by the assignor, and that the processId field would allow the assignor to understand when multiple clients reporting lag for the same Tasks are on the same instance. This would enable us to do away with the locking when reporting lag, and just have threads report the lag for every Task on-disk, even if other threads are reporting lag for the same Tasks.

But it sounds like this is not correct, and that the new assignor will make the same assumptions as the old one?
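
To make my assumption concrete, this is roughly the kind of processId-based de-duplication I had in mind on the assignor side (purely illustrative; the types and the min-merge are made up, not from the KIP):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LagReportMerging {

    // One thread's report: the process it belongs to, and the lag it sees per task.
    record LagReport(String processId, Map<String, Long> taskLags) {}

    // Collapse overlapping per-thread reports: reports carrying the same processId
    // refer to the same instance, so duplicate tasks are merged (smallest lag kept,
    // an arbitrary choice for this sketch).
    static Map<String, Map<String, Long>> mergeByProcess(Iterable<LagReport> reports) {
        Map<String, Map<String, Long>> perProcess = new HashMap<>();
        for (LagReport report : reports) {
            Map<String, Long> lags =
                perProcess.computeIfAbsent(report.processId(), id -> new HashMap<>());
            report.taskLags().forEach((task, lag) -> lags.merge(task, lag, Math::min));
        }
        return perProcess;
    }

    public static void main(String[] args) {
        var thread1 = new LagReport("process-1", Map.of("0_0", 120L, "0_1", 40L));
        var thread2 = new LagReport("process-1", Map.of("0_1", 40L, "1_0", 5L));
        // prints {process-1={0_0=120, 0_1=40, 1_0=5}} (map order may vary)
        System.out.println(mergeByProcess(List.of(thread1, thread2)));
    }
}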

Regards,
Nick

On Fri, 16 Aug 2024 at 10:17, Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:

Hi Nick!

Thanks for getting involved in the discussion.

NT1. We are always referring to offsets in the changelog topics here. I tried to make it more consistent. But in the schemas and API, I find "task changelog end offset" a bit lengthy, so we use "task offset" and "task end offset" for short. We could change it, if people think this is confusing.

NT2. You are right. The confusing part is that the current streams config is called `max.warmup.replicas`, but in the new protocol, we are bounding the group-level parameter using `group.streams.max.warmup.replicas`. If we wanted to keep `group.streams.max.warmup.replicas` for the config name on the group level, we'd have to bound it using `group.streams.max.max.warmup.replicas`. I prefer not doing this, but open to suggestions.
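
To spell out the naming relationship I mean (the values are made up; only the two config names come from this discussion):

import java.util.Properties;

public class WarmupReplicaConfigs {
    public static void main(String[] args) {
        // Group-level parameter: how many warm-up replicas this streams group uses.
        Properties groupConfig = new Properties();
        groupConfig.put("group.streams.num.warmup.replicas", "2");

        // Broker-level bound capping what any group may set for the parameter above.
        // Naming it this way avoids the awkward "max.max" that would result if the
        // group-level config itself were called group.streams.max.warmup.replicas.
        Properties brokerConfig = new Properties();
        brokerConfig.put("group.streams.max.warmup.replicas", "100");

        System.out.println(groupConfig);
        System.out.println(brokerConfig);
    }
}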

NT3. You are right, we do not need to make it this restrictive. I think the main problem with having 10,000 warm-up replicas would be that it slows down the assignment inside the broker - once we are closer to a production-ready implementation, we may have better numbers on this and may revisit these defaults. I'll set the max to 100 for now, but it would be good to hear what values people typically use in their production workloads.

NT4. We will actually only report the offsets if we manage to acquire the lock. I tried to make this more precise. I suppose also with KIP-1035, we'd require the lock to read the offset?
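
As a rough sketch of the intended behaviour (class and method names invented for illustration, not the actual Streams code):

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

public class CheckpointReporting {

    // Stand-in for a dormant task's state-directory lock and its checkpoint file.
    record TaskState(ReentrantLock lock, Map<String, Long> checkpointOffsets) {}

    // Only report offsets for a dormant task if we manage to acquire its lock;
    // otherwise the task is simply left out of this heartbeat.
    static Optional<Map<String, Long>> reportIfLocked(TaskState task) {
        if (!task.lock().tryLock()) {
            return Optional.empty();
        }
        try {
            return Optional.of(Map.copyOf(task.checkpointOffsets()));
        } finally {
            task.lock().unlock();
        }
    }

    public static void main(String[] args) {
        TaskState task = new TaskState(new ReentrantLock(), Map.of("changelog-0", 42L));
        System.out.println(reportIfLocked(task)); // Optional[{changelog-0=42}]
    }
}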

Cheers,
Lucas

On Thu, Aug 15, 2024 at 8:40 PM Nick Telford <nick.telf...@gmail.com> wrote:

Hi everyone,

Looks really promising, and I can see this resolving several issues I've noticed. I particularly like the choice to use a String for Subtopology ID, as it will (eventually) lead to a better solution to KIP-816.

I noticed a few typos in the KIP that I thought I'd mention:

NT1.
In several places you refer to "task changelog end offsets", while in others, you call it "task end offsets". Which is it?

NT2.
Under "Group Configurations", you included
"group.streams.max.warmup.replicas", but I think you meant
"group.streams.num.warmup.replicas"?

NT3.
Not a typo, but a suggestion: it makes sense to set the default for "group.streams.num.warmup.replicas" to 2, for compatibility with the existing defaults, but why set the default for "group.streams.max.warmup.replicas" to only 4? That seems extremely restrictive. These "max" configs are typically used to prevent a subset of users causing problems on the shared broker cluster - what's the reason to set such a restrictive value for max warmup replicas? If I had 10,000 warmup replicas, would it cause a noticeable problem on the brokers?

NT4.
It's implied that clients send the changelog offsets for *all* dormant stateful Tasks, but the current behaviour is that clients will only send the changelog offsets for the stateful Tasks that they are able to lock on-disk. Since this is a change in behaviour, perhaps this should be called out explicitly?

Regards,
Nick

On Thu, 15 Aug 2024 at 10:55, Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:

Hi Andrew,

thanks for the comment.

AS12: I clarified the command-line interface. It's supposed to be used with --reset-offsets and --delete-offsets. I removed --topic.

AS13: Yes, it's --delete. I clarified the command-line interface.

Cheers,
Lucas

On Tue, Aug 13, 2024 at 4:14 PM Andrew Schofield
<andrew_schofi...@live.com> wrote:

Hi Lucas,
Thanks for the KIP update.

I think that `kafka-streams-groups.sh` looks like a good equivalent to the tools for the other types of groups.

AS12: In kafka-streams-groups.sh, the description for the --input-topics option seems insufficient. Why is an input topic specified with this option different from a topic specified with --topic? Why is it --input-topics rather than --input-topic? Which action of this tool does this option apply to?

AS13: Similarly, for --internal-topics, which action of the tool does it apply to? I suppose it's --delete, but it's not clear to me.

Thanks,
Andrew

On 11 Aug 2024, at 12:10, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:

Hi Andrew/Lianet,

I have added an administrative command-line tool (replacing `kafka-streams-application-reset`) and extensions of the Admin API for listing, deleting, and describing groups, and for listing, altering, and deleting offsets for streams groups. No new RPCs have to be added; however, we duplicate some of the API in the admin client that exists for consumer groups. It seems cleaner to me to duplicate some code/interface here, instead of using "consumer group" APIs for streams groups, or renaming existing APIs that use "consumerGroup" in the name to something more generic (which wouldn't cover share groups).

I think for now, all comments are addressed.

Cheers,
Lucas

On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy <lbruts...@confluent.io> wrote:

Hi Lianet and Andrew,

LM1/LM2: You are right. The idea is to omit fields in exactly the same situations as in KIP-848. In the KIP, I stuck with how the behavior was defined in KIP-848 (e.g. KIP-848 defined that the instance ID will be omitted if it did not change since the last heartbeat). But you are correct that the implementation handles these details slightly differently. I updated the KIP to match more closely the behavior of the KIP-848 implementation.
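
Roughly, the intended behaviour is something like this (a simplified sketch; the types are invented, not the KIP's schemas):

import java.util.Objects;

public class HeartbeatFields {

    // A single optional heartbeat field, e.g. the instance ID or the rebalance timeout.
    record Field<T>(T current, T lastSent) {}

    // Group ID, member ID and epoch are always sent; any other field is only sent
    // if it changed since the last heartbeat, unless a full heartbeat is required
    // (e.g. when joining, or when rejoining after an error such as being fenced).
    static <T> T valueToSend(Field<T> field, boolean fullHeartbeatRequired) {
        if (fullHeartbeatRequired || !Objects.equals(field.current(), field.lastSent())) {
            return field.current();
        }
        return null; // omitted from the request
    }

    public static void main(String[] args) {
        Field<String> instanceId = new Field<>("instance-1", "instance-1");
        System.out.println(valueToSend(instanceId, false)); // null -> omitted
        System.out.println(valueToSend(instanceId, true));  // instance-1 -> sent in full HB
    }
}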

LM9: Yes, there are several options to do this. The idea is to have only one client initialize the topology, not all clients. It seems easier to understand on the protocol level (otherwise we'd have N topology initializations racing with a hard-to-determine winner). We also expect the payload of the request to grow in the future and want to avoid the overhead of having all clients sending the topology at the same time. But initializing the group could take some time - we have to create internal topics, and maybe a client is malfunctioning and the initialization has to be retried. It seemed a bit confusing to return errors to all other clients that are trying to join the group during that time - as if there was a problem with joining the group / the contents of the heartbeat. It seems cleaner to me to let all clients successfully join the group and heartbeat, but remain in an INITIALIZING state which does not yet assign any tasks. Does that make sense to you? You are right that returning a retriable error and having all clients retry until the group is initialized would also work, it just doesn't model well that "everything is going according to plan".

As for the order of the calls - yes, I think it is fine to allow an Initialize RPC before the first heartbeat for supporting future admin tools. I made this change throughout the KIP, thanks!

AS11: Yes, your understanding is correct. The number of tasks for one subtopology is the maximum number of partitions in any of the matched topics. What will happen in Kafka Streams is that the partitions of the matched topics will effectively be merged during stream processing, so in your example, subtopology:0 would consume from AB:0 and AC:0.
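
A tiny worked example of that merging rule (illustrative code, not the assignor implementation):

import java.util.List;
import java.util.Map;

public class SubtopologyTasks {

    public static void main(String[] args) {
        // Topics matched by the subtopology's source regex (A*), with partition counts.
        Map<String, Integer> matchedTopics = Map.of("AB", 1, "AC", 1);

        // Number of tasks = maximum partition count across all matched topics.
        int numTasks = matchedTopics.values().stream()
            .mapToInt(Integer::intValue).max().orElse(0);

        // Task i of subtopology 0 reads partition i of every matched topic that has it,
        // so task 0_0 consumes AB:0 and AC:0 (they cannot go to different tasks).
        for (int task = 0; task < numTasks; task++) {
            final int partition = task;
            List<String> inputs = matchedTopics.entrySet().stream()
                .filter(e -> partition < e.getValue())
                .map(e -> e.getKey() + ":" + partition)
                .toList();
            System.out.println("subtopology 0, task 0_" + task + " -> " + inputs);
        }
    }
}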

Cheers,
Lucas

On Fri, Aug 2, 2024 at 9:47 PM Lianet M. <liane...@gmail.com> wrote:

Hi Bruno, answering your questions:

About the full heartbeat (LM1): I just wanted to confirm that you'll be sending full HBs in case of errors in general. It's not clear from the KIP, since it referred to sending Id/epoch and whatever had changed since the last HB only. Sending a full HB on error is key to ensure fresh rejoins after fencing, for instance, and retries with all relevant info.

About the instanceId (LM2): The instanceId is needed on every HB to be able to identify a member using one that is already taken. On every HB, the broker uses the instance id (if any) to retrieve the member ID associated with it, and checks it against the memberId received in the HB (throwing an UNRELEASED_INSTANCE_ID error if needed). So similar to my previous point, I just wanted to confirm that we are considering that here too.
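
Roughly what I mean, as a simplified sketch (not the actual broker code):

import java.util.HashMap;
import java.util.Map;

public class StaticMembershipCheck {

    // instance id -> member id currently registered for it
    private final Map<String, String> instanceToMember = new HashMap<>();

    // On every heartbeat carrying an instance id, the broker looks up the member id
    // registered for that instance and compares it with the member id in the request.
    void checkInstanceId(String instanceId, String memberId) {
        if (instanceId == null) {
            return; // dynamic member, nothing to check
        }
        String owner = instanceToMember.putIfAbsent(instanceId, memberId);
        if (owner != null && !owner.equals(memberId)) {
            // mirrors the UNRELEASED_INSTANCE_ID behaviour of the new consumer protocol
            throw new IllegalStateException("instance id " + instanceId
                + " is still owned by member " + owner);
        }
    }

    public static void main(String[] args) {
        StaticMembershipCheck check = new StaticMembershipCheck();
        check.checkInstanceId("ks-app-1", "member-A"); // registers the instance
        check.checkInstanceId("ks-app-1", "member-A"); // same member, ok
        check.checkInstanceId("ks-app-1", "member-B"); // throws: instance still in use
    }
}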

Now some other thoughts:

LM9: Definitely interesting, imo, if we can avoid the dependency between the StreamsGroupInitialize and the StreamsGroupHeartbeat. I totally get that the initial client implementation will do a HB first, and that's fine, but not having the flow enforced at the protocol level would allow for further improvement in the future (that initialize-via-admin idea you mentioned, for instance). Actually, I may be missing something about the HB, but if we are at the point where the HB requires that the topology has been initialized, and the topology init requires the group, why is the heartbeat RPC the one responsible for the group creation? (vs. StreamsGroupInitialize creates the group if needed + HB just fails if the topology is not initialized)

Thanks!

Lianet
(I didn't miss your answer on my INVALID_GROUP_TYPE proposal, just still thinking about it in sync with the same discussion we're having on the KIP-1043 thread... I'll come back on that)

On Thu, Aug 1, 2024 at 10:55 AM Andrew Schofield <andrew_schofi...@live.com> wrote:

Hi Bruno,
Thanks for adding the detail on the schemas on records written to __consumer_offsets. I've reviewed them in detail and they look good to me. I have one naive question.

AS11: I notice that an assignment is essentially a set of partition indices for subtopologies. Since a subtopology can be defined by a source topic regex, does this mean that an assignment gives the same set of partition indices for all topics which happen to match the regex? So, a subtopology reading from A* that matches AB and AC would give the same set of partitions to each task for both topics, and is not able to give AB:0 to one task and AC:0 to a different task. Is this correct?

Thanks,
Andrew

On 23 Jul 2024, at 16:16, Bruno Cadonna <cado...@apache.org> wrote:

Hi Lianet,

Thanks for the review!

Here are my answers:

LM1. Is your question whether we need to send a full heartbeat each time the member re-joins the group even if the information in the RPC did not change since the last heartbeat?

LM2. Is the reason for sending the instance ID each time that a member could shutdown, change the instance ID and then start and heartbeat again, but the group coordinator would never notice that the instance ID changed?

LM3. I see your point. I am wondering whether this additional information is worth the dependency between the group types. To return INVALID_GROUP_TYPE, the group coordinator needs to know that a group ID exists with a different group type. With a group coordinator as we have it now in Apache Kafka that manages all group types, that is not a big deal, but imagine if we (or some implementation of the Apache Kafka protocol) decide to have a separate group coordinator for each group type.

LM4. Using INVALID_GROUP_ID if the group ID is empty makes sense to me. I'm going to change that.

LM5. I think there is a dependency from the StreamsGroupInitialize RPC to the heartbeat. The group must exist when the initialize RPC is received by the group coordinator. The group is created by the heartbeat RPC. I would be in favor of making the initialize RPC independent from the heartbeat RPC. That would allow initializing a streams group explicitly with an admin tool.

LM6. I think it affects Streams, and Streams should behave the same way as consumer groups.

LM7. Good point that we will consider.

LM8. Fixed! Thanks!


Best,
Bruno




On 7/19/24 9:53 PM, Lianet M. wrote:
Hi Lucas/Bruno, thanks for the great KIP! First comments:

LM1. Related to where the KIP says: *"Group ID, member ID, member epoch are sent with each heartbeat request. Any other information that has not changed since the last heartbeat can be omitted."* I expect all the other info also needs to be sent whenever a full heartbeat is required (even if it didn't change from the last heartbeat), ex. on fencing scenarios, correct?

LM2. For consumer groups we always send the groupInstanceId (if any) as part of every heartbeat, along with memberId, epoch and groupId. Should we consider that too here?

LM3. We're proposing returning a GROUP_ID_NOT_FOUND error in response to the stream-specific RPCs if the groupId is associated with a group type that is not streams (i.e. consumer group or share group). I wonder if at this point, where we're getting several new group types added, each with RPCs that are supposed to include a groupId of a certain type, we should be more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE (group exists but not with a valid type for this RPC) vs a GROUP_ID_NOT_FOUND (group does not exist). Those errors would be consistently used across consumer, share, and streams RPCs whenever the group id is not of the expected type. This is truly not specific to this KIP, and should be addressed with all group types and their RPCs in mind. I just wanted to bring out my concern and get thoughts around it.

LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if groupId is empty. There is already an INVALID_GROUP_ID error that seems more specific to this situation. Handling of specific errors would definitely be easier than having to deal with a generic INVALID_REQUEST (and probably its custom message). I know that for KIP-848 we have INVALID_REQUEST for similar situations, so if ever we take down this path we should review it there too for consistency. Thoughts?

LM5. The dependency between the StreamsGroupHeartbeat RPC and the StreamsGroupInitialize RPC is one-way only, right? HB requires a previous StreamsGroupInitialize request, but StreamsGroupInitialize processing is totally independent of heartbeats (and could perfectly be processed without a previous HB, even though the client implementation we're proposing won't go down that path). Is my understanding correct? Just to double check; it seems sensible like that at the protocol level.

LM6. With KIP-848, there is an important improvement that brings a difference in behaviour around static membership: with the classic protocol, if a static member joins with a group instance already in use, it makes the initial member fail with a FENCED_INSTANCE_ID exception, vs. with the new consumer group protocol, the second member trying to join fails with an UNRELEASED_INSTANCE_ID. Does this change need to be considered in any way for the streams app? (I'm not familiar with KS yet, but thought it was worth asking. If it doesn't affect it in any way, it might still be helpful to call it out in a section on static membership.)

LM7. Regarding the admin tool to manage streams groups: we can discuss whether to have it here or separately, but I think we should aim for some basic admin capabilities from the start, mainly because I believe it will be very helpful/needed in practice during the impl of the KIP. From experience with KIP-848, we felt a bit blindfolded in the initial phase where we still didn't have kafka-consumer-groups dealing with the new groups (and then it was very helpful and heavily used once we were able to easily inspect them from the console).

LM8. nit: the links to KIP-848 are not quite right (pointing to an unrelated "Future work" section at the end of KIP-848)

Thanks!

Lianet
On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
<lbruts...@confluent.io.invalid> wrote:
Hi Andrew,

AS2: I added a note for now. If others feel strongly about it, we can still add more administrative tools to the KIP - it should not change the overall story significantly.

AS8: "streams.group.assignor.name" sounds good to me to distinguish the config from class names. Not sure if I like the "default". To be consistent, we'd then have to call it `group.streams.default.session.timeout.ms` as well. I only added the `.name` on both broker and group level for now.

AS10: Ah, I misread your comment, now I know what you meant. Good point, fixed (by Bruno).

Cheers,
Lucas

On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
<andrew_schofi...@live.com> wrote:

Hi Lucas,
I see that I hit send too quickly. One more comment:

AS2: I think stating that there will be a `kafka-streams-group.sh` in a future KIP is fine to keep this KIP focused. Personally, I would probably put all of the gory details in this KIP, but then it's not my KIP. A future pointer is fine too.

Thanks,
Andrew


On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:

Hi Andrew,

thanks for getting the discussion going! Here are my responses.

AS1: Good point, done.

AS2: We were planning to add more administrative tools to the interface in a follow-up KIP, to not make this KIP too large. If people think that it would help to understand the overall picture if we already add something like `kafka-streams-groups.sh`, we will do that. I also agree that we should address how this relates to KIP-1043, we'll add it.

AS3: Good idea, that's more consistent with
`assigning` and
`reconciling` etc.

AS4: Thanks, Fixed.

AS5: Good catch. This was supposed to mean that we require CREATE on cluster or CREATE on all topics, not both. Fixed.

AS6: Thanks, Fixed.

AS7. Thanks, Fixed.

AS8: I think this works a bit differently in this KIP than in consumer groups. KIP-848 lets the members vote for a preferred assignor, and the broker-side assignor is picked by majority vote. The `group.consumer.assignors` config specifies the list of assignors that are supported on the broker, and is configurable because the interface is pluggable. In this KIP, the task assignor is not voted on by members but configured on the broker side. `group.streams.assignor` is used for this, and uses a specific name. If we make the task assignor pluggable on the broker side, we'd introduce a separate config, `group.streams.assignors`, which would indeed be a list of class names. I think there is no conflict here; the two configurations serve different purposes. The only gripe I'd have is the naming, as `group.streams.assignor` and `group.streams.assignors` would be a bit similar, but I cannot really think of a better name for `group.streams.assignor`, so I'd probably rather introduce `group.streams.assignors` as `group.streams.possible_assignors` or something like that.

AS9: I added explanations for the various record types. Apart from the new topology record, and the partition metadata (which is based on the topology and can only be created once we have a topology initialized), the lifecycle of the records is basically identical to KIP-848.

AS10: In the consumer offset topic, the version in the key is used to differentiate different schema types with the same content. So the keys are not versioned, but the version field is "abused" as a type tag. This is the same in KIP-848, we followed it for consistency.
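
To illustrate what I mean by the version acting as a type tag (a conceptual sketch; the exact numbering of the new streams keys is up to the KIP):

public class RecordKeyTypeTag {

    // In __consumer_offsets, the version in the record key is not a schema version;
    // it selects which kind of key the bytes encode.
    static String recordTypeFor(short keyVersion) {
        return switch (keyVersion) {
            case 0, 1 -> "OffsetCommitKey";   // existing record types keep their numbers ...
            case 2 -> "GroupMetadataKey";
            // ... while each new streams-group key gets its own, previously unused number.
            default -> "newer key type, e.g. a streams-group record key (tag " + keyVersion + ")";
        };
    }

    public static void main(String[] args) {
        System.out.println(recordTypeFor((short) 1));
        System.out.println(recordTypeFor((short) 2));
        System.out.println(recordTypeFor((short) 17));
    }
}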

Cheers,
Lucas


On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
<andrew_schofi...@live.com> wrote:

Hi Lucas and Bruno,

Thanks for the great KIP.

I've read through the document and have some initial comments.

AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration constant. This is a change to the public interface and should be called out.

AS2: Since streams groups are no longer consumer groups, how does the user manipulate them, observe lag and so on? Will you add `kafka-streams-groups.sh` or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can easily be extended to support streams groups, but that only lets the user see the groups, not manipulate them.

AS3: I wonder whether the streams group state of UNINITIALIZED would be better expressed as INITIALIZING.

AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should be nullable.

AS5: Why does StreamsGroupInitialize require CREATE permission on the cluster resource? I imagine that this is one of the ways that the request might be granted permission to create the StateChangelogTopics and RepartitionSourceTopics, but if it is granted permission to create those topics with specific ACLs, would CREATE on the cluster resource still be required?

AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.

AS7: A tiny nit. You've used TopologyID (capitals) in StreamsGroupHeartbeatRequest and a few others, but in all other cases the fields which are ids are spelled Id. I suggest TopologyId.

Also, "interal" is probably meant to be "interval".

AS8: For consumer groups, the `group.consumer.assignors` configuration is a list of class names. The assignors do have names too, but the configuration which enables them is in terms of class names. I wonder whether the broker's group.streams.assignor could actually be `group.streams.assignors` and specified as a list of the class names of the supplied assignors. I know you're not supporting other assignors yet, but when you do, I expect you would prefer to have used class names from the start.

The use of assignor names in the other places looks good to me.

AS9: I'd find it really helpful to have a bit of a description about the purpose and lifecycle of the 9 record types you've introduced on the __consumer_offsets topic. I did a cursory review, but without really understanding what's written when, I can't do a thorough job of reviewing.

AS10: In the definitions of the record keys, such as StreamsGroupCurrentMemberAssignmentKey, the versions of the fields must match the versions of the types.

Thanks,
Andrew

On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:

Hi all,

I would like to start a discussion thread on KIP-1071: Streams Rebalance Protocol. With this KIP, we aim to bring the principles laid down by KIP-848 to Kafka Streams, to make rebalances more reliable and scalable, and to make Kafka Streams overall easier to deploy and operate. The KIP proposes moving the assignment logic to the broker, and introducing a dedicated group type and dedicated RPCs for Kafka Streams.

The KIP is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol

This is joint work with Bruno Cadonna.

Please take a look and let us know what you
think.

Best,
Lucas










