jolshan commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1432877933
########## core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala: ########## @@ -234,6 +237,79 @@ class CoordinatorPartitionWriterTest { assertEquals(records, receivedRecords) } + @ParameterizedTest + @EnumSource(value = classOf[ControlRecordType], names = Array("COMMIT", "ABORT")) + def testWriteTransactionEndMarker(controlRecordType: ControlRecordType): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val time = new MockTime() + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + time + ) + + when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( + Collections.emptyMap(), + new Properties() + ))) + + val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]]) + val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) + + when(replicaManager.appendRecords( + ArgumentMatchers.eq(0L), + ArgumentMatchers.eq(1.toShort), + ArgumentMatchers.eq(true), + ArgumentMatchers.eq(AppendOrigin.COORDINATOR), + recordsCapture.capture(), + callbackCapture.capture(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenAnswer(_ => { + callbackCapture.getValue.apply(Map( + tp -> new PartitionResponse( + Errors.NONE, + 5, + 10, + RecordBatch.NO_TIMESTAMP, + -1, + Collections.emptyList(), + "" + ) + )) + }) + + assertEquals(11, partitionRecordWriter.appendEndTransactionMarker( + tp, + 100L, + 50.toShort, + 10, + if (controlRecordType == ControlRecordType.COMMIT) TransactionResult.COMMIT else TransactionResult.ABORT + )) + + val batch = recordsCapture.getValue.getOrElse(tp, + throw new AssertionError(s"No records for 
$tp")) + assertEquals(1, batch.batches.asScala.toList.size) + + val firstBatch = batch.batches.asScala.head + assertEquals(100L, firstBatch.producerId) + assertEquals(50.toShort, firstBatch.producerEpoch) + assertTrue(firstBatch.isTransactional) Review Comment: Should we also confirm that `firstBatch` is a control batch (e.g. `assertTrue(firstBatch.isControlBatch)`)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org