[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1030180438 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.utils.BufferSupplier; + +import java.net.InetAddress; +import java.util.Objects; + +public class GroupCoordinatorRequestContext { Review Comment: That works except for the buffer supplier. I think that we can pass the buffer supplier as a regular argument instead of passing it via the context. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
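A minimal sketch of the suggestion above, assuming a context that carries only plain request metadata and a coordinator method that takes the buffer supplier as an explicit argument. `BufferSupplierSketch`, `RequestContextSketch`, and `CoordinatorSketch` are hypothetical stand-ins for illustration, not the classes in the PR.

```java
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical stand-in for a buffer supplier; only its shape matters here.
interface BufferSupplierSketch {
    ByteBuffer get(int capacity);
}

// Hypothetical context holding only immutable request metadata.
final class RequestContextSketch {
    final short apiVersion;
    final String clientId;
    final InetAddress clientAddress;

    RequestContextSketch(short apiVersion, String clientId, InetAddress clientAddress) {
        this.apiVersion = apiVersion;
        this.clientId = Objects.requireNonNull(clientId);
        this.clientAddress = Objects.requireNonNull(clientAddress);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RequestContextSketch)) return false;
        RequestContextSketch that = (RequestContextSketch) o;
        return apiVersion == that.apiVersion
            && clientId.equals(that.clientId)
            && clientAddress.equals(that.clientAddress);
    }

    @Override
    public int hashCode() {
        return Objects.hash(apiVersion, clientId, clientAddress);
    }
}

// Hypothetical coordinator entry point: the supplier is a regular argument.
final class CoordinatorSketch {
    // Copies the group id into a scratch buffer and returns the bytes written.
    static int joinGroup(RequestContextSketch context, String groupId, BufferSupplierSketch bufferSupplier) {
        byte[] bytes = groupId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer scratch = bufferSupplier.get(bytes.length);
        scratch.put(bytes);
        return scratch.position();
    }
}
```

One possible benefit of this shape is that the context compares by value, which makes it easy to match in tests, while the supplier, which is tied to the handling thread, stays out of it.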
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1030179126 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.utils.BufferSupplier; + +import java.net.InetAddress; +import java.util.Objects; + +public class GroupCoordinatorRequestContext { + +private final short apiVersion; Review Comment: Yeah, I agree with you. I went with the minimum to start with.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1027049468 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -21,11 +21,10 @@ import java.io.{File, IOException} import java.net.{InetAddress, SocketTimeoutException} import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} - Review Comment: I will fix this on Monday. It seems that my IDE is doing this automatically somehow.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1027049342 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { -val joinGroupRequest = request.body[JoinGroupRequest] + private def makeGroupCoordinatorRequestContext( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): GroupCoordinatorRequestContext = { +new GroupCoordinatorRequestContext( + request.context.header.data.requestApiVersion, + request.context.header.data.clientId, + request.context.clientAddress, + requestLocal.bufferSupplier +) + } -// the callback for sending a join-group response -def sendResponseCallback(joinResult: JoinGroupResult): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -val responseBody = new JoinGroupResponse( - new JoinGroupResponseData() -.setThrottleTimeMs(requestThrottleMs) -.setErrorCode(joinResult.error.code) -.setGenerationId(joinResult.generationId) -.setProtocolType(joinResult.protocolType.orNull) -.setProtocolName(joinResult.protocolName.orNull) -.setLeader(joinResult.leaderId) -.setSkipAssignment(joinResult.skipAssignment) -.setMemberId(joinResult.memberId) -.setMembers(joinResult.members.asJava), - request.context.apiVersion -) + def handleJoinGroupRequest( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): CompletableFuture[Unit] = { +val joinGroupRequest = request.body[JoinGroupRequest] -trace("Sending join group response %s for correlation id %d to client %s." - .format(responseBody, request.header.correlationId, request.header.clientId)) -responseBody - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) +def sendResponse(response: AbstractResponse): Unit = { + trace("Sending join group response %s for correlation id %d to client %s." 
+.format(response, request.header.correlationId, request.header.clientId)) + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { +response.maybeSetThrottleTimeMs(requestThrottleMs) +response + }) } if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. - sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION)) + sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) Review Comment: All handlers are already surrounded by a try..catch that catches all the exceptions raised. We just need to ensure that exceptions raised in futures’ callbacks are also caught. In this particular case, any exceptions raised by sendResponse would be caught by that try..catch.
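The distinction made above can be sketched in plain Java: a `try`/`catch` around a handler only sees exceptions thrown while the handler call is on the stack, so failures that surface later through the returned future need their own hook. This is an illustration under assumptions, not the code in the PR; `dispatch` and the `handleError` consumer are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class FutureErrorHandlingSketch {
    // Runs a handler and routes both synchronous and asynchronous failures
    // to the same error handler (hypothetical helper, for illustration only).
    static void dispatch(Supplier<CompletableFuture<Void>> handler, Consumer<Throwable> handleError) {
        try {
            handler.get().whenComplete((ignored, error) -> {
                if (error != null) {
                    // Failures from dependent stages arrive wrapped in CompletionException.
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                        ? error.getCause()
                        : error;
                    handleError.accept(cause);
                }
            });
        } catch (Throwable t) {
            // Covers exceptions raised before the handler returned its future.
            handleError.accept(t);
        }
    }
}
```

Without the `whenComplete` hook, an exception thrown inside a stage that runs after the future completes would be stored in the future and never reach the surrounding `catch`.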
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1027049080 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { -val joinGroupRequest = request.body[JoinGroupRequest] + private def makeGroupCoordinatorRequestContext( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): GroupCoordinatorRequestContext = { +new GroupCoordinatorRequestContext( + request.context.header.data.requestApiVersion, + request.context.header.data.clientId, + request.context.clientAddress, + requestLocal.bufferSupplier +) + } -// the callback for sending a join-group response -def sendResponseCallback(joinResult: JoinGroupResult): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -val responseBody = new JoinGroupResponse( - new JoinGroupResponseData() -.setThrottleTimeMs(requestThrottleMs) -.setErrorCode(joinResult.error.code) -.setGenerationId(joinResult.generationId) -.setProtocolType(joinResult.protocolType.orNull) -.setProtocolName(joinResult.protocolName.orNull) -.setLeader(joinResult.leaderId) -.setSkipAssignment(joinResult.skipAssignment) -.setMemberId(joinResult.memberId) -.setMembers(joinResult.members.asJava), - request.context.apiVersion -) + def handleJoinGroupRequest( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): CompletableFuture[Unit] = { Review Comment: A future is a better abstraction than a callback in my opinion, but that is subjective. We could keep using callbacks as well; the end result is the same. The new Controller interface uses futures as well, so standardizing makes sense here. Regarding the current APIs, join group and sync group definitely rely on callbacks. For the others, I don’t remember off the top of my head.
In the new group coordinator, all of them will be executed in a different thread, so they must be async.
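To illustrate why the end result is the same, here is a minimal sketch of wrapping a callback-style API in a `CompletableFuture`. `LegacyCoordinator`, its `handleJoinGroup` signature, and the `String` payloads are simplified, hypothetical stand-ins, not Kafka's actual interfaces.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CallbackToFutureSketch {
    // Hypothetical callback-style API: the result is delivered via responseCallback.
    interface LegacyCoordinator {
        void handleJoinGroup(String groupId, Consumer<String> responseCallback);
    }

    // Adapts the callback-style call to a future-returning one.
    static CompletableFuture<String> joinGroup(LegacyCoordinator legacy, String groupId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            // The callback simply completes the future with the response.
            legacy.handleJoinGroup(groupId, future::complete);
        } catch (Throwable t) {
            // A synchronous failure is surfaced through the same future.
            future.completeExceptionally(t);
        }
        return future;
    }
}
```

A caller can then chain `thenAccept` or `whenComplete` on the returned future instead of supplying its own callback, regardless of which thread eventually invokes the callback.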
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1026881513 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { -val joinGroupRequest = request.body[JoinGroupRequest] + private def makeGroupCoordinatorRequestContext( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): GroupCoordinatorRequestContext = { +new GroupCoordinatorRequestContext( + request.context.header.data.requestApiVersion, + request.context.header.data.clientId, + request.context.clientAddress, + requestLocal.bufferSupplier +) + } -// the callback for sending a join-group response -def sendResponseCallback(joinResult: JoinGroupResult): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -val responseBody = new JoinGroupResponse( - new JoinGroupResponseData() -.setThrottleTimeMs(requestThrottleMs) -.setErrorCode(joinResult.error.code) -.setGenerationId(joinResult.generationId) -.setProtocolType(joinResult.protocolType.orNull) -.setProtocolName(joinResult.protocolName.orNull) -.setLeader(joinResult.leaderId) -.setSkipAssignment(joinResult.skipAssignment) -.setMemberId(joinResult.memberId) -.setMembers(joinResult.members.asJava), - request.context.apiVersion -) + def handleJoinGroupRequest( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): CompletableFuture[Unit] = { +val joinGroupRequest = request.body[JoinGroupRequest] -trace("Sending join group response %s for correlation id %d to client %s." - .format(responseBody, request.header.correlationId, request.header.clientId)) -responseBody - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) +def sendResponse(response: AbstractResponse): Unit = { + trace("Sending join group response %s for correlation id %d to client %s." 
+.format(response, request.header.correlationId, request.header.clientId)) + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { +response.maybeSetThrottleTimeMs(requestThrottleMs) +response Review Comment: hmm… sendResponseMaybeThrottle takes a function which returns a response.
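The point above, that the helper expects a function from the throttle time to a response, can be sketched as follows. Everything here is a simplified, hypothetical model: `Response`, `sendResponseMaybeThrottle`, and `send` only mirror the shape of the call in the diff, not Kafka's real signatures.

```java
import java.util.function.IntFunction;

final class ThrottleSketch {
    // Hypothetical response type with a mutable throttle time.
    static final class Response {
        private int throttleTimeMs;

        void setThrottleTimeMs(int ms) { this.throttleTimeMs = ms; }

        int throttleTimeMs() { return throttleTimeMs; }
    }

    // Hypothetical helper: it decides the throttle time and asks the caller
    // to build the response for that value.
    static Response sendResponseMaybeThrottle(int throttleMs, IntFunction<Response> createResponse) {
        return createResponse.apply(throttleMs);
    }

    // The lambda mutates the already-built response, then returns it,
    // because the helper needs a response back and not just a side effect.
    static Response send(Response response, int throttleMs) {
        return sendResponseMaybeThrottle(throttleMs, ms -> {
            response.setThrottleTimeMs(ms);
            return response;
        });
    }
}
```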
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1026102820 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2524,196 +2530,208 @@ class KafkaApisTest { assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData)) } - @Test - def testJoinGroupProtocolsOrder(): Unit = { -val protocols = List( - ("first", "first".getBytes()), - ("second", "second".getBytes()) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testHandleJoinGroupRequest(version: Short): Unit = { +val joinGroupRequest = new JoinGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType("consumer") + .setRebalanceTimeoutMs(1000) + .setSessionTimeoutMs(2000) + +val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version)) + +val expectedRequestContext = new GroupCoordinatorRequestContext( + version, + requestChannelRequest.context.clientId, + requestChannelRequest.context.clientAddress, + RequestLocal.NoCaching.bufferSupplier ) -val groupId = "group" -val memberId = "member1" -val protocolType = "consumer" -val rebalanceTimeoutMs = 10 -val sessionTimeoutMs = 5 -val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]]) +val expectedJoinGroupRequest = new JoinGroupRequestData() + .setGroupId(joinGroupRequest.groupId) + .setMemberId(joinGroupRequest.memberId) + .setProtocolType(joinGroupRequest.protocolType) + .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs) + .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs) -createKafkaApis().handleJoinGroupRequest( - buildRequest( -new JoinGroupRequest.Builder( - new JoinGroupRequestData() -.setGroupId(groupId) -.setMemberId(memberId) -.setProtocolType(protocolType) -.setRebalanceTimeoutMs(rebalanceTimeoutMs) 
-.setSessionTimeoutMs(sessionTimeoutMs) -.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection( - protocols.map { case (name, protocol) => new JoinGroupRequestProtocol() -.setName(name).setMetadata(protocol) - }.iterator.asJava)) -).build() - ), - RequestLocal.withThreadConfinedCaching) +val future = new CompletableFuture[JoinGroupResponseData]() +when(newGroupCoordinator.joinGroup( + ArgumentMatchers.eq(expectedRequestContext), + ArgumentMatchers.eq(expectedJoinGroupRequest) +)).thenReturn(future) -verify(groupCoordinator).handleJoinGroup( - ArgumentMatchers.eq(groupId), - ArgumentMatchers.eq(memberId), - ArgumentMatchers.eq(None), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(clientId), - ArgumentMatchers.eq(InetAddress.getLocalHost.toString), - ArgumentMatchers.eq(rebalanceTimeoutMs), - ArgumentMatchers.eq(sessionTimeoutMs), - ArgumentMatchers.eq(protocolType), - capturedProtocols.capture(), - any(), - any(), - any() +createKafkaApis().handleJoinGroupRequest( + requestChannelRequest, + RequestLocal.NoCaching ) -val capturedProtocolsList = capturedProtocols.getValue -assertEquals(protocols.size, capturedProtocolsList.size) -protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) => - assertEquals(expectedName, name) - assertArrayEquals(expectedBytes, bytes) -} - } - @Test - def testJoinGroupWhenAnErrorOccurs(): Unit = { -for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) { - testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short]) -} - } +val expectedJoinGroupResponse = new JoinGroupResponseData() + .setMemberId("member") + .setGenerationId(0) + .setLeader("leader") + .setProtocolType("consumer") + .setProtocolName("range") - def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = { -reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager) +future.complete(expectedJoinGroupResponse) +val 
capturedResponse = verifyNoThrottling(requestChannelRequest) +val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse] +assertEquals(expectedJoinGroupResponse, response.data) + } -val groupId = "group" -val memberId = "member1" -val protocolType = "consumer" -val rebalanceTimeoutMs = 10 -val sessionTimeoutMs = 5 + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testJoinGroupProtocolNameBackwardCompatibility(version: Short): Uni
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1026101492 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2524,196 +2530,208 @@ class KafkaApisTest { assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData)) } - @Test - def testJoinGroupProtocolsOrder(): Unit = { -val protocols = List( - ("first", "first".getBytes()), - ("second", "second".getBytes()) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testHandleJoinGroupRequest(version: Short): Unit = { +val joinGroupRequest = new JoinGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType("consumer") + .setRebalanceTimeoutMs(1000) + .setSessionTimeoutMs(2000) + +val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version)) + +val expectedRequestContext = new GroupCoordinatorRequestContext( + version, + requestChannelRequest.context.clientId, + requestChannelRequest.context.clientAddress, + RequestLocal.NoCaching.bufferSupplier ) -val groupId = "group" -val memberId = "member1" -val protocolType = "consumer" -val rebalanceTimeoutMs = 10 -val sessionTimeoutMs = 5 -val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]]) +val expectedJoinGroupRequest = new JoinGroupRequestData() + .setGroupId(joinGroupRequest.groupId) + .setMemberId(joinGroupRequest.memberId) + .setProtocolType(joinGroupRequest.protocolType) + .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs) + .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs) -createKafkaApis().handleJoinGroupRequest( - buildRequest( -new JoinGroupRequest.Builder( - new JoinGroupRequestData() -.setGroupId(groupId) -.setMemberId(memberId) -.setProtocolType(protocolType) -.setRebalanceTimeoutMs(rebalanceTimeoutMs) 
-.setSessionTimeoutMs(sessionTimeoutMs) -.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection( - protocols.map { case (name, protocol) => new JoinGroupRequestProtocol() -.setName(name).setMetadata(protocol) - }.iterator.asJava)) -).build() - ), - RequestLocal.withThreadConfinedCaching) +val future = new CompletableFuture[JoinGroupResponseData]() +when(newGroupCoordinator.joinGroup( + ArgumentMatchers.eq(expectedRequestContext), + ArgumentMatchers.eq(expectedJoinGroupRequest) +)).thenReturn(future) -verify(groupCoordinator).handleJoinGroup( - ArgumentMatchers.eq(groupId), - ArgumentMatchers.eq(memberId), - ArgumentMatchers.eq(None), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(clientId), - ArgumentMatchers.eq(InetAddress.getLocalHost.toString), - ArgumentMatchers.eq(rebalanceTimeoutMs), - ArgumentMatchers.eq(sessionTimeoutMs), - ArgumentMatchers.eq(protocolType), - capturedProtocols.capture(), - any(), - any(), - any() +createKafkaApis().handleJoinGroupRequest( + requestChannelRequest, + RequestLocal.NoCaching ) -val capturedProtocolsList = capturedProtocols.getValue -assertEquals(protocols.size, capturedProtocolsList.size) -protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) => - assertEquals(expectedName, name) - assertArrayEquals(expectedBytes, bytes) -} - } - @Test - def testJoinGroupWhenAnErrorOccurs(): Unit = { -for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) { - testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short]) -} - } +val expectedJoinGroupResponse = new JoinGroupResponseData() + .setMemberId("member") + .setGenerationId(0) + .setLeader("leader") + .setProtocolType("consumer") + .setProtocolName("range") - def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = { -reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager) +future.complete(expectedJoinGroupResponse) +val 
capturedResponse = verifyNoThrottling(requestChannelRequest) +val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse] +assertEquals(expectedJoinGroupResponse, response.data) + } -val groupId = "group" -val memberId = "member1" -val protocolType = "consumer" -val rebalanceTimeoutMs = 10 -val sessionTimeoutMs = 5 + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testJoinGroupProtocolNameBackwardCompatibility(version: Short): Uni
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1026099763 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { -val joinGroupRequest = request.body[JoinGroupRequest] + private def makeGroupCoordinatorRequestContext( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): GroupCoordinatorRequestContext = { +new GroupCoordinatorRequestContext( + request.context.header.data.requestApiVersion, + request.context.header.data.clientId, + request.context.clientAddress, + requestLocal.bufferSupplier +) + } -// the callback for sending a join-group response -def sendResponseCallback(joinResult: JoinGroupResult): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -val responseBody = new JoinGroupResponse( - new JoinGroupResponseData() -.setThrottleTimeMs(requestThrottleMs) -.setErrorCode(joinResult.error.code) -.setGenerationId(joinResult.generationId) -.setProtocolType(joinResult.protocolType.orNull) -.setProtocolName(joinResult.protocolName.orNull) -.setLeader(joinResult.leaderId) -.setSkipAssignment(joinResult.skipAssignment) -.setMemberId(joinResult.memberId) -.setMembers(joinResult.members.asJava), - request.context.apiVersion -) + def handleJoinGroupRequest( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): CompletableFuture[Unit] = { Review Comment:
1. Yeah, that's right. I return a future to catch any other errors. It could be from `sendResponse` but it could also be other issues.
2. No, it would not because the underlying API uses a callback.
3. All APIs will be async for this reason, but note that most of them are already async today. The difference is that we use a future here instead of a callback.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1026098011 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -161,6 +166,12 @@ class KafkaApis(val requestChannel: RequestChannel, * Top-level method that handles all requests and multiplexes to the right api */ override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { +def handleError(e: Throwable): Unit = { + trace(s"Unexpected error handling request ${request.requestDesc(true)} " + Review Comment: It is a mistake. Reverted to `error`.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1024835046 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def makeGroupCoordinatorRequestContextFrom( Review Comment: I can remove the `From` part. I am not too opinionated on this.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1023749684 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def makeGroupCoordinatorRequestContextFrom( Review Comment: Why do you think so? Is it because of the usage of `make`? The name looks quite reasonable to me.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1023741182 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -22,9 +22,8 @@ import java.util import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException} - Review Comment: Nope. Reverted.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1023606759 ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala: ## @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.coordinator.group + +import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback +import kafka.server.RequestLocal + +import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData} +import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol +import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.utils.BufferSupplier +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource +import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext + +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.mockito.{ArgumentCaptor, ArgumentMatchers} +import org.mockito.Mockito.{mock, verify} + +import java.net.InetAddress +import scala.jdk.CollectionConverters._ + +class GroupCoordinatorAdapterTest { + + private def makeContext( +apiVersion: Short + ): GroupCoordinatorRequestContext = { +new GroupCoordinatorRequestContext( + apiVersion, + "client", + InetAddress.getLocalHost, + BufferSupplier.create() +) + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testJoinGroup(version: Short): Unit = { Review Comment: This test exercises the logic of the `joinGroup` method in the `GroupCoordinatorAdapter`: given an input, it verifies the output. The tests in `KafkaApisTest` do not do this, as they only verify that the adapter gets the expected input. The name seems pretty clear to me, since the test covers the `joinGroup` method.
## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2524,196 +2528,166 @@ class KafkaApisTest { assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData)) } - @Test - def testJoinGroupProtocolsOrder(): Unit = { -val protocols = List( - ("first", "first".getBytes()), - ("second", "second".getBytes()) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testHandleJoinGroupRequest(version: Short): Unit = { +val joinGroupRequest = new JoinGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType("consumer") + .setRebalanceTimeoutMs(1000) + .setSessionTimeoutMs(2000) + +val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version)) + +val expectedRequestContext = new GroupCoordinatorRequestContext( + version, + requestChannelRequest.context.clientId, + requestChannelRequest.context.clientAddress, + RequestLocal.NoCaching.bufferSupplier ) -val groupId = "group" -val memberId = "member1" -val protocolType = "consumer" -val rebalanceTimeoutMs = 10 -val sessionTimeoutMs = 5 -val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]]) +val expectedJoinGroupRequest = new JoinGroupRequestData() + .setGroupId(joinGroupRequest.groupId) + .setMemberId(joinGroupRequest.memberId) + .setProtocolType(joinGroupRequest.protocolType) + .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs) + .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs) -createKafkaApis().handleJoinGroupRequest( - buildRequest( -new JoinGroupRequest.Builder( - new JoinGroupRequestData() -.setGroupId(groupId) -.setMemberId(memberId) -.setProtocolType(protocolType) -.setRebalanceTimeoutMs(rebalanceTimeoutMs) -.setSessionTimeoutMs(sessionTimeoutMs) -.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtoco
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1023604793 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2524,196 +2528,166 @@ class KafkaApisTest { assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData)) } - @Test - def testJoinGroupProtocolsOrder(): Unit = { -val protocols = List( - ("first", "first".getBytes()), - ("second", "second".getBytes()) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testHandleJoinGroupRequest(version: Short): Unit = { +val joinGroupRequest = new JoinGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType("consumer") + .setRebalanceTimeoutMs(1000) + .setSessionTimeoutMs(2000) + +val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version)) + +val expectedRequestContext = new GroupCoordinatorRequestContext( + version, + requestChannelRequest.context.clientId, + requestChannelRequest.context.clientAddress, + RequestLocal.NoCaching.bufferSupplier ) -val groupId = "group" -val memberId = "member1" -val protocolType = "consumer" -val rebalanceTimeoutMs = 10 -val sessionTimeoutMs = 5 -val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]]) +val expectedJoinGroupRequest = new JoinGroupRequestData() + .setGroupId(joinGroupRequest.groupId) + .setMemberId(joinGroupRequest.memberId) + .setProtocolType(joinGroupRequest.protocolType) + .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs) + .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs) -createKafkaApis().handleJoinGroupRequest( - buildRequest( -new JoinGroupRequest.Builder( - new JoinGroupRequestData() -.setGroupId(groupId) -.setMemberId(memberId) -.setProtocolType(protocolType) -.setRebalanceTimeoutMs(rebalanceTimeoutMs) 
-.setSessionTimeoutMs(sessionTimeoutMs) -.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection( - protocols.map { case (name, protocol) => new JoinGroupRequestProtocol() -.setName(name).setMetadata(protocol) - }.iterator.asJava)) -).build() - ), - RequestLocal.withThreadConfinedCaching) +val future = new CompletableFuture[JoinGroupResponseData]() +when(newGroupCoordinator.joinGroup( + ArgumentMatchers.eq(expectedRequestContext), + ArgumentMatchers.eq(expectedJoinGroupRequest) +)).thenReturn(future) -verify(groupCoordinator).handleJoinGroup( - ArgumentMatchers.eq(groupId), - ArgumentMatchers.eq(memberId), - ArgumentMatchers.eq(None), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(clientId), - ArgumentMatchers.eq(InetAddress.getLocalHost.toString), - ArgumentMatchers.eq(rebalanceTimeoutMs), - ArgumentMatchers.eq(sessionTimeoutMs), - ArgumentMatchers.eq(protocolType), - capturedProtocols.capture(), - any(), - any(), - any() +createKafkaApis().handleJoinGroupRequest( + requestChannelRequest, + RequestLocal.NoCaching ) -val capturedProtocolsList = capturedProtocols.getValue -assertEquals(protocols.size, capturedProtocolsList.size) -protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) => - assertEquals(expectedName, name) - assertArrayEquals(expectedBytes, bytes) -} - } - @Test - def testJoinGroupWhenAnErrorOccurs(): Unit = { -for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) { - testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short]) -} +val expectedJoinGroupResponse = new JoinGroupResponseData() + .setMemberId("member") + .setGenerationId(0) + .setLeader("leader") + .setProtocolType("consumer") + .setProtocolName("range") + +future.complete(expectedJoinGroupResponse) +val capturedResponse = verifyNoThrottling(requestChannelRequest) +val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse] 
+assertEquals(expectedJoinGroupResponse, response.data) } - def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = { -reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = { +val joinGroupRequest = new JoinGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType("consumer") +
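The `expectedJoinGroupRequest` built in the test above uses the session timeout as the rebalance timeout when the request version is 0. A minimal Java sketch of that rule follows. The class and method names are hypothetical and exist only for illustration; they are not part of the PR.

```java
// Hypothetical helper, not a Kafka class: mirrors the expression
// `if (version >= 1) rebalanceTimeoutMs else sessionTimeoutMs` from the test.
final class JoinGroupTimeouts {
    private JoinGroupTimeouts() { }

    // Returns the rebalance timeout the coordinator is expected to see
    // for a JoinGroup request of the given version.
    static int effectiveRebalanceTimeoutMs(short version, int rebalanceTimeoutMs, int sessionTimeoutMs) {
        // For version 0 the test expects the session timeout to be used instead.
        return version >= 1 ? rebalanceTimeoutMs : sessionTimeoutMs;
    }
}
```

With the values used in the test (rebalance timeout 1000, session timeout 2000), version 0 would yield 2000 and any later version 1000.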
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1023605151 ## clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java: ## @@ -28,9 +28,15 @@ public class JoinGroupResponse extends AbstractResponse { private final JoinGroupResponseData data; -public JoinGroupResponse(JoinGroupResponseData data) { +public JoinGroupResponse(JoinGroupResponseData data, short version) { super(ApiKeys.JOIN_GROUP); this.data = data; + +// All versions prior to version 7 do not support nullable +// string for the protocol type. Empty string should be used. +if (version < 7 && data.protocolType() == null) { Review Comment: Right it was there.
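The version check quoted in the diff above can be illustrated in isolation. This is only a sketch of the rule stated in the code comment (versions before 7 cannot carry a null protocol type, so an empty string is substituted). `ProtocolTypeCompat` and its method are hypothetical names, not the actual `JoinGroupResponse` code.

```java
// Hypothetical helper, not a Kafka class: shows the null-to-empty-string
// substitution described in the JoinGroupResponse constructor comment.
final class ProtocolTypeCompat {
    private ProtocolTypeCompat() { }

    // Returns the protocol type that can be serialized for the given version.
    static String protocolTypeForVersion(String protocolType, short version) {
        if (version < 7 && protocolType == null) {
            // Older versions have no nullable string here; use "" instead.
            return "";
        }
        return protocolType;
    }
}
```

From version 7 onward a null value passes through unchanged, and a non-null value is never altered.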
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1021208132 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2524,196 +2528,166 @@ class KafkaApisTest { assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData)) } - @Test - def testJoinGroupProtocolsOrder(): Unit = { -val protocols = List( - ("first", "first".getBytes()), - ("second", "second".getBytes()) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testHandleJoinGroupRequest(version: Short): Unit = { +val joinGroupRequest = new JoinGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType("consumer") + .setRebalanceTimeoutMs(1000) + .setSessionTimeoutMs(2000) + +val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version)) + +val expectedRequestContext = new GroupCoordinatorRequestContext( + version, + requestChannelRequest.context.clientId, + requestChannelRequest.context.clientAddress, + RequestLocal.NoCaching.bufferSupplier ) -val groupId = "group" -val memberId = "member1" -val protocolType = "consumer" -val rebalanceTimeoutMs = 10 -val sessionTimeoutMs = 5 -val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]]) +val expectedJoinGroupRequest = new JoinGroupRequestData() + .setGroupId(joinGroupRequest.groupId) + .setMemberId(joinGroupRequest.memberId) + .setProtocolType(joinGroupRequest.protocolType) + .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs) + .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs) -createKafkaApis().handleJoinGroupRequest( - buildRequest( -new JoinGroupRequest.Builder( - new JoinGroupRequestData() -.setGroupId(groupId) -.setMemberId(memberId) -.setProtocolType(protocolType) -.setRebalanceTimeoutMs(rebalanceTimeoutMs) 
-.setSessionTimeoutMs(sessionTimeoutMs) -.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection( - protocols.map { case (name, protocol) => new JoinGroupRequestProtocol() -.setName(name).setMetadata(protocol) - }.iterator.asJava)) -).build() - ), - RequestLocal.withThreadConfinedCaching) +val future = new CompletableFuture[JoinGroupResponseData]() +when(newGroupCoordinator.joinGroup( + ArgumentMatchers.eq(expectedRequestContext), + ArgumentMatchers.eq(expectedJoinGroupRequest) +)).thenReturn(future) -verify(groupCoordinator).handleJoinGroup( - ArgumentMatchers.eq(groupId), - ArgumentMatchers.eq(memberId), - ArgumentMatchers.eq(None), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(clientId), - ArgumentMatchers.eq(InetAddress.getLocalHost.toString), - ArgumentMatchers.eq(rebalanceTimeoutMs), - ArgumentMatchers.eq(sessionTimeoutMs), - ArgumentMatchers.eq(protocolType), - capturedProtocols.capture(), - any(), - any(), - any() +createKafkaApis().handleJoinGroupRequest( + requestChannelRequest, + RequestLocal.NoCaching ) -val capturedProtocolsList = capturedProtocols.getValue -assertEquals(protocols.size, capturedProtocolsList.size) -protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) => - assertEquals(expectedName, name) - assertArrayEquals(expectedBytes, bytes) -} - } - @Test - def testJoinGroupWhenAnErrorOccurs(): Unit = { -for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) { - testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short]) -} +val expectedJoinGroupResponse = new JoinGroupResponseData() + .setMemberId("member") + .setGenerationId(0) + .setLeader("leader") + .setProtocolType("consumer") + .setProtocolName("range") + +future.complete(expectedJoinGroupResponse) +val capturedResponse = verifyNoThrottling(requestChannelRequest) +val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse] 
+assertEquals(expectedJoinGroupResponse, response.data) } - def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = { -reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = { +val joinGroupRequest = new JoinGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType("consumer") +
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1023603789 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2524,196 +2528,166 @@ class KafkaApisTest { assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData)) } - @Test - def testJoinGroupProtocolsOrder(): Unit = { Review Comment: Right. Let me fix it. ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2524,196 +2528,166 @@ class KafkaApisTest { assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData)) } - @Test - def testJoinGroupProtocolsOrder(): Unit = { Review Comment: This test's logic is now tested in `GroupCoordinatorAdapterTest`.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1023603444 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def makeGroupCoordinatorRequestContextFrom( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): GroupCoordinatorRequestContext = { +new GroupCoordinatorRequestContext( + request.context.header.data.requestApiVersion, + request.context.header.data.clientId, + request.context.clientAddress, + requestLocal.bufferSupplier +) + } + def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { val joinGroupRequest = request.body[JoinGroupRequest] -// the callback for sending a join-group response -def sendResponseCallback(joinResult: JoinGroupResult): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -val protocolName = if (request.context.apiVersion() >= 7) - joinResult.protocolName.orNull -else - joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol) - -val responseBody = new JoinGroupResponse( - new JoinGroupResponseData() -.setThrottleTimeMs(requestThrottleMs) -.setErrorCode(joinResult.error.code) -.setGenerationId(joinResult.generationId) -.setProtocolType(joinResult.protocolType.orNull) -.setProtocolName(protocolName) -.setLeader(joinResult.leaderId) -.setSkipAssignment(joinResult.skipAssignment) -.setMemberId(joinResult.memberId) -.setMembers(joinResult.members.asJava) -) - -trace("Sending join group response %s for correlation id %d to client %s." - .format(responseBody, request.header.correlationId, request.header.clientId)) -responseBody - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) +def sendResponse(response: AbstractResponse): Unit = { + trace("Sending join group response %s for correlation id %d to client %s." 
+.format(response, request.header.correlationId, request.header.clientId)) + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { +response.maybeSetThrottleTimeMs(requestThrottleMs) +response + }) } if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. - sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION)) + sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) } else if (!authHelper.authorize(request.context, READ, GROUP, joinGroupRequest.data.groupId)) { - sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED)) + sendResponse(joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) } else { - val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId) Review Comment: Yeah, I meant `now` instead of `not`. This code was moved 1:1 there.
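The new `sendResponse` helper in the diff above takes an already-built response and only fills in the throttle time inside the callback passed to `sendResponseMaybeThrottle`. A rough Java sketch of that shape follows. All types here are hypothetical stand-ins (the real `requestHelper` and `AbstractResponse` are not reproduced), and the throttle value is passed in directly rather than computed from quotas.

```java
import java.util.function.IntFunction;

// Hypothetical stand-in for a response that can carry a throttle time.
final class ThrottledResponse {
    private int throttleTimeMs;

    void maybeSetThrottleTimeMs(int throttleTimeMs) {
        this.throttleTimeMs = throttleTimeMs;
    }

    int throttleTimeMs() {
        return throttleTimeMs;
    }
}

// Hypothetical stand-in for the request helper used in the diff.
final class ResponseSender {
    private ResponseSender() { }

    // Assumption: the helper decides the throttle time and then asks the
    // caller-supplied function for the response to send.
    static ThrottledResponse sendResponseMaybeThrottle(int computedThrottleMs,
                                                       IntFunction<ThrottledResponse> createResponse) {
        return createResponse.apply(computedThrottleMs);
    }

    // Mirrors the new sendResponse: reuse the prepared response and only
    // stamp the throttle time onto it inside the callback.
    static ThrottledResponse sendResponse(ThrottledResponse response, int computedThrottleMs) {
        return sendResponseMaybeThrottle(computedThrottleMs, requestThrottleMs -> {
            response.maybeSetThrottleTimeMs(requestThrottleMs);
            return response;
        });
    }
}
```

The design point this tries to show is that the response no longer has to be constructed inside the throttle callback, which is what allows error responses from `getErrorResponse` and coordinator results to share one send path.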
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1021208615 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2524,196 +2528,166 @@ class KafkaApisTest { assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData)) } - @Test - def testJoinGroupProtocolsOrder(): Unit = { -val protocols = List( - ("first", "first".getBytes()), - ("second", "second".getBytes()) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testHandleJoinGroupRequest(version: Short): Unit = { +val joinGroupRequest = new JoinGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType("consumer") + .setRebalanceTimeoutMs(1000) + .setSessionTimeoutMs(2000) + +val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version)) + +val expectedRequestContext = new GroupCoordinatorRequestContext( + version, + requestChannelRequest.context.clientId, + requestChannelRequest.context.clientAddress, + RequestLocal.NoCaching.bufferSupplier ) -val groupId = "group" -val memberId = "member1" -val protocolType = "consumer" -val rebalanceTimeoutMs = 10 -val sessionTimeoutMs = 5 -val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]]) +val expectedJoinGroupRequest = new JoinGroupRequestData() + .setGroupId(joinGroupRequest.groupId) + .setMemberId(joinGroupRequest.memberId) + .setProtocolType(joinGroupRequest.protocolType) + .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs) + .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs) -createKafkaApis().handleJoinGroupRequest( - buildRequest( -new JoinGroupRequest.Builder( - new JoinGroupRequestData() -.setGroupId(groupId) -.setMemberId(memberId) -.setProtocolType(protocolType) -.setRebalanceTimeoutMs(rebalanceTimeoutMs) 
-.setSessionTimeoutMs(sessionTimeoutMs) -.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection( - protocols.map { case (name, protocol) => new JoinGroupRequestProtocol() -.setName(name).setMetadata(protocol) - }.iterator.asJava)) -).build() - ), - RequestLocal.withThreadConfinedCaching) +val future = new CompletableFuture[JoinGroupResponseData]() +when(newGroupCoordinator.joinGroup( + ArgumentMatchers.eq(expectedRequestContext), + ArgumentMatchers.eq(expectedJoinGroupRequest) +)).thenReturn(future) -verify(groupCoordinator).handleJoinGroup( - ArgumentMatchers.eq(groupId), - ArgumentMatchers.eq(memberId), - ArgumentMatchers.eq(None), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(clientId), - ArgumentMatchers.eq(InetAddress.getLocalHost.toString), - ArgumentMatchers.eq(rebalanceTimeoutMs), - ArgumentMatchers.eq(sessionTimeoutMs), - ArgumentMatchers.eq(protocolType), - capturedProtocols.capture(), - any(), - any(), - any() +createKafkaApis().handleJoinGroupRequest( + requestChannelRequest, + RequestLocal.NoCaching ) -val capturedProtocolsList = capturedProtocols.getValue -assertEquals(protocols.size, capturedProtocolsList.size) -protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) => - assertEquals(expectedName, name) - assertArrayEquals(expectedBytes, bytes) -} - } - @Test - def testJoinGroupWhenAnErrorOccurs(): Unit = { -for (version <- ApiKeys.JOIN_GROUP.oldestVersion to ApiKeys.JOIN_GROUP.latestVersion) { - testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short]) -} +val expectedJoinGroupResponse = new JoinGroupResponseData() + .setMemberId("member") + .setGenerationId(0) + .setLeader("leader") + .setProtocolType("consumer") + .setProtocolName("range") + +future.complete(expectedJoinGroupResponse) +val capturedResponse = verifyNoThrottling(requestChannelRequest) +val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse] 
+assertEquals(expectedJoinGroupResponse, response.data) } - def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = { -reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = { +val joinGroupRequest = new JoinGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType("consumer") +
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1021207545 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2524,196 +2528,166 @@ class KafkaApisTest { assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData)) } - @Test - def testJoinGroupProtocolsOrder(): Unit = { -val protocols = List( - ("first", "first".getBytes()), - ("second", "second".getBytes()) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testHandleJoinGroupRequest(version: Short): Unit = { +val joinGroupRequest = new JoinGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType("consumer") + .setRebalanceTimeoutMs(1000) + .setSessionTimeoutMs(2000) + +val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build(version)) + +val expectedRequestContext = new GroupCoordinatorRequestContext( + version, + requestChannelRequest.context.clientId, + requestChannelRequest.context.clientAddress, + RequestLocal.NoCaching.bufferSupplier ) -val groupId = "group" -val memberId = "member1" -val protocolType = "consumer" -val rebalanceTimeoutMs = 10 -val sessionTimeoutMs = 5 -val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]]) +val expectedJoinGroupRequest = new JoinGroupRequestData() + .setGroupId(joinGroupRequest.groupId) + .setMemberId(joinGroupRequest.memberId) + .setProtocolType(joinGroupRequest.protocolType) + .setRebalanceTimeoutMs(if (version >= 1) joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs) + .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs) -createKafkaApis().handleJoinGroupRequest( - buildRequest( -new JoinGroupRequest.Builder( - new JoinGroupRequestData() -.setGroupId(groupId) -.setMemberId(memberId) -.setProtocolType(protocolType) -.setRebalanceTimeoutMs(rebalanceTimeoutMs) 
-.setSessionTimeoutMs(sessionTimeoutMs) -.setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection( - protocols.map { case (name, protocol) => new JoinGroupRequestProtocol() -.setName(name).setMetadata(protocol) - }.iterator.asJava)) -).build() - ), - RequestLocal.withThreadConfinedCaching) +val future = new CompletableFuture[JoinGroupResponseData]() +when(newGroupCoordinator.joinGroup( + ArgumentMatchers.eq(expectedRequestContext), + ArgumentMatchers.eq(expectedJoinGroupRequest) +)).thenReturn(future) -verify(groupCoordinator).handleJoinGroup( - ArgumentMatchers.eq(groupId), - ArgumentMatchers.eq(memberId), - ArgumentMatchers.eq(None), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(clientId), - ArgumentMatchers.eq(InetAddress.getLocalHost.toString), - ArgumentMatchers.eq(rebalanceTimeoutMs), - ArgumentMatchers.eq(sessionTimeoutMs), - ArgumentMatchers.eq(protocolType), - capturedProtocols.capture(), - any(), - any(), - any() +createKafkaApis().handleJoinGroupRequest( + requestChannelRequest, + RequestLocal.NoCaching ) -val capturedProtocolsList = capturedProtocols.getValue -assertEquals(protocols.size, capturedProtocolsList.size) -protocols.zip(capturedProtocolsList).foreach { case ((expectedName, expectedBytes), (name, bytes)) => - assertEquals(expectedName, name) - assertArrayEquals(expectedBytes, bytes) -} - } - @Test - def testJoinGroupWhenAnErrorOccurs(): Unit = { Review Comment: I have renamed this one to `testJoinGroupProtocolTypeBackwardCompatibility`.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1021207157 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2524,196 +2528,166 @@ class KafkaApisTest { assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData)) } - @Test - def testJoinGroupProtocolsOrder(): Unit = { Review Comment: This test's logic is not tested in `GroupCoordinatorAdapterTest`.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1021206505 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -94,6 +95,7 @@ class KafkaApis(val requestChannel: RequestChannel, val metadataSupport: MetadataSupport, val replicaManager: ReplicaManager, val groupCoordinator: GroupCoordinator, +val newGroupCoordinator: org.apache.kafka.coordinator.group.GroupCoordinator, Review Comment: This is a temporary change. When all the APIs are migrated to the new interface, I will change `groupCoordinator` to the new interface and remove this one.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1021205683 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def makeGroupCoordinatorRequestContextFrom( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): GroupCoordinatorRequestContext = { +new GroupCoordinatorRequestContext( + request.context.header.data.requestApiVersion, + request.context.header.data.clientId, + request.context.clientAddress, + requestLocal.bufferSupplier +) + } + def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { val joinGroupRequest = request.body[JoinGroupRequest] -// the callback for sending a join-group response -def sendResponseCallback(joinResult: JoinGroupResult): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -val protocolName = if (request.context.apiVersion() >= 7) - joinResult.protocolName.orNull -else - joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol) - -val responseBody = new JoinGroupResponse( - new JoinGroupResponseData() -.setThrottleTimeMs(requestThrottleMs) -.setErrorCode(joinResult.error.code) -.setGenerationId(joinResult.generationId) -.setProtocolType(joinResult.protocolType.orNull) -.setProtocolName(protocolName) -.setLeader(joinResult.leaderId) -.setSkipAssignment(joinResult.skipAssignment) -.setMemberId(joinResult.memberId) -.setMembers(joinResult.members.asJava) -) - -trace("Sending join group response %s for correlation id %d to client %s." - .format(responseBody, request.header.correlationId, request.header.clientId)) -responseBody - } - requestHelper.sendResponseMaybeThrottle(request, createResponse) +def sendResponse(response: AbstractResponse): Unit = { + trace("Sending join group response %s for correlation id %d to client %s." 
+.format(response, request.header.correlationId, request.header.clientId)) + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { +response.maybeSetThrottleTimeMs(requestThrottleMs) +response + }) } if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. - sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION)) + sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) } else if (!authHelper.authorize(request.context, READ, GROUP, joinGroupRequest.data.groupId)) { - sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED)) + sendResponse(joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) } else { - val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId) Review Comment: This code is now in `GroupCoordinatorAdapter` without any changes.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1021205253 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def makeGroupCoordinatorRequestContextFrom( +request: RequestChannel.Request, +requestLocal: RequestLocal + ): GroupCoordinatorRequestContext = { +new GroupCoordinatorRequestContext( + request.context.header.data.requestApiVersion, + request.context.header.data.clientId, + request.context.clientAddress, + requestLocal.bufferSupplier +) + } + def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { val joinGroupRequest = request.body[JoinGroupRequest] -// the callback for sending a join-group response -def sendResponseCallback(joinResult: JoinGroupResult): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { -val protocolName = if (request.context.apiVersion() >= 7) - joinResult.protocolName.orNull -else - joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol) - -val responseBody = new JoinGroupResponse( - new JoinGroupResponseData() Review Comment: This translation is now in `GroupCoordinatorAdapter`.
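For readers following the thread, the translation being moved is the version-dependent choice of protocol name shown in the removed lines above: for API version 7 and later an absent protocol name becomes null, and for older versions it falls back to `GroupCoordinator.NoProtocol`. A minimal Java sketch of that rule follows. The class name, method name, and the `NO_PROTOCOL` placeholder value are assumptions made for illustration; this is not the code in `GroupCoordinatorAdapter`.

```java
import java.util.Optional;

public class ProtocolNameSketch {
    // Assumption: placeholder standing in for GroupCoordinator.NoProtocol.
    static final String NO_PROTOCOL = "";

    // Mirrors the removed Scala branch: version >= 7 maps an absent name to null,
    // older versions map it to the NO_PROTOCOL placeholder.
    static String protocolNameFor(short apiVersion, Optional<String> protocolName) {
        if (apiVersion >= 7) {
            return protocolName.orElse(null);
        }
        return protocolName.orElse(NO_PROTOCOL);
    }
}
```

A present protocol name is returned unchanged for every version; only the representation of an absent name differs.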