dajac commented on PR #15536:
URL: https://github.com/apache/kafka/pull/15536#issuecomment-2127253314

   It would be great if we could also add a unit test for the transactional 
offset commit path. Something like:
   
   ```
     @Test
     def testTransactionalCommitOffsetWithPartialFailure(): Unit = {
       val memberId = ""
       val foo0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
       val foo1 = new TopicIdPartition(Uuid.randomUuid(), 1, "foo")
       val producerId = 232L
       val producerEpoch = 0.toShort
   
       groupMetadataManager.addOwnedPartition(groupPartitionId)
   
       val group = new GroupMetadata(groupId, Empty, time)
       groupMetadataManager.addGroup(group)
   
       val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupMetadataManager.partitionFor(group.groupId))
       val offsets = immutable.Map(
         foo0 -> OffsetAndMetadata(37, "", time.milliseconds()),
         foo1 -> OffsetAndMetadata(38, "s" * (offsetConfig.maxMetadataSize + 
1), time.milliseconds())
       )
   
       val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, 
PartitionResponse] => Unit] =
         ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] 
=> Unit])
       
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
       var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
   
       def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
         commitErrors = Some(errors)
       }
   
       val verificationGuard = new VerificationGuard()
   
       groupMetadataManager.storeOffsets(
         group,
         memberId,
         offsetTopicPartition,
         offsets,
         callback,
         producerId,
         producerEpoch,
         verificationGuard = Some(verificationGuard)
       )
       assertTrue(group.hasOffsets)
       assertTrue(group.allOffsets.isEmpty)
   
       verify(replicaManager).appendRecords(anyLong(),
         anyShort(),
         any(),
         any(),
         any[Map[TopicPartition, MemoryRecords]],
         capturedResponseCallback.capture(),
         any[Option[ReentrantLock]],
         any(),
         any(),
         any(),
         ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
       verify(replicaManager).getMagic(any())
       capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
         new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
   
       assertEquals(Some(Map(
         foo0 -> Errors.NONE,
         foo1 -> Errors.OFFSET_METADATA_TOO_LARGE
       )), commitErrors)
   
       assertTrue(group.hasOffsets)
       assertTrue(group.allOffsets.isEmpty)
   
       group.completePendingTxnOffsetCommit(producerId, isCommit = true)
       assertTrue(group.hasOffsets)
       assertFalse(group.allOffsets.isEmpty)
       assertEquals(offsets.get(foo0), group.offset(foo0.topicPartition))
     }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to