lucasbru commented on code in PR #18044:
URL: https://github.com/apache/kafka/pull/18044#discussion_r1871038216
##########
clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java:
##########
@@ -37,6 +37,14 @@ public class StreamsGroupHeartbeatRequest extends AbstractRequest {
*/
public static final int JOIN_GROUP_MEMBER_EPOCH = 0;
+ /**
+ * The version from which consumers are required to generate their own member id.
+ *
+ * <p>Starting from this version, member id must be generated by the consumer instance
+ * instead of being provided by the server.</p>
+ */
+ public static final int STREAMS_CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION = 1;
+
Review Comment:
Since our RPCs are not in any release yet, I don't think we need to bump the
version. We can skip everything that would otherwise be needed to bump the RPC
version, so this constant can be removed.
##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json:
##########
@@ -18,13 +18,13 @@
"type": "request",
"listeners": ["broker", "zkBroker"],
"name": "StreamsGroupHeartbeatRequest",
- "validVersions": "0",
+ "validVersions": "0-1",
Review Comment:
Don't bump the version here; keep `validVersions` at "0".
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1675,20 +1676,25 @@ private void throwIfShareGroupHeartbeatRequestIsInvalid(
* Validates the request.
*
* @param request The request to validate.
- *
+ * @param apiVersion The version of the StreamsGroupHeartbeat RPC
* @throws InvalidRequestException if the request is not valid.
* @throws UnsupportedAssignorException if the assignor is not supported.
*/
private void throwIfStreamsGroupHeartbeatRequestIsInvalid(
- StreamsGroupHeartbeatRequestData request
+ StreamsGroupHeartbeatRequestData request,
+ short apiVersion
) throws InvalidRequestException, UnsupportedAssignorException {
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
- if (request.memberEpoch() > 0 || request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
+ if (apiVersion >= STREAMS_CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION ||
Review Comment:
Simplify this: there is no need to branch on the protocol version here.
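To illustrate, here is a minimal, self-contained sketch of a version-independent check. It assumes the member ID is always generated by the streams consumer and is therefore required on every heartbeat, including the initial join. `HeartbeatRequest`, `requireNonEmpty`, and the local `InvalidRequestException` are hypothetical stand-ins so the snippet compiles on its own; they are not the actual Kafka classes or helpers.

```java
public class StreamsHeartbeatValidationSketch {

    /** Hypothetical, trimmed-down stand-in for StreamsGroupHeartbeatRequestData. */
    static final class HeartbeatRequest {
        private final String groupId;
        private final String memberId;
        private final int memberEpoch;

        HeartbeatRequest(String groupId, String memberId, int memberEpoch) {
            this.groupId = groupId;
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
        }

        String groupId() { return groupId; }
        String memberId() { return memberId; }
        int memberEpoch() { return memberEpoch; }
    }

    /** Local stand-in for Kafka's InvalidRequestException. */
    static final class InvalidRequestException extends RuntimeException {
        InvalidRequestException(String message) { super(message); }
    }

    /** Hypothetical helper: rejects null or blank values. */
    static void requireNonEmpty(String value, String error) {
        if (value == null || value.trim().isEmpty()) {
            throw new InvalidRequestException(error);
        }
    }

    /**
     * Takes no apiVersion parameter. Assumption: the member ID is required
     * for every member epoch, so neither the version nor the epoch is consulted.
     */
    static void throwIfInvalid(HeartbeatRequest request) {
        requireNonEmpty(request.groupId(), "GroupId can't be empty.");
        requireNonEmpty(request.memberId(), "MemberId can't be empty.");
    }

    public static void main(String[] args) {
        // A joining member (epoch 0) that supplies its own member ID passes.
        throwIfInvalid(new HeartbeatRequest("group", "member-1", 0));
        try {
            // A joining member without a member ID is rejected.
            throwIfInvalid(new HeartbeatRequest("group", "", 0));
        } catch (InvalidRequestException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With this shape the `apiVersion` parameter and the version constant both become unnecessary; the real method would of course keep its other checks (instance ID, rack ID, and so on).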
##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json:
##########
@@ -18,13 +18,13 @@
"type": "request",
"listeners": ["broker", "zkBroker"],
"name": "StreamsGroupHeartbeatRequest",
- "validVersions": "0",
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+",
- "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
+ "about": "The member ID generated by the streams consumer. The member ID must be kept during the entire lifetime of the streams consumer process." },
Review Comment:
Can you make these changes (the ones in the JSONs) in the KIP as well,
please?
##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########
@@ -42,8 +42,8 @@
"about": "The top-level error code, or 0 if there was no error" },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
- { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
- "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
+ { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0",
+ "about": "The member is generated by the streams consumer starting from version 1, while in version 0, it can be provided by users or generated by the group coordinator."},
Review Comment:
Just state that the member ID is always generated by the streams consumer.