[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-23 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1030180438


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java:
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+
+import java.net.InetAddress;
+import java.util.Objects;
+
+public class GroupCoordinatorRequestContext {

Review Comment:
   That works except for the buffer supplier. I think that we can pass the 
buffer supplier as a regular argument instead of passing it via the context.
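
A minimal Java sketch of that shape, for illustration only. `SketchBufferSupplier`, `SketchRequestContext` and `SketchCoordinator` are hypothetical stand-ins and not Kafka's actual classes or signatures. The assumption is that the context carries only request metadata, while the supplier is handed to the method directly:

```java
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a buffer supplier; not Kafka's BufferSupplier.
interface SketchBufferSupplier {
    ByteBuffer get(int capacity);
}

// Request metadata only: immutable values, with no buffer supplier inside.
final class SketchRequestContext {
    final short apiVersion;
    final String clientId;
    final InetAddress clientAddress;

    SketchRequestContext(short apiVersion, String clientId, InetAddress clientAddress) {
        this.apiVersion = apiVersion;
        this.clientId = clientId;
        this.clientAddress = clientAddress;
    }
}

final class SketchCoordinator {
    // The supplier is an explicit parameter, so the context stays a plain value object.
    static CompletableFuture<Integer> joinGroup(
        SketchRequestContext context,
        String groupId,
        SketchBufferSupplier bufferSupplier
    ) {
        ByteBuffer scratch = bufferSupplier.get(64);
        // Returns the scratch buffer capacity only to make the sketch observable.
        return CompletableFuture.completedFuture(scratch.capacity());
    }
}
```

With this split, the context can be compared field by field (for example in a test matcher) without having to reason about the identity of a buffer supplier.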



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-23 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1030179126


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java:
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+
+import java.net.InetAddress;
+import java.util.Objects;
+
+public class GroupCoordinatorRequestContext {
+
+private final short apiVersion;

Review Comment:
   Yeah, I agree with you. I went with the minimum to start with.






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-18 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1027049468


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -21,11 +21,10 @@ import java.io.{File, IOException}
 import java.net.{InetAddress, SocketTimeoutException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
-

Review Comment:
   I will fix this on Monday. It seems that my IDE is doing this automatically 
somehow.






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-18 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1027049342


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+new GroupCoordinatorRequestContext(
+  request.context.header.data.requestApiVersion,
+  request.context.header.data.clientId,
+  request.context.clientAddress,
+  requestLocal.bufferSupplier
+)
+  }
 
-// the callback for sending a join-group response
-def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-val responseBody = new JoinGroupResponse(
-  new JoinGroupResponseData()
-.setThrottleTimeMs(requestThrottleMs)
-.setErrorCode(joinResult.error.code)
-.setGenerationId(joinResult.generationId)
-.setProtocolType(joinResult.protocolType.orNull)
-.setProtocolName(joinResult.protocolName.orNull)
-.setLeader(joinResult.leaderId)
-.setSkipAssignment(joinResult.skipAssignment)
-.setMemberId(joinResult.memberId)
-.setMembers(joinResult.members.asJava),
-  request.context.apiVersion
-)
+  def handleJoinGroupRequest(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
+val joinGroupRequest = request.body[JoinGroupRequest]
 
-trace("Sending join group response %s for correlation id %d to client 
%s."
-  .format(responseBody, request.header.correlationId, 
request.header.clientId))
-responseBody
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
+def sendResponse(response: AbstractResponse): Unit = {
+  trace("Sending join group response %s for correlation id %d to client 
%s."
+.format(response, request.header.correlationId, 
request.header.clientId))
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+response.maybeSetThrottleTimeMs(requestThrottleMs)
+response
+  })
 }
 
 if (joinGroupRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
   // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
   // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
   // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-  sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
Errors.UNSUPPORTED_VERSION))
+  
sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())

Review Comment:
   All handlers are already surrounded by a try..catch that catches all the 
exceptions raised. We just need to ensure that exceptions raised in futures’ 
callbacks are also caught.
   
   In this particular case, any exceptions raised by sendResponse would be 
caught by that try..catch. 
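
A minimal Java sketch of the two failure paths described above, under stated assumptions. `HandlerErrorSketch`, `handle` and `handleError` are hypothetical names and not the actual `KafkaApis` code. One `try`/`catch` covers exceptions thrown synchronously by a handler, and a completion callback covers failures delivered later through the returned future:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class HandlerErrorSketch {
    // Collects the messages of the errors seen by the single error handler.
    static final List<String> handled = new ArrayList<>();

    static void handleError(Throwable e) {
        handled.add(e.getMessage());
    }

    // Hypothetical top-level dispatch: one try/catch for synchronous failures,
    // plus a completion callback for failures delivered through the future.
    static void handle(Supplier<CompletableFuture<Void>> handler) {
        try {
            handler.get().whenComplete((ignored, e) -> {
                if (e != null) handleError(e);
            });
        } catch (Throwable e) {
            handleError(e);
        }
    }
}
```

A handler that throws before returning (as a failing `sendResponse` on the early-return path would) ends up in the `catch` block; a handler whose future fails later ends up in the `whenComplete` callback.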






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-18 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1027049080


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+new GroupCoordinatorRequestContext(
+  request.context.header.data.requestApiVersion,
+  request.context.header.data.clientId,
+  request.context.clientAddress,
+  requestLocal.bufferSupplier
+)
+  }
 
-// the callback for sending a join-group response
-def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-val responseBody = new JoinGroupResponse(
-  new JoinGroupResponseData()
-.setThrottleTimeMs(requestThrottleMs)
-.setErrorCode(joinResult.error.code)
-.setGenerationId(joinResult.generationId)
-.setProtocolType(joinResult.protocolType.orNull)
-.setProtocolName(joinResult.protocolName.orNull)
-.setLeader(joinResult.leaderId)
-.setSkipAssignment(joinResult.skipAssignment)
-.setMemberId(joinResult.memberId)
-.setMembers(joinResult.members.asJava),
-  request.context.apiVersion
-)
+  def handleJoinGroupRequest(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {

Review Comment:
   A future is a better abstraction than a callback in my opinion, but that is 
subjective. We could keep using callbacks as well. The end result is the same. 
The new Controller interface uses Futures as well, so standardizing makes sense 
here.
   
   Regarding the current APIs, join group and sync group definitely rely 
on callbacks. For the others, I don’t remember off the top of my head. In the 
new group coordinator, all of them will be executed in a different thread so 
they must be async.
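
A minimal Java sketch of how a callback-based API can be exposed as a future, for illustration only. `legacyJoinGroup` and `CallbackToFutureSketch` are hypothetical stand-ins and do not reproduce the real coordinator or adapter signatures:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CallbackToFutureSketch {
    // Hypothetical callback-style API, standing in for a legacy coordinator method.
    static void legacyJoinGroup(String memberId, Consumer<String> responseCallback) {
        responseCallback.accept("joined:" + memberId);
    }

    // Adapter: the callback's only job is to complete the future.
    static CompletableFuture<String> joinGroup(String memberId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            legacyJoinGroup(memberId, future::complete);
        } catch (Throwable t) {
            // A failure before the callback fires is still reported via the future.
            future.completeExceptionally(t);
        }
        return future;
    }
}
```

The caller sees the same result either way; the future version simply lets it compose further stages and observe failures through one object.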






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-18 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026881513


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+new GroupCoordinatorRequestContext(
+  request.context.header.data.requestApiVersion,
+  request.context.header.data.clientId,
+  request.context.clientAddress,
+  requestLocal.bufferSupplier
+)
+  }
 
-// the callback for sending a join-group response
-def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-val responseBody = new JoinGroupResponse(
-  new JoinGroupResponseData()
-.setThrottleTimeMs(requestThrottleMs)
-.setErrorCode(joinResult.error.code)
-.setGenerationId(joinResult.generationId)
-.setProtocolType(joinResult.protocolType.orNull)
-.setProtocolName(joinResult.protocolName.orNull)
-.setLeader(joinResult.leaderId)
-.setSkipAssignment(joinResult.skipAssignment)
-.setMemberId(joinResult.memberId)
-.setMembers(joinResult.members.asJava),
-  request.context.apiVersion
-)
+  def handleJoinGroupRequest(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
+val joinGroupRequest = request.body[JoinGroupRequest]
 
-trace("Sending join group response %s for correlation id %d to client 
%s."
-  .format(responseBody, request.header.correlationId, 
request.header.clientId))
-responseBody
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
+def sendResponse(response: AbstractResponse): Unit = {
+  trace("Sending join group response %s for correlation id %d to client 
%s."
+.format(response, request.header.correlationId, 
request.header.clientId))
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+response.maybeSetThrottleTimeMs(requestThrottleMs)
+response

Review Comment:
   hmm… sendResponseMaybeThrottle takes a function which returns a response.
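
A minimal Java sketch of why the lambda ends with the response, assuming a sender that accepts a function from throttle time to response. `ThrottleSketch`, `Response` and `sketchSendMaybeThrottle` are hypothetical stand-ins, not the real request helper:

```java
import java.util.function.IntFunction;

final class ThrottleSketch {
    // Hypothetical response type with a mutable throttle time.
    static final class Response {
        int throttleTimeMs = 0;
    }

    // Hypothetical sender: it is given the throttle time and asks the
    // caller-supplied function for the response to send.
    static Response sketchSendMaybeThrottle(int throttleTimeMs, IntFunction<Response> createResponse) {
        return createResponse.apply(throttleTimeMs);
    }

    static Response send(Response response) {
        return sketchSendMaybeThrottle(25, requestThrottleMs -> {
            response.throttleTimeMs = requestThrottleMs;
            // The function must yield the response; setting the throttle time alone is not enough.
            return response;
        });
    }
}
```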






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026102820


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2524,196 +2530,208 @@ class KafkaApisTest {
 assertEquals(MemoryRecords.EMPTY, 
FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-val protocols = List(
-  ("first", "first".getBytes()),
-  ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+val joinGroupRequest = new JoinGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+  .setRebalanceTimeoutMs(1000)
+  .setSessionTimeoutMs(2000)
+
+val requestChannelRequest = buildRequest(new 
JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+val expectedRequestContext = new GroupCoordinatorRequestContext(
+  version,
+  requestChannelRequest.context.clientId,
+  requestChannelRequest.context.clientAddress,
+  RequestLocal.NoCaching.bufferSupplier
 )
 
-val groupId = "group"
-val memberId = "member1"
-val protocolType = "consumer"
-val rebalanceTimeoutMs = 10
-val sessionTimeoutMs = 5
-val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = 
ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+val expectedJoinGroupRequest = new JoinGroupRequestData()
+  .setGroupId(joinGroupRequest.groupId)
+  .setMemberId(joinGroupRequest.memberId)
+  .setProtocolType(joinGroupRequest.protocolType)
+  .setRebalanceTimeoutMs(if (version >= 1) 
joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+  .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-createKafkaApis().handleJoinGroupRequest(
-  buildRequest(
-new JoinGroupRequest.Builder(
-  new JoinGroupRequestData()
-.setGroupId(groupId)
-.setMemberId(memberId)
-.setProtocolType(protocolType)
-.setRebalanceTimeoutMs(rebalanceTimeoutMs)
-.setSessionTimeoutMs(sessionTimeoutMs)
-.setProtocols(new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-  protocols.map { case (name, protocol) => new 
JoinGroupRequestProtocol()
-.setName(name).setMetadata(protocol)
-  }.iterator.asJava))
-).build()
-  ),
-  RequestLocal.withThreadConfinedCaching)
+val future = new CompletableFuture[JoinGroupResponseData]()
+when(newGroupCoordinator.joinGroup(
+  ArgumentMatchers.eq(expectedRequestContext),
+  ArgumentMatchers.eq(expectedJoinGroupRequest)
+)).thenReturn(future)
 
-verify(groupCoordinator).handleJoinGroup(
-  ArgumentMatchers.eq(groupId),
-  ArgumentMatchers.eq(memberId),
-  ArgumentMatchers.eq(None),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(clientId),
-  ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-  ArgumentMatchers.eq(rebalanceTimeoutMs),
-  ArgumentMatchers.eq(sessionTimeoutMs),
-  ArgumentMatchers.eq(protocolType),
-  capturedProtocols.capture(),
-  any(),
-  any(),
-  any()
+createKafkaApis().handleJoinGroupRequest(
+  requestChannelRequest,
+  RequestLocal.NoCaching
 )
-val capturedProtocolsList = capturedProtocols.getValue
-assertEquals(protocols.size, capturedProtocolsList.size)
-protocols.zip(capturedProtocolsList).foreach { case ((expectedName, 
expectedBytes), (name, bytes)) =>
-  assertEquals(expectedName, name)
-  assertArrayEquals(expectedBytes, bytes)
-}
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-for (version <- ApiKeys.JOIN_GROUP.oldestVersion to 
ApiKeys.JOIN_GROUP.latestVersion) {
-  testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-}
-  }
+val expectedJoinGroupResponse = new JoinGroupResponseData()
+  .setMemberId("member")
+  .setGenerationId(0)
+  .setLeader("leader")
+  .setProtocolType("consumer")
+  .setProtocolName("range")
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-reset(groupCoordinator, clientRequestQuotaManager, requestChannel, 
replicaManager)
+future.complete(expectedJoinGroupResponse)
+val capturedResponse = verifyNoThrottling(requestChannelRequest)
+val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+assertEquals(expectedJoinGroupResponse, response.data)
+  }
 
-val groupId = "group"
-val memberId = "member1"
-val protocolType = "consumer"
-val rebalanceTimeoutMs = 10
-val sessionTimeoutMs = 5
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolNameBackwardCompatibility(version: Short): Uni

[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026101492


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##

[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026099763


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1647,69 +1656,51 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-val joinGroupRequest = request.body[JoinGroupRequest]
+  private def makeGroupCoordinatorRequestContext(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+new GroupCoordinatorRequestContext(
+  request.context.header.data.requestApiVersion,
+  request.context.header.data.clientId,
+  request.context.clientAddress,
+  requestLocal.bufferSupplier
+)
+  }
 
-// the callback for sending a join-group response
-def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-val responseBody = new JoinGroupResponse(
-  new JoinGroupResponseData()
-.setThrottleTimeMs(requestThrottleMs)
-.setErrorCode(joinResult.error.code)
-.setGenerationId(joinResult.generationId)
-.setProtocolType(joinResult.protocolType.orNull)
-.setProtocolName(joinResult.protocolName.orNull)
-.setLeader(joinResult.leaderId)
-.setSkipAssignment(joinResult.skipAssignment)
-.setMemberId(joinResult.memberId)
-.setMembers(joinResult.members.asJava),
-  request.context.apiVersion
-)
+  def handleJoinGroupRequest(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {

Review Comment:
   1. Yeah, that's right. I return a future to catch any other errors. It could 
be from `sendResponse` but it could also be other issues.
   2. No, it would not because the underlying API uses a callback.
   3. All APIs will be async for this reason, but note that most of them are 
already async today. The difference is that we use a future here instead of a 
callback.
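
A minimal Java sketch of point 1, under stated assumptions. `ReturnedFutureSketch` and its methods are hypothetical; the thrown exception stands in for a failing `sendResponse`. Because the response is sent in a dependent stage, a failure there completes the returned future exceptionally, so the caller can observe it:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ReturnedFutureSketch {
    // Hypothetical handler: the coordinator result arrives through `result`,
    // and the response is sent in a dependent stage.
    static CompletableFuture<Void> handleJoinGroup(CompletableFuture<String> result) {
        return result.thenAccept(response -> {
            // Stand-in for sendResponse failing.
            throw new IllegalStateException("send failed");
        });
    }

    // Returns the message of the root failure, or null if the future succeeded.
    static String failureOf(CompletableFuture<Void> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }
}
```

Had the handler returned nothing, the exception thrown inside the stage would have no observer unless a handler were attached at that exact spot.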






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-17 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1026098011


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -161,6 +166,12 @@ class KafkaApis(val requestChannel: RequestChannel,
* Top-level method that handles all requests and multiplexes to the right 
api
*/
   override def handle(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
+def handleError(e: Throwable): Unit = {
+  trace(s"Unexpected error handling request ${request.requestDesc(true)} " 
+

Review Comment:
   It is a mistake. Reverted to `error`.






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1024835046


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(

Review Comment:
   I can remove the `From` part. I am not too opinionated on this.






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023749684


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(

Review Comment:
   Why do you think so? Is it because of the usage of `make`? The name looks 
quite reasonable to me.






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023741182


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -22,9 +22,8 @@ import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, 
TimeoutException}
-

Review Comment:
   Nope. Reverted.






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-15 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023606759


##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import kafka.server.RequestLocal
+
+import org.apache.kafka.common.message.{JoinGroupRequestData, 
JoinGroupResponseData}
+import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify}
+
+import java.net.InetAddress
+import scala.jdk.CollectionConverters._
+
+class GroupCoordinatorAdapterTest {
+
+  private def makeContext(
+apiVersion: Short
+  ): GroupCoordinatorRequestContext = {
+new GroupCoordinatorRequestContext(
+  apiVersion,
+  "client",
+  InetAddress.getLocalHost,
+  BufferSupplier.create()
+)
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroup(version: Short): Unit = {

Review Comment:
   This test tests the logic of the `joinGroup` method in the
`GroupCoordinatorAdapter`. Given an input, it verifies the output. The tests in
`KafkaApisTest` do not do this, as they only verify that the adapter gets the
expected input.
   
   The name seems pretty clear to me as the test tests the `joinGroup` method.
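
To illustrate the "given an input, verify the output" style described above, here is a minimal sketch. It assumes the adapter bridges a legacy callback-based coordinator to a future-based method. `AdapterSketch`, `LegacyCoordinator`, and the `String` payload are hypothetical stand-ins and not the actual Kafka classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class AdapterSketch {
    // Hypothetical legacy coordinator that reports its result through a callback.
    interface LegacyCoordinator {
        void handleJoinGroup(String groupId, String memberId, Consumer<String> callback);
    }

    // Hypothetical adapter method: calls the legacy coordinator and exposes
    // the callback result as a CompletableFuture.
    static CompletableFuture<String> joinGroup(
        LegacyCoordinator legacy,
        String groupId,
        String memberId
    ) {
        CompletableFuture<String> future = new CompletableFuture<>();
        legacy.handleJoinGroup(groupId, memberId, future::complete);
        return future;
    }
}
```

A test in this style stubs the legacy coordinator, calls `joinGroup` with a known input, and asserts on the value carried by the returned future, rather than only checking which arguments the collaborator received.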



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
 assertEquals(MemoryRecords.EMPTY, 
FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-val protocols = List(
-  ("first", "first".getBytes()),
-  ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+val joinGroupRequest = new JoinGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+  .setRebalanceTimeoutMs(1000)
+  .setSessionTimeoutMs(2000)
+
+val requestChannelRequest = buildRequest(new 
JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+val expectedRequestContext = new GroupCoordinatorRequestContext(
+  version,
+  requestChannelRequest.context.clientId,
+  requestChannelRequest.context.clientAddress,
+  RequestLocal.NoCaching.bufferSupplier
 )
 
-val groupId = "group"
-val memberId = "member1"
-val protocolType = "consumer"
-val rebalanceTimeoutMs = 10
-val sessionTimeoutMs = 5
-val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = 
ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+val expectedJoinGroupRequest = new JoinGroupRequestData()
+  .setGroupId(joinGroupRequest.groupId)
+  .setMemberId(joinGroupRequest.memberId)
+  .setProtocolType(joinGroupRequest.protocolType)
+  .setRebalanceTimeoutMs(if (version >= 1) 
joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+  .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-createKafkaApis().handleJoinGroupRequest(
-  buildRequest(
-new JoinGroupRequest.Builder(
-  new JoinGroupRequestData()
-.setGroupId(groupId)
-.setMemberId(memberId)
-.setProtocolType(protocolType)
-.setRebalanceTimeoutMs(rebalanceTimeoutMs)
-.setSessionTimeoutMs(sessionTimeoutMs)
-.setProtocols(new 
JoinGroupRequestData.JoinGroupRequestProtoco

[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-15 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023604793


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
 assertEquals(MemoryRecords.EMPTY, 
FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-val protocols = List(
-  ("first", "first".getBytes()),
-  ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+val joinGroupRequest = new JoinGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+  .setRebalanceTimeoutMs(1000)
+  .setSessionTimeoutMs(2000)
+
+val requestChannelRequest = buildRequest(new 
JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+val expectedRequestContext = new GroupCoordinatorRequestContext(
+  version,
+  requestChannelRequest.context.clientId,
+  requestChannelRequest.context.clientAddress,
+  RequestLocal.NoCaching.bufferSupplier
 )
 
-val groupId = "group"
-val memberId = "member1"
-val protocolType = "consumer"
-val rebalanceTimeoutMs = 10
-val sessionTimeoutMs = 5
-val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = 
ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+val expectedJoinGroupRequest = new JoinGroupRequestData()
+  .setGroupId(joinGroupRequest.groupId)
+  .setMemberId(joinGroupRequest.memberId)
+  .setProtocolType(joinGroupRequest.protocolType)
+  .setRebalanceTimeoutMs(if (version >= 1) 
joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+  .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-createKafkaApis().handleJoinGroupRequest(
-  buildRequest(
-new JoinGroupRequest.Builder(
-  new JoinGroupRequestData()
-.setGroupId(groupId)
-.setMemberId(memberId)
-.setProtocolType(protocolType)
-.setRebalanceTimeoutMs(rebalanceTimeoutMs)
-.setSessionTimeoutMs(sessionTimeoutMs)
-.setProtocols(new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-  protocols.map { case (name, protocol) => new 
JoinGroupRequestProtocol()
-.setName(name).setMetadata(protocol)
-  }.iterator.asJava))
-).build()
-  ),
-  RequestLocal.withThreadConfinedCaching)
+val future = new CompletableFuture[JoinGroupResponseData]()
+when(newGroupCoordinator.joinGroup(
+  ArgumentMatchers.eq(expectedRequestContext),
+  ArgumentMatchers.eq(expectedJoinGroupRequest)
+)).thenReturn(future)
 
-verify(groupCoordinator).handleJoinGroup(
-  ArgumentMatchers.eq(groupId),
-  ArgumentMatchers.eq(memberId),
-  ArgumentMatchers.eq(None),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(clientId),
-  ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-  ArgumentMatchers.eq(rebalanceTimeoutMs),
-  ArgumentMatchers.eq(sessionTimeoutMs),
-  ArgumentMatchers.eq(protocolType),
-  capturedProtocols.capture(),
-  any(),
-  any(),
-  any()
+createKafkaApis().handleJoinGroupRequest(
+  requestChannelRequest,
+  RequestLocal.NoCaching
 )
-val capturedProtocolsList = capturedProtocols.getValue
-assertEquals(protocols.size, capturedProtocolsList.size)
-protocols.zip(capturedProtocolsList).foreach { case ((expectedName, 
expectedBytes), (name, bytes)) =>
-  assertEquals(expectedName, name)
-  assertArrayEquals(expectedBytes, bytes)
-}
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-for (version <- ApiKeys.JOIN_GROUP.oldestVersion to 
ApiKeys.JOIN_GROUP.latestVersion) {
-  testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-}
+val expectedJoinGroupResponse = new JoinGroupResponseData()
+  .setMemberId("member")
+  .setGenerationId(0)
+  .setLeader("leader")
+  .setProtocolType("consumer")
+  .setProtocolName("range")
+
+future.complete(expectedJoinGroupResponse)
+val capturedResponse = verifyNoThrottling(requestChannelRequest)
+val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+assertEquals(expectedJoinGroupResponse, response.data)
   }
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-reset(groupCoordinator, clientRequestQuotaManager, requestChannel, 
replicaManager)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = {
+val joinGroupRequest = new JoinGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+

[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-15 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023605151


##
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java:
##
@@ -28,9 +28,15 @@ public class JoinGroupResponse extends AbstractResponse {
 
     private final JoinGroupResponseData data;
 
-    public JoinGroupResponse(JoinGroupResponseData data) {
+    public JoinGroupResponse(JoinGroupResponseData data, short version) {
         super(ApiKeys.JOIN_GROUP);
         this.data = data;
+
+        // All versions prior to version 7 do not support nullable
+        // string for the protocol type. Empty string should be used.
+        if (version < 7 && data.protocolType() == null) {

Review Comment:
   Right, it was there.
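
For reference, the normalization described in the quoted comment can be sketched as below. The hunk is truncated before the body of the `if`, so the assignment shown here is an assumption based on the "Empty string should be used" comment. `ProtocolTypeCompat` and `ResponseData` are hypothetical stand-ins for the real response classes.

```java
final class ProtocolTypeCompat {
    // Hypothetical stand-in for the generated response data class;
    // only the field under discussion is modelled.
    static final class ResponseData {
        private String protocolType;

        String protocolType() {
            return protocolType;
        }

        ResponseData setProtocolType(String protocolType) {
            this.protocolType = protocolType;
            return this;
        }
    }

    // Versions prior to 7 cannot carry a null protocol type, so a null
    // value is replaced by the empty string (assumed body of the `if`).
    static ResponseData normalize(ResponseData data, short version) {
        if (version < 7 && data.protocolType() == null) {
            data.setProtocolType("");
        }
        return data;
    }
}
```

With this sketch, a null protocol type becomes `""` for version 6 and stays null for version 7, while a non-null value is left untouched in both cases.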






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-15 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021208132


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
 assertEquals(MemoryRecords.EMPTY, 
FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-val protocols = List(
-  ("first", "first".getBytes()),
-  ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+val joinGroupRequest = new JoinGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+  .setRebalanceTimeoutMs(1000)
+  .setSessionTimeoutMs(2000)
+
+val requestChannelRequest = buildRequest(new 
JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+val expectedRequestContext = new GroupCoordinatorRequestContext(
+  version,
+  requestChannelRequest.context.clientId,
+  requestChannelRequest.context.clientAddress,
+  RequestLocal.NoCaching.bufferSupplier
 )
 
-val groupId = "group"
-val memberId = "member1"
-val protocolType = "consumer"
-val rebalanceTimeoutMs = 10
-val sessionTimeoutMs = 5
-val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = 
ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+val expectedJoinGroupRequest = new JoinGroupRequestData()
+  .setGroupId(joinGroupRequest.groupId)
+  .setMemberId(joinGroupRequest.memberId)
+  .setProtocolType(joinGroupRequest.protocolType)
+  .setRebalanceTimeoutMs(if (version >= 1) 
joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+  .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-createKafkaApis().handleJoinGroupRequest(
-  buildRequest(
-new JoinGroupRequest.Builder(
-  new JoinGroupRequestData()
-.setGroupId(groupId)
-.setMemberId(memberId)
-.setProtocolType(protocolType)
-.setRebalanceTimeoutMs(rebalanceTimeoutMs)
-.setSessionTimeoutMs(sessionTimeoutMs)
-.setProtocols(new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-  protocols.map { case (name, protocol) => new 
JoinGroupRequestProtocol()
-.setName(name).setMetadata(protocol)
-  }.iterator.asJava))
-).build()
-  ),
-  RequestLocal.withThreadConfinedCaching)
+val future = new CompletableFuture[JoinGroupResponseData]()
+when(newGroupCoordinator.joinGroup(
+  ArgumentMatchers.eq(expectedRequestContext),
+  ArgumentMatchers.eq(expectedJoinGroupRequest)
+)).thenReturn(future)
 
-verify(groupCoordinator).handleJoinGroup(
-  ArgumentMatchers.eq(groupId),
-  ArgumentMatchers.eq(memberId),
-  ArgumentMatchers.eq(None),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(clientId),
-  ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-  ArgumentMatchers.eq(rebalanceTimeoutMs),
-  ArgumentMatchers.eq(sessionTimeoutMs),
-  ArgumentMatchers.eq(protocolType),
-  capturedProtocols.capture(),
-  any(),
-  any(),
-  any()
+createKafkaApis().handleJoinGroupRequest(
+  requestChannelRequest,
+  RequestLocal.NoCaching
 )
-val capturedProtocolsList = capturedProtocols.getValue
-assertEquals(protocols.size, capturedProtocolsList.size)
-protocols.zip(capturedProtocolsList).foreach { case ((expectedName, 
expectedBytes), (name, bytes)) =>
-  assertEquals(expectedName, name)
-  assertArrayEquals(expectedBytes, bytes)
-}
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-for (version <- ApiKeys.JOIN_GROUP.oldestVersion to 
ApiKeys.JOIN_GROUP.latestVersion) {
-  testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-}
+val expectedJoinGroupResponse = new JoinGroupResponseData()
+  .setMemberId("member")
+  .setGenerationId(0)
+  .setLeader("leader")
+  .setProtocolType("consumer")
+  .setProtocolName("range")
+
+future.complete(expectedJoinGroupResponse)
+val capturedResponse = verifyNoThrottling(requestChannelRequest)
+val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+assertEquals(expectedJoinGroupResponse, response.data)
   }
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-reset(groupCoordinator, clientRequestQuotaManager, requestChannel, 
replicaManager)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = {
+val joinGroupRequest = new JoinGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+

[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-15 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023603789


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
 assertEquals(MemoryRecords.EMPTY, 
FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {

Review Comment:
   Right. Let me fix it.



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
 assertEquals(MemoryRecords.EMPTY, 
FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {

Review Comment:
   This test's logic is now tested in `GroupCoordinatorAdapterTest`.






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-15 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023603444


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+    new GroupCoordinatorRequestContext(
+      request.context.header.data.requestApiVersion,
+      request.context.header.data.clientId,
+      request.context.clientAddress,
+      requestLocal.bufferSupplier
+    )
+  }
+
   def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     val joinGroupRequest = request.body[JoinGroupRequest]
 
-    // the callback for sending a join-group response
-    def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val protocolName = if (request.context.apiVersion() >= 7)
-          joinResult.protocolName.orNull
-        else
-          joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
-
-        val responseBody = new JoinGroupResponse(
-          new JoinGroupResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(joinResult.error.code)
-            .setGenerationId(joinResult.generationId)
-            .setProtocolType(joinResult.protocolType.orNull)
-            .setProtocolName(protocolName)
-            .setLeader(joinResult.leaderId)
-            .setSkipAssignment(joinResult.skipAssignment)
-            .setMemberId(joinResult.memberId)
-            .setMembers(joinResult.members.asJava)
-        )
-
-        trace("Sending join group response %s for correlation id %d to client %s."
-          .format(responseBody, request.header.correlationId, request.header.clientId))
-        responseBody
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending join group response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
     if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
+      sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
     } else if (!authHelper.authorize(request.context, READ, GROUP, joinGroupRequest.data.groupId)) {
-      sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED))
+      sendResponse(joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
     } else {
-      val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId)

Review Comment:
   Yeah, I meant `now` instead of `not`. This code was moved 1:1 there.






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-14 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021208615


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
 assertEquals(MemoryRecords.EMPTY, 
FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-val protocols = List(
-  ("first", "first".getBytes()),
-  ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+val joinGroupRequest = new JoinGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+  .setRebalanceTimeoutMs(1000)
+  .setSessionTimeoutMs(2000)
+
+val requestChannelRequest = buildRequest(new 
JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+val expectedRequestContext = new GroupCoordinatorRequestContext(
+  version,
+  requestChannelRequest.context.clientId,
+  requestChannelRequest.context.clientAddress,
+  RequestLocal.NoCaching.bufferSupplier
 )
 
-val groupId = "group"
-val memberId = "member1"
-val protocolType = "consumer"
-val rebalanceTimeoutMs = 10
-val sessionTimeoutMs = 5
-val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = 
ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+val expectedJoinGroupRequest = new JoinGroupRequestData()
+  .setGroupId(joinGroupRequest.groupId)
+  .setMemberId(joinGroupRequest.memberId)
+  .setProtocolType(joinGroupRequest.protocolType)
+  .setRebalanceTimeoutMs(if (version >= 1) 
joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+  .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-createKafkaApis().handleJoinGroupRequest(
-  buildRequest(
-new JoinGroupRequest.Builder(
-  new JoinGroupRequestData()
-.setGroupId(groupId)
-.setMemberId(memberId)
-.setProtocolType(protocolType)
-.setRebalanceTimeoutMs(rebalanceTimeoutMs)
-.setSessionTimeoutMs(sessionTimeoutMs)
-.setProtocols(new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-  protocols.map { case (name, protocol) => new 
JoinGroupRequestProtocol()
-.setName(name).setMetadata(protocol)
-  }.iterator.asJava))
-).build()
-  ),
-  RequestLocal.withThreadConfinedCaching)
+val future = new CompletableFuture[JoinGroupResponseData]()
+when(newGroupCoordinator.joinGroup(
+  ArgumentMatchers.eq(expectedRequestContext),
+  ArgumentMatchers.eq(expectedJoinGroupRequest)
+)).thenReturn(future)
 
-verify(groupCoordinator).handleJoinGroup(
-  ArgumentMatchers.eq(groupId),
-  ArgumentMatchers.eq(memberId),
-  ArgumentMatchers.eq(None),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(clientId),
-  ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-  ArgumentMatchers.eq(rebalanceTimeoutMs),
-  ArgumentMatchers.eq(sessionTimeoutMs),
-  ArgumentMatchers.eq(protocolType),
-  capturedProtocols.capture(),
-  any(),
-  any(),
-  any()
+createKafkaApis().handleJoinGroupRequest(
+  requestChannelRequest,
+  RequestLocal.NoCaching
 )
-val capturedProtocolsList = capturedProtocols.getValue
-assertEquals(protocols.size, capturedProtocolsList.size)
-protocols.zip(capturedProtocolsList).foreach { case ((expectedName, 
expectedBytes), (name, bytes)) =>
-  assertEquals(expectedName, name)
-  assertArrayEquals(expectedBytes, bytes)
-}
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {
-for (version <- ApiKeys.JOIN_GROUP.oldestVersion to 
ApiKeys.JOIN_GROUP.latestVersion) {
-  testJoinGroupWhenAnErrorOccurs(version.asInstanceOf[Short])
-}
+val expectedJoinGroupResponse = new JoinGroupResponseData()
+  .setMemberId("member")
+  .setGenerationId(0)
+  .setLeader("leader")
+  .setProtocolType("consumer")
+  .setProtocolName("range")
+
+future.complete(expectedJoinGroupResponse)
+val capturedResponse = verifyNoThrottling(requestChannelRequest)
+val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
+assertEquals(expectedJoinGroupResponse, response.data)
   }
 
-  def testJoinGroupWhenAnErrorOccurs(version: Short): Unit = {
-reset(groupCoordinator, clientRequestQuotaManager, requestChannel, 
replicaManager)
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroupProtocolTypeBackwardCompatibility(version: Short): Unit = {
+val joinGroupRequest = new JoinGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+

[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-14 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021207545


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
 assertEquals(MemoryRecords.EMPTY, 
FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {
-val protocols = List(
-  ("first", "first".getBytes()),
-  ("second", "second".getBytes())
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testHandleJoinGroupRequest(version: Short): Unit = {
+val joinGroupRequest = new JoinGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+  .setRebalanceTimeoutMs(1000)
+  .setSessionTimeoutMs(2000)
+
+val requestChannelRequest = buildRequest(new 
JoinGroupRequest.Builder(joinGroupRequest).build(version))
+
+val expectedRequestContext = new GroupCoordinatorRequestContext(
+  version,
+  requestChannelRequest.context.clientId,
+  requestChannelRequest.context.clientAddress,
+  RequestLocal.NoCaching.bufferSupplier
 )
 
-val groupId = "group"
-val memberId = "member1"
-val protocolType = "consumer"
-val rebalanceTimeoutMs = 10
-val sessionTimeoutMs = 5
-val capturedProtocols: ArgumentCaptor[List[(String, Array[Byte])]] = 
ArgumentCaptor.forClass(classOf[List[(String, Array[Byte])]])
+val expectedJoinGroupRequest = new JoinGroupRequestData()
+  .setGroupId(joinGroupRequest.groupId)
+  .setMemberId(joinGroupRequest.memberId)
+  .setProtocolType(joinGroupRequest.protocolType)
+  .setRebalanceTimeoutMs(if (version >= 1) 
joinGroupRequest.rebalanceTimeoutMs else joinGroupRequest.sessionTimeoutMs)
+  .setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
 
-createKafkaApis().handleJoinGroupRequest(
-  buildRequest(
-new JoinGroupRequest.Builder(
-  new JoinGroupRequestData()
-.setGroupId(groupId)
-.setMemberId(memberId)
-.setProtocolType(protocolType)
-.setRebalanceTimeoutMs(rebalanceTimeoutMs)
-.setSessionTimeoutMs(sessionTimeoutMs)
-.setProtocols(new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-  protocols.map { case (name, protocol) => new 
JoinGroupRequestProtocol()
-.setName(name).setMetadata(protocol)
-  }.iterator.asJava))
-).build()
-  ),
-  RequestLocal.withThreadConfinedCaching)
+val future = new CompletableFuture[JoinGroupResponseData]()
+when(newGroupCoordinator.joinGroup(
+  ArgumentMatchers.eq(expectedRequestContext),
+  ArgumentMatchers.eq(expectedJoinGroupRequest)
+)).thenReturn(future)
 
-verify(groupCoordinator).handleJoinGroup(
-  ArgumentMatchers.eq(groupId),
-  ArgumentMatchers.eq(memberId),
-  ArgumentMatchers.eq(None),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(true),
-  ArgumentMatchers.eq(clientId),
-  ArgumentMatchers.eq(InetAddress.getLocalHost.toString),
-  ArgumentMatchers.eq(rebalanceTimeoutMs),
-  ArgumentMatchers.eq(sessionTimeoutMs),
-  ArgumentMatchers.eq(protocolType),
-  capturedProtocols.capture(),
-  any(),
-  any(),
-  any()
+createKafkaApis().handleJoinGroupRequest(
+  requestChannelRequest,
+  RequestLocal.NoCaching
 )
-val capturedProtocolsList = capturedProtocols.getValue
-assertEquals(protocols.size, capturedProtocolsList.size)
-protocols.zip(capturedProtocolsList).foreach { case ((expectedName, 
expectedBytes), (name, bytes)) =>
-  assertEquals(expectedName, name)
-  assertArrayEquals(expectedBytes, bytes)
-}
-  }
 
-  @Test
-  def testJoinGroupWhenAnErrorOccurs(): Unit = {

Review Comment:
   I have renamed this one to `testJoinGroupProtocolTypeBackwardCompatibility`.
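
As a side note on the quoted hunk, the expected request falls back to the session timeout when the request version is below 1. A minimal sketch of that rule follows. `RebalanceTimeoutSketch` and `effectiveRebalanceTimeoutMs` are hypothetical names used only for illustration, not part of the PR.

```java
final class RebalanceTimeoutSketch {
    // Mirrors the expectation in the quoted test: from version 1 on, the
    // rebalance timeout from the request is used; for version 0 the
    // session timeout is used instead.
    static int effectiveRebalanceTimeoutMs(
        short version,
        int rebalanceTimeoutMs,
        int sessionTimeoutMs
    ) {
        return version >= 1 ? rebalanceTimeoutMs : sessionTimeoutMs;
    }
}
```

With the values from the test (rebalance timeout 1000, session timeout 2000), version 0 yields 2000 and any later version yields 1000.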






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-14 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021207157


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2524,196 +2528,166 @@ class KafkaApisTest {
 assertEquals(MemoryRecords.EMPTY, 
FetchResponse.recordsOrFail(partitionData))
   }
 
-  @Test
-  def testJoinGroupProtocolsOrder(): Unit = {

Review Comment:
   This test's logic is now tested in `GroupCoordinatorAdapterTest`.






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-14 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021206505


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -94,6 +95,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 val metadataSupport: MetadataSupport,
 val replicaManager: ReplicaManager,
 val groupCoordinator: GroupCoordinator,
+val newGroupCoordinator: org.apache.kafka.coordinator.group.GroupCoordinator,

Review Comment:
   This is a temporary change. When all the APIs are migrated to the new
interface, I will change `groupCoordinator` to the new interface and remove
this one.






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-14 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021205683


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+new GroupCoordinatorRequestContext(
+  request.context.header.data.requestApiVersion,
+  request.context.header.data.clientId,
+  request.context.clientAddress,
+  requestLocal.bufferSupplier
+)
+  }
+
   def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
 val joinGroupRequest = request.body[JoinGroupRequest]
 
-// the callback for sending a join-group response
-def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-val protocolName = if (request.context.apiVersion() >= 7)
-  joinResult.protocolName.orNull
-else
-  joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
-
-val responseBody = new JoinGroupResponse(
-  new JoinGroupResponseData()
-.setThrottleTimeMs(requestThrottleMs)
-.setErrorCode(joinResult.error.code)
-.setGenerationId(joinResult.generationId)
-.setProtocolType(joinResult.protocolType.orNull)
-.setProtocolName(protocolName)
-.setLeader(joinResult.leaderId)
-.setSkipAssignment(joinResult.skipAssignment)
-.setMemberId(joinResult.memberId)
-.setMembers(joinResult.members.asJava)
-)
-
-trace("Sending join group response %s for correlation id %d to client %s."
-  .format(responseBody, request.header.correlationId, request.header.clientId))
-responseBody
-  }
-  requestHelper.sendResponseMaybeThrottle(request, createResponse)
+def sendResponse(response: AbstractResponse): Unit = {
+  trace("Sending join group response %s for correlation id %d to client %s."
+.format(response, request.header.correlationId, request.header.clientId))
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+response.maybeSetThrottleTimeMs(requestThrottleMs)
+response
+  })
 }
 
 if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
   // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
   // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
   // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-  sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNSUPPORTED_VERSION))
+  sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
 } else if (!authHelper.authorize(request.context, READ, GROUP, joinGroupRequest.data.groupId)) {
-  sendResponseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_AUTHORIZATION_FAILED))
+  sendResponse(joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
 } else {
-  val groupInstanceId = Option(joinGroupRequest.data.groupInstanceId)

Review Comment:
   This code is now in `GroupCoordinatorAdapter` without any changes.






[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-14 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1021205253


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): GroupCoordinatorRequestContext = {
+new GroupCoordinatorRequestContext(
+  request.context.header.data.requestApiVersion,
+  request.context.header.data.clientId,
+  request.context.clientAddress,
+  requestLocal.bufferSupplier
+)
+  }
+
   def handleJoinGroupRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
 val joinGroupRequest = request.body[JoinGroupRequest]
 
-// the callback for sending a join-group response
-def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-val protocolName = if (request.context.apiVersion() >= 7)
-  joinResult.protocolName.orNull
-else
-  joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
-
-val responseBody = new JoinGroupResponse(
-  new JoinGroupResponseData()

Review Comment:
   This translation is now in `GroupCoordinatorAdapter`.
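
   For reference, the version-dependent protocol-name translation removed in
   the hunk above can be sketched in isolation. This is only an illustration of
   the removed branch, not the actual adapter code: the class
   `ProtocolNameTranslation`, the method `protocolNameFor`, and the empty-string
   value used for `NO_PROTOCOL` are assumptions made for the example.

```java
import java.util.Optional;

// Hypothetical, standalone illustration; not part of the Kafka code base.
final class ProtocolNameTranslation {
    // Assumed stand-in for GroupCoordinator.NoProtocol (value is an assumption).
    static final String NO_PROTOCOL = "";

    // Mirrors the removed branch: from version 7 on, a missing protocol name
    // is sent as null; older versions fall back to the placeholder instead.
    static String protocolNameFor(short apiVersion, Optional<String> protocolName) {
        if (apiVersion >= 7) {
            return protocolName.orElse(null);
        }
        return protocolName.orElse(NO_PROTOCOL);
    }

    public static void main(String[] args) {
        // A chosen protocol name passes through unchanged for any version.
        System.out.println(protocolNameFor((short) 6, Optional.of("range")));
        // A missing name becomes null only for version 7 and above.
        System.out.println(protocolNameFor((short) 7, Optional.empty()));
    }
}
```

   Keeping this branch next to the response construction is presumably why it
   moves together with the rest of the `JoinGroupResponseData` translation.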


