Hi Jun,

Thanks for your thorough review. There is already a vote thread if you
want to vote.

Best,
David

On Tue, Oct 18, 2022 at 11:07 PM Jun Rao <j...@confluent.io.invalid> wrote:
>
> Hi, David,
>
> Thanks for the reply. No more comments from me.
>
> 80. Yes, since PrepareAssignment returns topicIds, using topicId in
> ConsumerGroupInstallAssignmentRequest makes sense.
>
> 81. Sounds good.
>
> Jun
>
> On Tue, Oct 18, 2022 at 11:46 AM David Jacot <dja...@confluent.io.invalid>
> wrote:
>
> > Hi Jun,
> >
> > 81. I forgot to say that I put UniformAssignor as the first one in the
> > list. I think that it should be the default one.
> >
> > Best,
> > David
> >
> > On Tue, Oct 18, 2022 at 8:33 PM David Jacot <dja...@confluent.io> wrote:
> > >
> > > Hi Jun,
> > >
> > > 80. Hmm. It seems preferable to keep
> > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId as a
> > > topic id in order for the RPC to remain symmetrical with the
> > > PrepareAssignment RPCs. The client has to create the TopicPartitions
> > > from the mapping provided in the PrepareAssignment response so it can
> > > use the same mapping to convert them back to topic ids afterwards. I
> > > personally find this cleaner from an RPC perspective, don't you think?
> > >
> > > 81. Makes sense. Done.
> > >
> > > Thanks,
> > > David
> > >
> > > On Tue, Oct 18, 2022 at 8:00 PM Jun Rao <j...@confluent.io.invalid>
> > > wrote:
> > > >
> > > > Hi, David,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 80. The change in ConsumerGroupPrepareAssignmentResponse sounds good
> > > > to me. Should
> > > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId be
> > > > topic name? Intuitively, the client assignor assigns partitions based
> > > > on topic names and it seems that the group coordinator should be
> > > > responsible for mapping the topic names to topic ids.
> > > >
> > > > 81. group.consumer.assignors: Should we change the default values to
> > > > include the full class name?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Oct 18, 2022 at 2:36 AM David Jacot <dja...@confluent.io.invalid>
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > 80. I have included the mapping from topic ids to topic names in
> > > > > ConsumerGroupPrepareAssignmentResponse.Topics. It can be used to
> > > > > convert all the topic ids that you mentioned. It seems preferable to
> > > > > me to keep it like this as topic ids are usually smaller than topic
> > > > > names. Does that make sense?
> > > > >
> > > > > Thanks,
> > > > > David
> > > > >
> > > > > On Tue, Oct 18, 2022 at 1:51 AM Jun Rao <j...@confluent.io.invalid>
> > > > > wrote:
> > > > > >
> > > > > > Hi, David,
> > > > > >
> > > > > > Thanks for the reply. Just one more comment.
> > > > > >
> > > > > > 80. Since PartitionAssignor.AssignmentMemberSpec uses topic name for
> > > > > > both subscribedTopics and targetPartitions, in
> > > > > > ConsumerGroupPrepareAssignmentResponse, should
> > > > > > Members.SubscribedTopicIds and Members.TopicPartitions.TopicId be
> > > > > > topic names too? Similarly, since PartitionAssignor.MemberAssignment
> > > > > > uses topic name, should
> > > > > > ConsumerGroupInstallAssignmentRequest.Members.Partitions.TopicId be
> > > > > > topic name too?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Mon, Oct 17, 2022 at 2:35 AM David Jacot
> > > > > > <dja...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > Thanks for your comments. Please find my answers below.
> > > > > > >
> > > > > > > 60. Sure. Let me use a concrete example to illustrate it. Let's
> > > > > > > assume that KStreams has a member A with a task reading from foo-0
> > > > > > > and a member B with a standby task reading from foo-0.
> > > > > > > As you know, the standby task information for B is encoded in the
> > > > > > > assignment metadata whereas the assigned partition for A is in the
> > > > > > > assignment. Now, let's imagine that the assignor decides to promote
> > > > > > > the standby task to become the active task for foo-0. The assignor
> > > > > > > will create a new assignment for the group and will encode the
> > > > > > > standby task in A's metadata and assign foo-0 to B. A will be
> > > > > > > requested to revoke foo-0 and, while it does so, B will get its new
> > > > > > > metadata but without foo-0 because foo-0 is not revoked yet. From
> > > > > > > the point of view of B, it will see that the standby task is no
> > > > > > > longer there. Without providing the full set of assigned
> > > > > > > partitions, it would not know what to do here. If foo-0 is targeted
> > > > > > > to be assigned to B, B should wait until it is before stopping the
> > > > > > > standby task. If foo-0 is not, it can stop the standby immediately.
> > > > > > > Long story short, KStreams needs to get the full assignment +
> > > > > > > metadata in order to reason about the correct end state.
> > > > > > >
> > > > > > > How do we provide this? We have added the pending partitions in the
> > > > > > > ConsumerGroupHeartbeatResponse in conjunction with the assigned
> > > > > > > partitions. With this, the Consumer knows the full set of
> > > > > > > partitions. PartitionAssignor#onAssign will be called when the
> > > > > > > member transitions to a new epoch with the target partitions and
> > > > > > > the metadata for the member. Then,
> > > > > > > RebalanceListener#onAssignedPartitions is called when partitions
> > > > > > > are incrementally assigned. At most, it will be called N times
> > > > > > > where N is the number of assigned partitions in the current epoch.
> > > > > > >
> > > > > > > For context, this was discussed with Guozhang in this thread.
> > > > > > >
> > > > > > > I hope that this clarifies the intent.
> > > > > > >
> > > > > > > 62. Got it. I have reworked that part as well. Unfortunately, I
> > > > > > > think that we should keep using a Set here because it is already
> > > > > > > there. Deprecating the current one to change the type is not worth
> > > > > > > it.
> > > > > > >
> > > > > > > 63. Fixed as well.
> > > > > > >
> > > > > > > 68.2 Right. The error is only used by the client side assignor. The
> > > > > > > client assignor will report a non-zero error code while installing
> > > > > > > a new assignment if it was not able to compute a new assignment for
> > > > > > > the group. In this case, the assignment is not changed but the
> > > > > > > error is reported to all the members. KStreams will have a few such
> > > > > > > errors. They are listed in the KIP.
> > > > > > >
> > > > > > > When a new assignment is installed with the
> > > > > > > ConsumerGroupInstallAssignment API, the coordinator validates it
> > > > > > > and rejects the installation directly if it is not valid. In this
> > > > > > > case, only the member trying to install the assignment gets an
> > > > > > > error. The other members are not aware of it as they keep their
> > > > > > > current assignor.
> > > > > > >
> > > > > > > 71.1 That makes sense. Using SubscribedTopicNames is indeed more
> > > > > > > intuitive. I have changed the HB request to use it in order to be
> > > > > > > consistent. I have also added the topic names in the
> > > > > > > ConsumerGroupDescribe response.
> > > > > > >
> > > > > > > Best,
> > > > > > > David
> > > > > > >
> > > > > > > On Fri, Oct 14, 2022 at 8:05 PM Jun Rao <j...@confluent.io.invalid>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi, David,
> > > > > > > >
> > > > > > > > Thanks for the reply.
> > > > > > > >
> > > > > > > > 60. Hmm, could you explain why KStreams needs the full set of
> > > > > > > > partition assignments? I am also not sure how this will be
> > > > > > > > implemented based on the protocol. Since HeartBeatResponse sends
> > > > > > > > the assigned partitions in phases (those that don't need to wait
> > > > > > > > for revocation from other members first, followed by the full
> > > > > > > > assignment list), how does a member know which response has the
> > > > > > > > full assignment?
> > > > > > > >
> > > > > > > > 62. I was referring to Admin#describeConsumerGroups. It seems
> > > > > > > > that ownedPartitions is still of type List<TopicIdPartition>.
> > > > > > > > Also, the existing topicPartitions() returns
> > > > > > > > Set<TopicPartition>, not a collection.
> > > > > > > >
> > > > > > > > 63. This is also in Admin#describeConsumerGroups. The comment
> > > > > > > > seems inconsistent with the field name.
> > > > > > > >   /**
> > > > > > > >    * The reason reported by the assignor.
> > > > > > > >    */
> > > > > > > >   byte error;
> > > > > > > >
> > > > > > > > 67. Thanks for the explanation. Makes sense. The existing name
> > > > > > > > may be ok.
> > > > > > > >
> > > > > > > > 68.2 Are you saying the error is only intended for the client
> > > > > > > > assignor? But the coordinator generates the error based on the
> > > > > > > > server side validation, right? Should we provide some info to
> > > > > > > > tell the client why the validation fails?
> > > > > > > >
> > > > > > > > 71.1 Hmm, for SubscribedTopicIds, should we use topic name in the
> > > > > > > > subscription part? That seems more intuitive---a subscription
> > > > > > > > shouldn't change just because a topic is recreated. For the
> > > > > > > > assigned partitions, perhaps we could include both topicId and
> > > > > > > > name just like FetchOffsetRequest.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Fri, Oct 14, 2022 at 2:49 AM Luke Chen <show...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks for the update.
> > > > > > > > > Yes, I think using similar way as KIP-868 to fix this issue
> > > > > > > > > makes sense.
> > > > > > > > > Let's consider it in the future.
> > > > > > > > >
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > > On Fri, Oct 14, 2022 at 5:16 PM David Jacot
> > > > > > > > > <dja...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Luke,
> > > > > > > > > >
> > > > > > > > > > Thanks for your questions.
> > > > > > > > > >
> > > > > > > > > > > 1. We will store the "targetAssignment" into log now. But
> > > > > > > > > > > as we know, there's max batch size limit (default 1MB),
> > > > > > > > > > > which means, we cannot support 1M partitions in one group
> > > > > > > > > > > (actually, it should be less than 60k partitions since
> > > > > > > > > > > we'll store {topicID+partition id}) by default now. How
> > > > > > > > > > > will we handle that? Do we expect users to adjust the max
> > > > > > > > > > > batch size to support large partitions in groups, which we
> > > > > > > > > > > don't need this change for old protocol?
> > > > > > > > > >
> > > > > > > > > > That's right. I have a few ideas to remove this limitation in
> > > > > > > > > > the future but I decided to keep them for future improvement.
> > > > > > > > > > The KIP is large enough and as the current protocol suffers
> > > > > > > > > > from the exact same limitation, it is not a regression.
> > > > > > > > > >
> > > > > > > > > > For the future, my thinking is to split the assignment and to
> > > > > > > > > > only write deltas to the log instead of re-writing all of it.
> > > > > > > > > > We would need to use transactions for this in the coordinator
> > > > > > > > > > (similarly to KIP-868). The challenge is that we have to
> > > > > > > > > > ensure that those deltas are all written or completely rolled
> > > > > > > > > > back. Otherwise, we would have a weird state with the
> > > > > > > > > > compaction. This obviously needs more thinking.
> > > > > > > > > >
> > > > > > > > > > > I'm wondering why we should persist the "targetAssignment"
> > > > > > > > > > > data? If we want to work for coordinator failover, could
> > > > > > > > > > > the new coordinator try to request for currently owned
> > > > > > > > > > > partitions from each consumer when failed over? I'm not
> > > > > > > > > > > sure if the consumer will auto send owned partitions to the
> > > > > > > > > > > new coordinator. If not, maybe we can return an error to
> > > > > > > > > > > client ConsumerGroupHeartbeat API with
> > > > > > > > > > > REQUIRE_OWNED_PARTITION error, and ask client to append the
> > > > > > > > > > > currently owned partitions to new coordinator for new
> > > > > > > > > > > assignment computation. Does that make sense?
> > > > > > > > > >
> > > > > > > > > > The entire reconciliation process depends on it so if we lose
> > > > > > > > > > it during a failover, members could be in a weird state. For
> > > > > > > > > > instance, they could be in the middle of a transition from
> > > > > > > > > > their current assignment to their new target and thus would
> > > > > > > > > > be blocked. Relying on members to reconstruct it back does
> > > > > > > > > > not really work because they don't have all the information
> > > > > > > > > > to do so (e.g. new metadata) so we would have to recompute a
> > > > > > > > > > new one. This implies that we need to get the owned
> > > > > > > > > > partitions from all members and that would take a few seconds
> > > > > > > > > > until all members come back in the best case, up to the
> > > > > > > > > > session timeout in the worst case. Imagine that a member
> > > > > > > > > > joins or fails during this time, the whole process would be
> > > > > > > > > > stuck. I am afraid storing it is the best way here.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > David
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 14, 2022 at 5:11 AM Luke Chen <show...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi David,
> > > > > > > > > > >
> > > > > > > > > > > A few more questions:
> > > > > > > > > > > 1. We will store the "targetAssignment" into log now. But
> > > > > > > > > > > as we know, there's max batch size limit (default 1MB),
> > > > > > > > > > > which means, we cannot support 1M partitions in one group
> > > > > > > > > > > (actually, it should be less than 60k partitions since
> > > > > > > > > > > we'll store {topicID+partition id}) by default now. How
> > > > > > > > > > > will we handle that? Do we expect users to adjust the max
> > > > > > > > > > > batch size to support large partitions in groups, which we
> > > > > > > > > > > don't need this change for old protocol?
> > > > > > > > > > >
> > > > > > > > > > > I'm wondering why we should persist the "targetAssignment"
> > > > > > > > > > > data?
> > > > > > > > > > > If we want to work for coordinator failover, could the new
> > > > > > > > > > > coordinator try to request for currently owned partitions
> > > > > > > > > > > from each consumer when failed over? I'm not sure if the
> > > > > > > > > > > consumer will auto send owned partitions to the new
> > > > > > > > > > > coordinator. If not, maybe we can return an error to client
> > > > > > > > > > > ConsumerGroupHeartbeat API with REQUIRE_OWNED_PARTITION
> > > > > > > > > > > error, and ask client to append the currently owned
> > > > > > > > > > > partitions to new coordinator for new assignment
> > > > > > > > > > > computation. Does that make sense?
> > > > > > > > > > >
> > > > > > > > > > > Luke
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 14, 2022 at 12:22 AM Jun Rao
> > > > > > > > > > > <j...@confluent.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi, David,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the reply and the updated KIP. A few more
> > > > > > > > > > > > comments on the interfaces and the protocols.
> > > > > > > > > > > >
> > > > > > > > > > > > 60. On the consumer side, do we need both
> > > > > > > > > > > > PartitionAssignor.onAssignment and
> > > > > > > > > > > > ConsumerRebalanceListener.onPartitionsAssigned? My
> > > > > > > > > > > > understanding is that the former was added for
> > > > > > > > > > > > cooperative rebalance, which is now handled by the
> > > > > > > > > > > > coordinator. If we do need both, should we make them more
> > > > > > > > > > > > consistent (e.g. topic name vs topic id, list vs set vs
> > > > > > > > > > > > collection)?
> > > > > > > > > > > >
> > > > > > > > > > > > 61. group.local.assignors: Could we make it clear that
> > > > > > > > > > > > it's the full class name that implements
> > > > > > > > > > > > PartitionAssignor?
> > > > > > > > > > > >
> > > > > > > > > > > > 62. MemberAssignment: It currently has the following
> > > > > > > > > > > > method.
> > > > > > > > > > > >     public Set<TopicPartition> topicPartitions()
> > > > > > > > > > > > We are adding List<TopicIdPartition> ownedPartitions.
> > > > > > > > > > > > Should we keep the naming and the return type consistent?
> > > > > > > > > > > >
> > > > > > > > > > > > 63. MemberAssignment.error: should that be reason?
> > > > > > > > > > > >
> > > > > > > > > > > > 64. group.remote.assignor: The client may not know what
> > > > > > > > > > > > assignors the broker supports. Should we default this to
> > > > > > > > > > > > what the broker determines (e.g. first assignor listed in
> > > > > > > > > > > > group.consumer.assignors)?
> > > > > > > > > > > >
> > > > > > > > > > > > 65. After the text "When A heartbeats again and
> > > > > > > > > > > > acknowledges the revocation, the group coordinator
> > > > > > > > > > > > transitions him to epoch 2 and releases foo-2.", we have
> > > > > > > > > > > > the following.
> > > > > > > > > > > >   B - epoch=2, partitions=[foo-2], pending-partitions=[]
> > > > > > > > > > > > Should foo-2 be in pending-partitions?
> > > > > > > > > > > >
> > > > > > > > > > > > 66. In the Online Migration example, is the first
> > > > > > > > > > > > occurrence of "C - epoch=23, partitions=[foo-2, foo-5,
> > > > > > > > > > > > foo-4], pending-partitions=[]" correct? It seems that
> > > > > > > > > > > > should happen after C receives a SyncGroup response? If
> > > > > > > > > > > > so, subsequent examples have the same issue.
> > > > > > > > > > > >
> > > > > > > > > > > > 67. ConsumerGroupHeartbeatRequest.RebalanceTimeoutMs:
> > > > > > > > > > > > Which config controls this? How is this used by the group
> > > > > > > > > > > > coordinator since there is no sync barrier anymore?
> > > > > > > > > > > >
> > > > > > > > > > > > 68. ConsumerGroupHeartbeatResponse:
> > > > > > > > > > > > 68.1 AssignedTopicPartitions and PendingTopicPartitions
> > > > > > > > > > > > are of type []TopicPartition. Should they be
> > > > > > > > > > > > TopicPartitions?
> > > > > > > > > > > > 68.2 Assignment.error. Should we also have an
> > > > > > > > > > > > errorMessage field?
> > > > > > > > > > > >
> > > > > > > > > > > > 69.
> > > > > > > > > > > > ConsumerGroupPrepareAssignmentResponse.Members.Assignor:
> > > > > > > > > > > > Should it include the selected assignor name?
> > > > > > > > > > > >
> > > > > > > > > > > > 70. ConsumerGroupInstallAssignmentRequest.GroupEpoch:
> > > > > > > > > > > > Should we let the client set this? Intuitively, it seems
> > > > > > > > > > > > the coordinator should manage the group epoch.
> > > > > > > > > > > >
> > > > > > > > > > > > 71. ConsumerGroupDescribeResponse:
> > > > > > > > > > > > 71.1 Members.Assignment.Partitions. Should we include the
> > > > > > > > > > > > topic name too since it's convenient for building tools?
> > > > > > > > > > > > Ditto for TargetAssignment.
> > > > > > > > > > > > 71.2 Members: Should we include SubscribedTopicRegex too?
> > > > > > > > > > > >
> > > > > > > > > > > > 72. OffsetFetchRequest: Is GenerationIdOrMemberEpoch
> > > > > > > > > > > > needed since tools may also want to issue this request?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
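
P.S. To make the outcome of point 80 concrete, here is a minimal sketch of the round trip discussed in the thread: the client converts topic ids to topic names using the mapping from the PrepareAssignment response, runs its assignor on names, and then inverts the same mapping before installing the assignment. The classes below (TopicIdMappingSketch, IdPartition, NamePartition) are hypothetical stand-ins written for illustration only. They are not the KIP's or Kafka's actual types, and java.util.UUID is used here in place of Kafka's own topic id type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class TopicIdMappingSketch {

    // Hypothetical stand-in for a partition keyed by topic id (RPC view).
    static final class IdPartition {
        final UUID topicId;
        final int partition;

        IdPartition(UUID topicId, int partition) {
            this.topicId = topicId;
            this.partition = partition;
        }
    }

    // Hypothetical stand-in for a partition keyed by topic name (assignor view).
    static final class NamePartition {
        final String topic;
        final int partition;

        NamePartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Convert id-based partitions to name-based ones using the id -> name mapping.
    static List<NamePartition> toNames(List<IdPartition> partitions, Map<UUID, String> idToName) {
        List<NamePartition> result = new ArrayList<>();
        for (IdPartition p : partitions) {
            String name = idToName.get(p.topicId);
            if (name == null) {
                throw new IllegalArgumentException("Unknown topic id: " + p.topicId);
            }
            result.add(new NamePartition(name, p.partition));
        }
        return result;
    }

    // Invert the same mapping to convert the computed assignment back to topic ids.
    static List<IdPartition> toIds(List<NamePartition> partitions, Map<UUID, String> idToName) {
        Map<String, UUID> nameToId = new HashMap<>();
        for (Map.Entry<UUID, String> e : idToName.entrySet()) {
            nameToId.put(e.getValue(), e.getKey());
        }
        List<IdPartition> result = new ArrayList<>();
        for (NamePartition p : partitions) {
            UUID id = nameToId.get(p.topic);
            if (id == null) {
                throw new IllegalArgumentException("Unknown topic name: " + p.topic);
            }
            result.add(new IdPartition(id, p.partition));
        }
        return result;
    }

    public static void main(String[] args) {
        UUID fooId = UUID.randomUUID();
        Map<UUID, String> idToName = new HashMap<>();
        idToName.put(fooId, "foo");

        List<IdPartition> prepared = new ArrayList<>();
        prepared.add(new IdPartition(fooId, 0));

        // ids -> names for the assignor, then names -> ids for the install request.
        List<NamePartition> forAssignor = toNames(prepared, idToName);
        List<IdPartition> toInstall = toIds(forAssignor, idToName);

        System.out.println(forAssignor.get(0).topic + "-" + forAssignor.get(0).partition);
        System.out.println(toInstall.get(0).topicId.equals(fooId));
    }
}
```

Because both conversions use the one mapping, the two RPCs stay symmetrical and the assignor itself only ever sees topic names.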