jeffkbkim commented on code in PR #12847:
URL: https://github.com/apache/kafka/pull/12847#discussion_r1024721499
##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##
@@ -135,4 +133,71 @@ class GroupCoordinatorAdapterTest {
assertEquals(expectedData, future.get())
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+ def testSyncGroup(version: Short): Unit = {
+val groupCoordinator = mock(classOf[GroupCoordinator])
+val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+val ctx = makeContext(version)
+val data = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member1")
+ .setGroupInstanceId("instance")
+ .setProtocolType("consumer")
+ .setProtocolName("range")
+ .setGenerationId(10)
+ .setAssignments(List(
+new SyncGroupRequestData.SyncGroupRequestAssignment()
+ .setMemberId("member1")
+ .setAssignment("member1".getBytes()),
+new SyncGroupRequestData.SyncGroupRequestAssignment()
+ .setMemberId("member2")
+ .setAssignment("member2".getBytes())
+ ).asJava)
+
+val future = adapter.syncGroup(ctx, data)
+assertFalse(future.isDone)
+
+val capturedAssignment: ArgumentCaptor[Map[String, Array[Byte]]] =
+ ArgumentCaptor.forClass(classOf[Map[String, Array[Byte]]])
+val capturedCallback: ArgumentCaptor[SyncGroupCallback] =
+ ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+
+verify(groupCoordinator).handleSyncGroup(
+ ArgumentMatchers.eq(data.groupId),
+ ArgumentMatchers.eq(data.generationId),
+ ArgumentMatchers.eq(data.memberId),
+ ArgumentMatchers.eq(Some(data.protocolType)),
+ ArgumentMatchers.eq(Some(data.protocolName)),
+ ArgumentMatchers.eq(Some(data.groupInstanceId)),
+ capturedAssignment.capture(),
+ capturedCallback.capture(),
+ ArgumentMatchers.eq(RequestLocal(ctx.bufferSupplier))
+)
+
+assertEquals(Map(
+ "member1" -> "member1",
+ "member2" -> "member2",
+), capturedAssignment.getValue.map { case (member, metadata) =>
+ (member, new String(metadata))
+}.toMap)
Review Comment:
Do we need the `.toMap` here?
##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##
@@ -135,4 +133,71 @@ class GroupCoordinatorAdapterTest {
assertEquals(expectedData, future.get())
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+ def testSyncGroup(version: Short): Unit = {
+val groupCoordinator = mock(classOf[GroupCoordinator])
+val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+val ctx = makeContext(version)
+val data = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member1")
+ .setGroupInstanceId("instance")
+ .setProtocolType("consumer")
+ .setProtocolName("range")
+ .setGenerationId(10)
+ .setAssignments(List(
+new SyncGroupRequestData.SyncGroupRequestAssignment()
+ .setMemberId("member1")
+ .setAssignment("member1".getBytes()),
+new SyncGroupRequestData.SyncGroupRequestAssignment()
+ .setMemberId("member2")
+ .setAssignment("member2".getBytes())
+ ).asJava)
+
+val future = adapter.syncGroup(ctx, data)
+assertFalse(future.isDone)
+
+val capturedAssignment: ArgumentCaptor[Map[String, Array[Byte]]] =
+ ArgumentCaptor.forClass(classOf[Map[String, Array[Byte]]])
+val capturedCallback: ArgumentCaptor[SyncGroupCallback] =
+ ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+
+verify(groupCoordinator).handleSyncGroup(
+ ArgumentMatchers.eq(data.groupId),
+ ArgumentMatchers.eq(data.generationId),
+ ArgumentMatchers.eq(data.memberId),
+ ArgumentMatchers.eq(Some(data.protocolType)),
+ ArgumentMatchers.eq(Some(data.protocolName)),
+ ArgumentMatchers.eq(Some(data.groupInstanceId)),
+ capturedAssignment.capture(),
+ capturedCallback.capture(),
+ ArgumentMatchers.eq(RequestLocal(ctx.bufferSupplier))
+)
+
+assertEquals(Map(
+ "member1" -> "member1",
+ "member2" -> "member2",
+), capturedAssignment.getValue.map { case (member, metadata) =>
+ (member, new String(metadata))
+}.toMap)
+
+capturedCallback.getValue.apply(SyncGroupResult(
+ error = Errors.NONE,
+ protocolType = Some("consumer"),
+ protocolName = Some("range"),
+ memberAssignment = "member1".getBytes()
+))
+
+val expectedResponseData = new SyncGroupResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setProtocolType("consumer")
+ .setProtocolName("range")
Review Comment:
Generally, I see a lot of literal strings being used across the tests in
GroupCoordinatorAdapterTest. What's the reason for not reusing them? Is it the
convention?
--
This is an automated message from th