dajac commented on PR #15536: URL: https://github.com/apache/kafka/pull/15536#issuecomment-2127253314
It would be great if we could also add a unit test for the transactional offset commit path. Something like: ``` @Test def testTransactionalCommitOffsetWithPartialFailure(): Unit = { val memberId = "" val foo0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo") val foo1 = new TopicIdPartition(Uuid.randomUuid(), 1, "foo") val producerId = 232L val producerEpoch = 0.toShort groupMetadataManager.addOwnedPartition(groupPartitionId) val group = new GroupMetadata(groupId, Empty, time) groupMetadataManager.addGroup(group) val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId)) val offsets = immutable.Map( foo0 -> OffsetAndMetadata(37, "", time.milliseconds()), foo1 -> OffsetAndMetadata(38, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds()) ) val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = { commitErrors = Some(errors) } val verificationGuard = new VerificationGuard() groupMetadataManager.storeOffsets( group, memberId, offsetTopicPartition, offsets, callback, producerId, producerEpoch, verificationGuard = Some(verificationGuard) ) assertTrue(group.hasOffsets) assertTrue(group.allOffsets.isEmpty) verify(replicaManager).appendRecords(anyLong(), anyShort(), any(), any(), any[Map[TopicPartition, MemoryRecords]], capturedResponseCallback.capture(), any[Option[ReentrantLock]], any(), any(), any(), ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard))) verify(replicaManager).getMagic(any()) capturedResponseCallback.getValue.apply(Map(groupTopicPartition -> new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L))) assertEquals(Some(Map( foo0 -> Errors.NONE, foo1 -> Errors.OFFSET_METADATA_TOO_LARGE )), commitErrors) assertTrue(group.hasOffsets) assertTrue(group.allOffsets.isEmpty) group.completePendingTxnOffsetCommit(producerId, isCommit = true) assertTrue(group.hasOffsets) assertFalse(group.allOffsets.isEmpty) assertEquals(offsets.get(foo0), group.offset(foo0.topicPartition)) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org