Hi Jun,

Thanks for your thorough review. There is already a vote thread if you
want to vote.

Best,
David

On Tue, Oct 18, 2022 at 11:07 PM Jun Rao <j...@confluent.io.invalid> wrote:
>
> Hi, David,
>
> Thanks for the reply. No more comments from me.
>
> 80. Yes, since PrepareAssignment returns topicIds, using topicId in
> ConsumerGroupInstallAssignmentRequest makes sense.
>
> 81. Sounds good.
>
> Jun
>
> On Tue, Oct 18, 2022 at 11:46 AM David Jacot <dja...@confluent.io.invalid>
> wrote:
>
> > Hi Jun,
> >
> > 81. I forgot to say that I put UniformAssignor as the first one in the
> > list. I think that it should be the default one.
> >
> > Best,
> > David
> >
> > On Tue, Oct 18, 2022 at 8:33 PM David Jacot <dja...@confluent.io> wrote:
> > >
> > > Hi Jun,
> > >
> > > 80. Hmm. It seems preferable to keep
> > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId as a
> > > topic id in order for the RPC to remain symmetrical with the
> > > PrepareAssignment RPCs. The client has to create the TopicPartitions
> > > from the mapping provided in the PrepareAssignment response so it can
> > > use the same mapping to convert them back to topic ids afterwards. I
> > > personally find this cleaner from an RPC perspective, don't you think?
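The round trip described in item 80 can be sketched as follows. This is only an illustration under stated assumptions: `TopicIdMapping` and its methods are hypothetical names, and `java.util.UUID` stands in for Kafka's own topic id type. The KIP does not prescribe this client-side structure.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper, not part of the KIP: holds the id-to-name mapping taken
// from a PrepareAssignment response and converts in both directions.
final class TopicIdMapping {
    private final Map<UUID, String> namesById = new HashMap<>();
    private final Map<String, UUID> idsByName = new HashMap<>();

    // 'topics' is assumed to be the id -> name mapping carried by the response.
    TopicIdMapping(Map<UUID, String> topics) {
        for (Map.Entry<UUID, String> entry : topics.entrySet()) {
            namesById.put(entry.getKey(), entry.getValue());
            idsByName.put(entry.getValue(), entry.getKey());
        }
    }

    // Topic id -> name, used to build name-based partitions for the assignor.
    String nameOf(UUID topicId) {
        String name = namesById.get(topicId);
        if (name == null) {
            throw new IllegalArgumentException("Unknown topic id: " + topicId);
        }
        return name;
    }

    // Topic name -> id, used when building the InstallAssignment request.
    UUID idOf(String topicName) {
        UUID id = idsByName.get(topicName);
        if (id == null) {
            throw new IllegalArgumentException("Unknown topic name: " + topicName);
        }
        return id;
    }
}
```

Because both directions come from the same response, the ids sent back are exactly the ids the coordinator handed out.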
> > >
> > > 81. Makes sense. Done.
> > >
> > > Thanks,
> > > David
> > >
> > > On Tue, Oct 18, 2022 at 8:00 PM Jun Rao <j...@confluent.io.invalid>
> > wrote:
> > > >
> > > > Hi, David,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 80. The change in ConsumerGroupPrepareAssignmentResponse sounds good to me.
> > > > Should ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId be
> > > > topic name? Intuitively, the client assignor assigns partitions based on
> > > > topic names and it seems that the group coordinator should be responsible
> > > > for mapping the topic names to topic ids.
> > > >
> > > > 81. group.consumer.assignors: Should we change the default values to
> > > > include the full class name?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Oct 18, 2022 at 2:36 AM David Jacot
> > <dja...@confluent.io.invalid>
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > 80. I have included the mapping from topic ids to topic names in
> > > > > ConsumerGroupPrepareAssignmentResponse.Topics. It can be used to
> > > > > convert all the topic ids that you mentioned. It seems preferable to
> > > > > me to keep it like this as topic ids are usually smaller than topic
> > > > > names. Does that make sense?
> > > > >
> > > > > Thanks,
> > > > > David
> > > > >
> > > > > On Tue, Oct 18, 2022 at 1:51 AM Jun Rao <j...@confluent.io.invalid>
> > wrote:
> > > > > >
> > > > > > Hi, David,
> > > > > >
> > > > > > Thanks for the reply. Just one more comment.
> > > > > >
> > > > > > 80. Since PartitionAssignor.AssignmentMemberSpec uses topic name for both
> > > > > > subscribedTopics and targetPartitions, in
> > > > > > ConsumerGroupPrepareAssignmentResponse, should Members.SubscribedTopicIds
> > > > > > and Members.TopicPartitions.TopicId be topic names too? Similarly, since
> > > > > > PartitionAssignor.MemberAssignment uses topic name, should
> > > > > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId be topic
> > > > > > name too?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Mon, Oct 17, 2022 at 2:35 AM David Jacot
> > <dja...@confluent.io.invalid
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > Thanks for your comments. Please find my answers below.
> > > > > > >
> > > > > > > 60. Sure. Let me use a concrete example to illustrate it. Let's assume
> > > > > > > that KStreams has a member A with a task reading from foo-0 and a
> > > > > > > member B with a standby task reading from foo-0. As you know, the
> > > > > > > standby task information for B is encoded in the assignment metadata
> > > > > > > whereas the assigned partition for A is in the assignment. Now, let's
> > > > > > > imagine that the assignor decides to promote the standby task to
> > > > > > > become the active task for foo-0. The assignor will create a new
> > > > > > > assignment for the group and will encode the standby task in A's
> > > > > > > metadata and assign foo-0 to B. A will be requested to revoke foo-0
> > > > > > > and, while it does so, B will get its new metadata but without foo-0
> > > > > > > because foo-0 is not revoked yet. From the point of view of B, it will
> > > > > > > see that the standby task is no longer there. Without providing the
> > > > > > > full set of assigned partitions, it would not know what to do here. If
> > > > > > > foo-0 is targeted to be assigned to B, B should wait until it is
> > > > > > > assigned before stopping the standby task. If foo-0 is not, it can stop
> > > > > > > the standby immediately. Long story short, KStreams needs to get the
> > > > > > > full assignment + metadata in order to reason about the correct end
> > > > > > > state.
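The decision B faces in the example above can be sketched as a small function. This is one reading of the paragraph, not the KStreams implementation: `StandbyDecision`, `Action` and `decide` are hypothetical names, and partitions are represented as plain strings such as "foo-0" for brevity.

```java
import java.util.Set;

// Hypothetical sketch: what a member might do with a standby task that has
// disappeared from its metadata, given its assigned and pending partitions.
final class StandbyDecision {
    enum Action { PROMOTE_TO_ACTIVE, KEEP_UNTIL_ASSIGNED, STOP_STANDBY }

    static Action decide(String partition, Set<String> assigned, Set<String> pending) {
        if (assigned.contains(partition)) {
            // The partition has been handed over: the standby can become active.
            return Action.PROMOTE_TO_ACTIVE;
        }
        if (pending.contains(partition)) {
            // Targeted at this member but not yet revoked elsewhere: keep waiting.
            return Action.KEEP_UNTIL_ASSIGNED;
        }
        // Not part of this member's target at all: nothing to wait for.
        return Action.STOP_STANDBY;
    }
}
```

Without the pending set, the second and third cases would be indistinguishable, which is the situation the paragraph describes.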
> > > > > > >
> > > > > > > How do we provide this? We have added the pending partitions in the
> > > > > > > ConsumerGroupHeartbeatResponse in conjunction with the assigned
> > > > > > > partitions. With this, the Consumer knows the full set of partitions.
> > > > > > > PartitionAssignor#onAssign will be called when the member transitions
> > > > > > > to a new epoch with the target partitions and the metadata for the
> > > > > > > member. Then, RebalanceListener#onAssignedPartitions is called when
> > > > > > > partitions are incrementally assigned. At most, it will be called N
> > > > > > > times where N is the number of assigned partitions in the current
> > > > > > > epoch.
> > > > > > >
> > > > > > > For context, this was discussed with Guozhang in this thread.
> > > > > > >
> > > > > > > I hope that this clarifies the intent.
> > > > > > >
> > > > > > > 62. Got it. I have reworked that part as well. Unfortunately, I think
> > > > > > > that we should keep using a Set here because it is already there.
> > > > > > > Deprecating the current one to change the type is not worth it.
> > > > > > >
> > > > > > > 63. Fixed as well.
> > > > > > >
> > > > > > > 68.2 Right. The error is only used by the client side assignor. The
> > > > > > > client assignor will report a non-zero error code while installing a
> > > > > > > new assignment if it was not able to compute a new assignment for the
> > > > > > > group. In this case, the assignment is not changed but the error is
> > > > > > > reported to all the members. KStreams will have a few such errors.
> > > > > > > They are listed in the KIP.
> > > > > > >
> > > > > > > When a new assignment is installed with the
> > > > > > > ConsumerGroupInstallAssignment API, the coordinator validates it and
> > > > > > > rejects the installation directly if it is not valid. In this case,
> > > > > > > only the member trying to install the assignment gets an error. The
> > > > > > > other members are not aware of it as they keep their current
> > > > > > > assignment.
> > > > > > >
> > > > > > > 71.1 That makes sense. Using SubscribedTopicNames is indeed more
> > > > > > > intuitive. I have changed the HB request to use it in order to be
> > > > > > > consistent. I have also added the topic names in the
> > > > > > > ConsumerGroupDescribe response.
> > > > > > >
> > > > > > > Best,
> > > > > > > David
> > > > > > >
> > > > > > > On Fri, Oct 14, 2022 at 8:05 PM Jun Rao <j...@confluent.io.invalid
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > Hi, David,
> > > > > > > >
> > > > > > > > Thanks for the reply.
> > > > > > > >
> > > > > > > > 60. Hmm, could you explain why KStreams needs the full set of partition
> > > > > > > > assignments? I am also not sure how this will be implemented based on the
> > > > > > > > protocol. Since HeartBeatResponse sends the assigned partitions in phases
> > > > > > > > (those that don't need to wait for revocation from other members first,
> > > > > > > > followed by the full assignment list), how does a member know which
> > > > > > > > response has the full assignment?
> > > > > > > >
> > > > > > > > 62. I was referring to Admin#describeConsumerGroups. It seems that
> > > > > > > > ownedPartitions is still of type List<TopicIdPartition>. Also, the
> > > > > > > > existing topicPartitions() returns Set<TopicPartition>, not a collection.
> > > > > > > >
> > > > > > > > 63. This is also in Admin#describeConsumerGroups. The comment seems
> > > > > > > > inconsistent with the field name.
> > > > > > > >     /**
> > > > > > > >      * The reason reported by the assignor.
> > > > > > > >      */
> > > > > > > >     byte error;
> > > > > > > >
> > > > > > > > 67. Thanks for the explanation. Makes sense. The existing name may be ok.
> > > > > > > >
> > > > > > > > 68.2 Are you saying the error is only intended for the client assignor?
> > > > > > > > But the coordinator generates the error based on the server side
> > > > > > > > validation, right? Should we provide some info to tell the client why
> > > > > > > > the validation fails?
> > > > > > > >
> > > > > > > > 71.1 Hmm, for SubscribedTopicIds, should we use topic name in the
> > > > > > > > subscription part? That seems more intuitive---a subscription shouldn't
> > > > > > > > change just because a topic is recreated. For the assigned partitions,
> > > > > > > > perhaps we could include both topicId and name just like
> > > > > > > > FetchOffsetRequest.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Fri, Oct 14, 2022 at 2:49 AM Luke Chen <show...@gmail.com>
> > wrote:
> > > > > > > >
> > > > > > > > > Thanks for the update.
> > > > > > > > > Yes, I think using a similar approach to KIP-868 to fix this issue
> > > > > > > > > makes sense. Let's consider it in the future.
> > > > > > > > >
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > > On Fri, Oct 14, 2022 at 5:16 PM David Jacot
> > > > > > > <dja...@confluent.io.invalid>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Luke,
> > > > > > > > > >
> > > > > > > > > > Thanks for your questions.
> > > > > > > > > >
> > > > > > > > > > > 1. We will store the "targetAssignment" into log now. But as we know,
> > > > > > > > > > > there's max batch size limit (default 1MB), which means, we cannot
> > > > > > > > > > > support 1M partitions in one group (actually, it should be less than
> > > > > > > > > > > 60k partitions since we'll store {topicID+partition id}) by default
> > > > > > > > > > > now. How will we handle that? Do we expect users to adjust the max
> > > > > > > > > > > batch size to support large partitions in groups, which we don't need
> > > > > > > > > > > this change for old protocol?
> > > > > > > > > >
> > > > > > > > > > That's right. I have a few ideas to remove this limitation in the
> > > > > > > > > > future but I decided to keep them for future improvement. The KIP is
> > > > > > > > > > large enough and as the current protocol suffers from the exact same
> > > > > > > > > > limitation, it is not a regression.
> > > > > > > > > >
> > > > > > > > > > For the future, my thinking is to split the assignment and to only
> > > > > > > > > > write deltas to the log instead of re-writing all of it. We would
> > > > > > > > > > need to use transactions for this in the coordinator (similarly to
> > > > > > > > > > KIP-868). The challenge is that we have to ensure that those deltas
> > > > > > > > > > are all written or completely rolled back. Otherwise, we would have a
> > > > > > > > > > weird state with the compaction. This obviously needs more thinking.
> > > > > > > > > >
> > > > > > > > > > > I'm wondering why we should persist the "targetAssignment" data? If we
> > > > > > > > > > > want to work for coordinator failover, could the new coordinator try to
> > > > > > > > > > > request for currently owned partitions from each consumer when failed
> > > > > > > > > > > over? I'm not sure if the consumer will auto send owned partitions to
> > > > > > > > > > > the new coordinator. If not, maybe we can return an error to client
> > > > > > > > > > > ConsumerGroupHeartbeat API with REQUIRE_OWNED_PARTITION error, and ask
> > > > > > > > > > > client to append the currently owned partitions to new coordinator for
> > > > > > > > > > > new assignment computation. Does that make sense?
> > > > > > > > > >
> > > > > > > > > > The entire reconciliation process depends on it so if we lose it
> > > > > > > > > > during a failover, members could be in a weird state. For instance,
> > > > > > > > > > they could be in the middle of a transition from their current
> > > > > > > > > > assignment to their new target and thus would be blocked. Relying on
> > > > > > > > > > members to reconstruct it back does not really work because they
> > > > > > > > > > don't have all the information to do so (e.g. new metadata) so we
> > > > > > > > > > would have to recompute a new one. This implies that we need to get
> > > > > > > > > > the owned partitions from all members and that would take a few
> > > > > > > > > > seconds until all members come back in the best case, up to the
> > > > > > > > > > session timeout in the worst case. Imagine that a member joins or
> > > > > > > > > > fails during this time, the whole process would be stuck. I am afraid
> > > > > > > > > > storing it is the best way here.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > David
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 14, 2022 at 5:11 AM Luke Chen <
> > show...@gmail.com>
> > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi David,
> > > > > > > > > > >
> > > > > > > > > > > A few more questions:
> > > > > > > > > > > 1. We will store the "targetAssignment" into log now. But as we know,
> > > > > > > > > > > there's max batch size limit (default 1MB), which means, we cannot
> > > > > > > > > > > support 1M partitions in one group (actually, it should be less than
> > > > > > > > > > > 60k partitions since we'll store {topicID+partition id}) by default
> > > > > > > > > > > now. How will we handle that? Do we expect users to adjust the max
> > > > > > > > > > > batch size to support large partitions in groups, which we don't need
> > > > > > > > > > > this change for old protocol?
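The "less than 60k" figure above can be reproduced with a rough calculation. The assumptions are mine, not from the thread: each entry is stored as a 16-byte topic id plus a 4-byte partition id, there is no record or batch overhead, and the limit is exactly 1 MiB. The real on-disk format may differ, and `TargetAssignmentSizeEstimate` is a hypothetical name.

```java
// Back-of-the-envelope estimate only; the 16 + 4 byte layout is an assumption.
final class TargetAssignmentSizeEstimate {
    static final int TOPIC_ID_BYTES = 16;    // a topic id is a 128-bit UUID
    static final int PARTITION_ID_BYTES = 4; // assumed 32-bit partition index

    // Upper bound on (topic id, partition id) pairs that fit in one batch.
    static long maxPartitions(long maxBatchBytes) {
        return maxBatchBytes / (TOPIC_ID_BYTES + PARTITION_ID_BYTES);
    }

    public static void main(String[] args) {
        // 1 MiB / 20 bytes per entry
        System.out.println(maxPartitions(1024L * 1024L)); // prints 52428
    }
}
```

Any per-record overhead would push the real ceiling lower still.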
> > > > > > > > > > >
> > > > > > > > > > > I'm wondering why we should persist the "targetAssignment" data? If we
> > > > > > > > > > > want to work for coordinator failover, could the new coordinator try to
> > > > > > > > > > > request for currently owned partitions from each consumer when failed
> > > > > > > > > > > over? I'm not sure if the consumer will auto send owned partitions to
> > > > > > > > > > > the new coordinator. If not, maybe we can return an error to client
> > > > > > > > > > > ConsumerGroupHeartbeat API with REQUIRE_OWNED_PARTITION error, and ask
> > > > > > > > > > > client to append the currently owned partitions to new coordinator for
> > > > > > > > > > > new assignment computation. Does that make sense?
> > > > > > > > > > >
> > > > > > > > > > > Luke
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 14, 2022 at 12:22 AM Jun Rao
> > > > > <j...@confluent.io.invalid
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi, David,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the reply and the updated KIP. A few more comments on
> > > > > > > > > > > > the interfaces and the protocols.
> > > > > > > > > > > >
> > > > > > > > > > > > 60.  On the consumer side, do we need both
> > > > > > > > > > > > PartitionAssignor.onAssignment and
> > > > > > > > > > > > ConsumerRebalanceListener.onPartitionsAssigned? My understanding is
> > > > > > > > > > > > that the former was added for cooperative rebalance, which is now
> > > > > > > > > > > > handled by the coordinator. If we do need both, should we make them
> > > > > > > > > > > > more consistent (e.g. topic name vs topic id, list vs set vs
> > > > > > > > > > > > collection)?
> > > > > > > > > > > >
> > > > > > > > > > > > 61. group.local.assignors: Could we make it clear that it's the full
> > > > > > > > > > > > class name that implements PartitionAssignor?
> > > > > > > > > > > >
> > > > > > > > > > > > 62. MemberAssignment: It currently has the following method.
> > > > > > > > > > > >     public Set<TopicPartition> topicPartitions()
> > > > > > > > > > > > We are adding List<TopicIdPartition> ownedPartitions. Should we keep
> > > > > > > > > > > > the naming and the return type consistent?
> > > > > > > > > > > >
> > > > > > > > > > > > 63. MemberAssignment.error: should that be reason?
> > > > > > > > > > > >
> > > > > > > > > > > > 64. group.remote.assignor: The client may not know what assignors the
> > > > > > > > > > > > broker supports. Should we default this to what the broker determines
> > > > > > > > > > > > (e.g. first assignor listed in group.consumer.assignors)?
> > > > > > > > > > > >
> > > > > > > > > > > > 65. After the text "When A heartbeats again and acknowledges the
> > > > > > > > > > > > revocation, the group coordinator transitions him to epoch 2 and
> > > > > > > > > > > > releases foo-2.", we have the following.
> > > > > > > > > > > >   B - epoch=2, partitions=[foo-2], pending-partitions=[]
> > > > > > > > > > > > Should foo-2 be in pending-partitions?
> > > > > > > > > > > >
> > > > > > > > > > > > 66. In the Online Migration example, is the first occurrence of "C -
> > > > > > > > > > > > epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]"
> > > > > > > > > > > > correct? It seems that should happen after C receives a SyncGroup
> > > > > > > > > > > > response? If so, subsequent examples have the same issue.
> > > > > > > > > > > >
> > > > > > > > > > > > 67. ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs : Which config
> > > > > > > > > > > > controls this? How is this used by the group coordinator since there
> > > > > > > > > > > > is no sync barrier anymore?
> > > > > > > > > > > >
> > > > > > > > > > > > 68. ConsumerGroupHeartbeatResponse:
> > > > > > > > > > > > 68.1 AssignedTopicPartitions and PendingTopicPartitions are of type
> > > > > > > > > > > > []TopicPartition. Should they be TopicPartitions?
> > > > > > > > > > > > 68.2 Assignment.error. Should we also have an errorMessage field?
> > > > > > > > > > > >
> > > > > > > > > > > > 69. ConsumerGroupPrepareAssignmentResponse.Members.Assignor: Should it
> > > > > > > > > > > > include the selected assignor name?
> > > > > > > > > > > >
> > > > > > > > > > > > 70. ConsumerGroupInstallAssignmentRequest.GroupEpoch: Should we let
> > > > > > > > > > > > the client set this? Intuitively, it seems the coordinator should
> > > > > > > > > > > > manage the group epoch.
> > > > > > > > > > > >
> > > > > > > > > > > > 71. ConsumerGroupDescribeResponse:
> > > > > > > > > > > > 71.1 Members.Assignment.Partitions. Should we include the topic name
> > > > > > > > > > > > too since it's convenient for building tools? Ditto for
> > > > > > > > > > > > TargetAssignment.
> > > > > > > > > > > > 71.2 Members: Should we include SubscribedTopicRegex too?
> > > > > > > > > > > >
> > > > > > > > > > > > 72. OffsetFetchRequest: Is GenerationIdOrMemberEpoch needed since
> > > > > > > > > > > > tools may also want to issue this request?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> >
