Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-04-26 Thread via GitHub


dajac closed pull request #15268: [Draft] Join, Sync, Heartbeat during Migration
URL: https://github.com/apache/kafka/pull/15268


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-04-26 Thread via GitHub


dajac commented on PR #15268:
URL: https://github.com/apache/kafka/pull/15268#issuecomment-2079401308

   I think that we don't need this anymore. Closing.





Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-29 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1470177662


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, List<Record> records) {
         }
     }
 
+    private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+        ConsumerGroupMember member,
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+    ) {
+        for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : protocols) {
+            final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+            ConsumerProtocol.deserializeVersion(buffer);
+            final Optional<Integer> generationId = ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+            // If the generation id is provided, it must match the member epoch.
+            if (!generationId.isPresent() || generationId.get() == member.memberEpoch()) {
+                // TODO: need a list of all available server assignors
+                if (UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+                    || RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+                    return protocol;
+                }
+            }
+        }
+        throw new FencedMemberEpochException("The JoinGroup request doesn't have a matched generation id from a " +
+            "protocol supported by the server assignors with the epoch of the member known by the group coordinator (" +
+            member.memberEpoch() + ").");
+    }
+
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> transitionToConsumerGroupHeartbeatTopicPartitions(
+        List<TopicPartition> topicPartitions
+    ) {
+        Map<String, List<Integer>> topicMap = new HashMap<>();
+        topicPartitions.forEach(tp ->
+            topicMap.computeIfAbsent(tp.topic(), __ -> new ArrayList<>()).add(tp.partition())
+        );
+        return topicMap.entrySet().stream().map(item -> {
+            TopicImage topicImage = metadataImage.topics().getTopic(item.getKey());
+            if (topicImage == null) {
+                throw INVALID_TOPIC_EXCEPTION.exception("Can't find the topic id of topic " + item.getKey() + ".");
+            }
+            return new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                .setTopicId(topicImage.id())
+                .setPartitions(item.getValue());
+        }).collect(Collectors.toList());
+    }
+
+    public CoordinatorResult<Void, Record> upgradeGroupJoin(
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<Record> records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final int sessionTimeoutMs = request.sessionTimeoutMs();
+
+        if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs ||
+            sessionTimeoutMs > classicGroupMaxSessionTimeoutMs
+        ) {
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(memberId)
+                .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+            );
+            return EMPTY_RESULT;
+        }
+
+        // Get or create the consumer group.
+        final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
+        throwIfConsumerGroupIsFull(group, memberId);
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            // new dynamic member.
+            if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group {}. " +
+                    "Created a new member id {} and requesting the member to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                member = group.getOrMaybeCreateMember(memberId, true);
+                newMemberCreated = !group.members().containsKey(memberId);
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
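
The grouping step in `transitionToConsumerGroupHeartbeatTopicPartitions` above (collect partitions per topic name, then resolve each name to a topic id or fail) can be illustrated in isolation. This is only a minimal sketch under stated assumptions: `TopicPartition` and `TopicIdPartitions` here are hypothetical stand-ins rather than the Kafka classes, `java.util.UUID` stands in for Kafka's `Uuid`, a plain `Map<String, UUID>` stands in for the metadata image lookup, and `IllegalArgumentException` stands in for the invalid-topic exception. It also uses a `LinkedHashMap` so that output order is deterministic, which the hunk (using `HashMap`) does not guarantee.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-ins for illustration only; not the Kafka API.
class TopicPartitionGroupingSketch {
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    static final class TopicIdPartitions {
        final UUID topicId;
        final List<Integer> partitions;
        TopicIdPartitions(UUID topicId, List<Integer> partitions) {
            this.topicId = topicId;
            this.partitions = partitions;
        }
    }

    // Groups partitions by topic name, then swaps each name for its id.
    // topicIds is an assumed name-to-id lookup replacing the metadata image.
    static List<TopicIdPartitions> group(List<TopicPartition> owned, Map<String, UUID> topicIds) {
        Map<String, List<Integer>> byTopic = new LinkedHashMap<>();
        for (TopicPartition tp : owned) {
            byTopic.computeIfAbsent(tp.topic, k -> new ArrayList<>()).add(tp.partition);
        }
        List<TopicIdPartitions> result = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : byTopic.entrySet()) {
            UUID id = topicIds.get(entry.getKey());
            if (id == null) {
                // Mirrors the hunk: an unknown topic name is an error, not skipped.
                throw new IllegalArgumentException(
                    "Can't find the topic id of topic " + entry.getKey() + ".");
            }
            result.add(new TopicIdPartitions(id, entry.getValue()));
        }
        return result;
    }
}
```

Failing on an unknown topic name, as the hunk does, rejects the whole request; whether silently dropping such partitions would be preferable is not discussed in this thread.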

Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-26 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467816570


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##

Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-26 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467814642


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##

Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-26 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467811816


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##

Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-26 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467807980


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, List<Record> records) {
         }
     }
 
+    private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+        ConsumerGroupMember member,
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+    ) {
+        for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : protocols) {
+            final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+            ConsumerProtocol.deserializeVersion(buffer);
+            final Optional<Integer> generationId = ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+            // If the generation id is provided, it must match the member epoch.
+            if (!generationId.isPresent() || generationId.get() == member.memberEpoch()) {
+                // TODO: need a list of all available server assignors
+                if (UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+                    || RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+                    return protocol;
+                }
+            }
+        }
+        throw new FencedMemberEpochException("The JoinGroup request doesn't have a matched generation id from a " +
+            "protocol supported by the server assignors with the epoch of the member known by the group coordinator (" +
+            member.memberEpoch() + ").");

Review Comment:
   Maybe handle the case where the generation id doesn't match the member epoch.
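
One way to make the mismatch case explicit is to have the selection loop report why it failed instead of throwing a single exception for every failure. The following is a minimal sketch, not the PR's code: `Protocol`, `Result` and `Outcome` are hypothetical stand-ins for the Kafka types, and the assignor names `"uniform"` and `"range"` are assumed values for the constants referenced in the hunk.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-ins for illustration only; not the Kafka API.
class ProtocolMatchSketch {
    enum Outcome { MATCHED, GENERATION_MISMATCH, NO_SUPPORTED_ASSIGNOR }

    // A protocol from a JoinGroup request: assignor name plus optional generation id.
    static final class Protocol {
        final String name;
        final Optional<Integer> generationId;
        Protocol(String name, Optional<Integer> generationId) {
            this.name = name;
            this.generationId = generationId;
        }
    }

    static final class Result {
        final Outcome outcome;
        final Optional<Protocol> protocol;
        Result(Outcome outcome, Optional<Protocol> protocol) {
            this.outcome = outcome;
            this.protocol = protocol;
        }
    }

    // Assumed names of the server-side assignors.
    static final Set<String> SERVER_ASSIGNORS = new HashSet<>(Arrays.asList("uniform", "range"));

    static Result match(int memberEpoch, List<Protocol> protocols) {
        boolean sawSupportedAssignor = false;
        for (Protocol p : protocols) {
            if (!SERVER_ASSIGNORS.contains(p.name)) {
                continue;
            }
            sawSupportedAssignor = true;
            // As in the hunk: an absent generation id is accepted,
            // a present one must equal the member epoch.
            if (!p.generationId.isPresent() || p.generationId.get() == memberEpoch) {
                return new Result(Outcome.MATCHED, Optional.of(p));
            }
        }
        // A supported assignor was offered but with a stale generation id,
        // which is a different failure from offering no supported assignor.
        Outcome failure = sawSupportedAssignor
            ? Outcome.GENERATION_MISMATCH
            : Outcome.NO_SUPPORTED_ASSIGNOR;
        return new Result(failure, Optional.empty());
    }
}
```

With the two failure outcomes separated, the caller can choose a different response for a stale generation id than for an unsupported assignor.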






Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-26 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467804300


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, List<Record> records) {
         }
     }
 
+    private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+        ConsumerGroupMember member,
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+    ) {
+        for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : protocols) {
+            final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+            ConsumerProtocol.deserializeVersion(buffer);
+            final Optional<Integer> generationId = ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+            // If the generation id is provided, it must match the member epoch.
+            if (!generationId.isPresent() || generationId.get() == member.memberEpoch()) {
+                // TODO: need a list of all available server assignors
+                if (UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+                    || RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+                    return protocol;
+                }
+            }
+        }
+        throw new FencedMemberEpochException("The JoinGroup request doesn't have a matched generation id from a " +
+            "protocol supported by the server assignors with the epoch of the member known by the group coordinator (" +
+            member.memberEpoch() + ").");

Review Comment:
   Not supported by the old protocol.
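
One possible reading of this comment is that the exception thrown here would surface an error a classic-protocol client may not recognise, so the failure would need translating before it reaches a JoinGroup response. The sketch below shows that idea only. The `Failure` and `ClassicError` enums are hypothetical, and the particular mapping is an assumption for illustration, not something this thread states. `ILLEGAL_GENERATION` and `INCONSISTENT_GROUP_PROTOCOL` are names of existing Kafka error codes, used here as plain labels.

```java
// Hypothetical sketch: translate an internal failure into an error name
// that a classic-protocol client is assumed to understand.
class ClassicErrorMappingSketch {
    // Internal failure reasons (illustrative, not Kafka types).
    enum Failure { GENERATION_MISMATCH, NO_SUPPORTED_ASSIGNOR }

    // Labels borrowed from existing Kafka error code names.
    enum ClassicError { ILLEGAL_GENERATION, INCONSISTENT_GROUP_PROTOCOL }

    static ClassicError toClassicError(Failure failure) {
        switch (failure) {
            case GENERATION_MISMATCH:
                // Assumed mapping: a stale generation id is reported as an illegal generation.
                return ClassicError.ILLEGAL_GENERATION;
            case NO_SUPPORTED_ASSIGNOR:
                // Assumed mapping: no common assignor is reported as a protocol inconsistency.
                return ClassicError.INCONSISTENT_GROUP_PROTOCOL;
            default:
                throw new IllegalStateException("Unhandled failure: " + failure);
        }
    }
}
```

Keeping the translation in one place would make it easy to audit which errors can reach clients that still speak the old protocol.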


