jeffkbkim commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1431849588


##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##########
@@ -511,4 +544,35 @@ class CoordinatorLoaderImplTest {
 
     new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
   }
+
+  private def logReadResult(
+    startOffset: Long,
+    producerId: Long,
+    producerEpoch: Short,
+    controlRecordType: ControlRecordType
+  ): FetchDataInfo = {
+    val fileRecords = mock(classOf[FileRecords])
+    val memoryRecords = MemoryRecords.withEndTransactionMarker(
+      startOffset,

Review Comment:
   It doesn't seem like we use the start/initial offset when we generate the control records, i.e. https://github.com/apache/kafka/pull/14985/files#diff-252d8f54c521cc32e09406f127d568391346d3f92183fdb70f04108f4360f27fR197-R201
   
   Is this because we only use the start offset during a read? I guess that makes sense, since we don't know what offset we're appending a record at when creating the records.
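
To spell out what I mean, a minimal sketch of the two `MemoryRecords.withEndTransactionMarker` overloads (the producer id/epoch, coordinator epoch, and offsets below are placeholder values, and I'm assuming the short no-offset overload is the one the write path uses):

```java
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

public class EndMarkerOffsetSketch {
    public static void main(String[] args) {
        long producerId = 100L;          // placeholder
        short producerEpoch = (short) 5; // placeholder
        int coordinatorEpoch = 10;       // placeholder

        // Write path: no base offset is supplied; the log assigns the real
        // offset when the marker batch is appended.
        MemoryRecords markerForAppend = MemoryRecords.withEndTransactionMarker(
            producerId,
            producerEpoch,
            new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
        );

        // Read path (as in the logReadResult helper above): a base offset is
        // supplied so the loader sees the batch at the offset it occupies in the log.
        long startOffset = 7L; // placeholder
        MemoryRecords markerAsRead = MemoryRecords.withEndTransactionMarker(
            startOffset,
            0L,
            RecordBatch.NO_PARTITION_LEADER_EPOCH,
            producerId,
            producerEpoch,
            new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
        );

        System.out.println(markerForAppend.sizeInBytes() + " / " + markerAsRead.sizeInBytes());
    }
}
```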



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -2414,6 +2424,67 @@ public void testReplayWithTombstone() {
         assertNull(context.offsetMetadataManager.offset("foo", "bar", 0));
     }
 
+    @Test
+    public void testReplayTransactionEndMarkerWithCommit() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        // Add pending transactional commit for producer id 5.
+        verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new OffsetAndMetadata(
+            100L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
+        // Add pending transactional commit for producer id 6.
+        verifyTransactionalReplay(context, 6L, "foo", "bar", 1, new OffsetAndMetadata(
+            200L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
+        // Replaying an end marker with an unknown producer id should not fail.

Review Comment:
   Is this to provide idempotency? I'm guessing we can have duplicate requests coming in.
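
To illustrate why that would make the replay safe, a hypothetical, much-simplified sketch (this is not the actual `OffsetMetadataManager`; the class and map below are made up for illustration) where a marker for an unknown producer id is simply a no-op:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification: pending transactional offsets keyed by producer id.
public class PendingTransactionSketch {
    private final Map<Long, Map<String, Long>> pendingOffsetsByProducerId = new HashMap<>();

    public void replayEndTransactionMarker(long producerId, boolean commit) {
        Map<String, Long> pending = pendingOffsetsByProducerId.remove(producerId);
        if (pending == null) {
            // Unknown producer id: nothing pending (e.g. a duplicate or already
            // completed transaction), so ignore rather than fail the replay.
            return;
        }
        if (commit) {
            // Promote the pending offsets to the committed state (elided here).
        }
        // On abort, dropping the pending entry is all that is needed.
    }
}
```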



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1900,4 +1908,59 @@ public void testCommitTransactionalOffsets() throws ExecutionException, Interrup
 
         assertEquals(response, future.get());
     }
+
+    @Test
+    public void testCompleteTransaction() throws ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+        service.startup(() -> 1);
+
+        when(runtime.scheduleTransactionCompletion(
+            ArgumentMatchers.eq("write-txn-marker"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.eq(100L),
+            ArgumentMatchers.eq((short) 5),
+            ArgumentMatchers.eq(10),
+            ArgumentMatchers.eq(TransactionResult.COMMIT),
+            ArgumentMatchers.eq(Duration.ofMillis(100))
+        )).thenReturn(CompletableFuture.completedFuture(null));
+
+        CompletableFuture<Void> future = service.completeTransaction(
+            new TopicPartition("__consumer_offsets", 0),
+            100L,
+            (short) 5,
+            10,
+            TransactionResult.COMMIT,
+            Duration.ofMillis(100)
+        );
+
+        assertNull(future.get());
+    }
+
+    @Test
+    public void testCompleteTransactionWhenNotStarted() throws ExecutionException, InterruptedException {

Review Comment:
   It seems we can remove the declared exceptions here.
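
i.e. the signature could just be (body elided; assuming nothing in it throws a checked exception):

```java
import org.junit.jupiter.api.Test;

public class GroupCoordinatorServiceTestSketch {
    @Test
    public void testCompleteTransactionWhenNotStarted() {
        // ... same body, no throws clause needed ...
    }
}
```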



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -234,6 +237,79 @@ class CoordinatorPartitionWriterTest {
     assertEquals(records, receivedRecords)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[ControlRecordType], names = Array("COMMIT", "ABORT"))
+  def testWriteTransactionEndMarker(controlRecordType: ControlRecordType): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val time = new MockTime()
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager,
+      new StringKeyValueSerializer(),
+      CompressionType.NONE,
+      time
+    )
+
+    when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps(
+      Collections.emptyMap(),
+      new Properties()
+    )))
+
+    val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+
+    when(replicaManager.appendRecords(
+      ArgumentMatchers.eq(0L),
+      ArgumentMatchers.eq(1.toShort),
+      ArgumentMatchers.eq(true),
+      ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
+      recordsCapture.capture(),
+      callbackCapture.capture(),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any()
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        tp -> new PartitionResponse(
+          Errors.NONE,
+          5,
+          10,
+          RecordBatch.NO_TIMESTAMP,
+          -1,
+          Collections.emptyList(),
+          ""
+        )
+      ))
+    })
+
+    assertEquals(11, partitionRecordWriter.appendEndTransactionMarker(
+      tp,
+      100L,
+      50.toShort,
+      10,
+      TransactionResult.COMMIT
+    ))
+
+    val batch = recordsCapture.getValue.getOrElse(tp, throw new AssertionError(s"No records for $tp"))
+    assertEquals(1, batch.batches.asScala.toList.size)
+
+    val firstBatch = batch.batches.asScala.head
+    assertEquals(100L, firstBatch.producerId)
+    assertEquals(50.toShort, firstBatch.producerEpoch)
+    assertTrue(firstBatch.isTransactional)
+
+    val receivedRecords = batch.records.asScala.map { record =>
+      ControlRecordType.parse(record.key)
+    }.toList
+
+    assertEquals(List(ControlRecordType.COMMIT), receivedRecords)

Review Comment:
   Should we use the `controlRecordType` argument here? Similarly for the `TransactionResult` type on L294.
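
For example, a small hypothetical helper (shown in Java for brevity even though the test is Scala; the method name is made up) would let both the `appendEndTransactionMarker` call and the final assertion be driven by the `controlRecordType` parameter:

```java
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.requests.TransactionResult;

public class ControlRecordTypeMapping {
    // Hypothetical mapping from the parameterized ControlRecordType to the
    // TransactionResult passed to appendEndTransactionMarker.
    public static TransactionResult toTransactionResult(ControlRecordType type) {
        switch (type) {
            case COMMIT:
                return TransactionResult.COMMIT;
            case ABORT:
                return TransactionResult.ABORT;
            default:
                throw new IllegalArgumentException("Not an end transaction marker type: " + type);
        }
    }
}
```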



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -968,4 +971,34 @@ public void testCleanupGroupMetadata() {
         verify(groupMetadataManager, times(1)).maybeDeleteGroup(eq("group-id"), any());
         verify(groupMetadataManager, times(0)).maybeDeleteGroup(eq("other-group-id"), any());
     }
+
+    @ParameterizedTest
+    @EnumSource(value = TransactionResult.class)
+    public void testReplayTransactionEndMarker(TransactionResult result) {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        coordinator.replayTransactionEndMarker(
+            100L,
+            (short) 5,

Review Comment:
   Will the producer epoch be used in a future PR?



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##########
@@ -511,4 +544,35 @@ class CoordinatorLoaderImplTest {
 
     new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
   }
+
+  private def logReadResult(
+    startOffset: Long,
+    producerId: Long,
+    producerEpoch: Short,
+    controlRecordType: ControlRecordType
+  ): FetchDataInfo = {
+    val fileRecords = mock(classOf[FileRecords])
+    val memoryRecords = MemoryRecords.withEndTransactionMarker(
+      startOffset,
+      0L,
+      RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      producerId,
+      producerEpoch,
+      new EndTransactionMarker(controlRecordType, 0)
+    )
+
+    when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
+
+    val bufferCapture: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])
+    when(fileRecords.readInto(
+      bufferCapture.capture(),
+      ArgumentMatchers.anyInt())
+    ).thenAnswer { _ =>
+      val buffer = bufferCapture.getValue
+      buffer.put(memoryRecords.buffer.duplicate)
+      buffer.flip()
+    }
+
+    new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)

Review Comment:
   Is there a reason we return FileRecords instead of MemoryRecords here? Side question: when do we expect FileRecords from `log.read()`?
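
For illustration, a rough Java mirror of the helper that returns the `MemoryRecords` directly (package locations are assumed from the test's existing imports); that said, the `FileRecords` mock may be intentional if the goal is to exercise the `readInto` path the loader uses for file-backed reads:

```java
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;

public class LogReadResultSketch {
    static FetchDataInfo logReadResult(
        long startOffset,
        long producerId,
        short producerEpoch,
        ControlRecordType controlRecordType
    ) {
        // Build the end transaction marker batch at startOffset, exactly as the
        // Scala helper does, but hand it to FetchDataInfo without a FileRecords mock.
        MemoryRecords memoryRecords = MemoryRecords.withEndTransactionMarker(
            startOffset,
            0L,
            RecordBatch.NO_PARTITION_LEADER_EPOCH,
            producerId,
            producerEpoch,
            new EndTransactionMarker(controlRecordType, 0)
        );
        return new FetchDataInfo(new LogOffsetMetadata(startOffset), memoryRecords);
    }
}
```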



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
