jeffkbkim commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1431849588
##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##########
@@ -511,4 +544,35 @@ class CoordinatorLoaderImplTest {
new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
}
+
+ private def logReadResult(
+ startOffset: Long,
+ producerId: Long,
+ producerEpoch: Short,
+ controlRecordType: ControlRecordType
+ ): FetchDataInfo = {
+ val fileRecords = mock(classOf[FileRecords])
+ val memoryRecords = MemoryRecords.withEndTransactionMarker(
+ startOffset,
Review Comment:
it doesn't seem like we use the start/initial offset when we generate the
control records, i.e.
https://github.com/apache/kafka/pull/14985/files#diff-252d8f54c521cc32e09406f127d568391346d3f92183fdb70f04108f4360f27fR197-R201
is this because we only use the start offset only during a read? i guess it
makes sense since we don't know what offset we're appending a record at when
creating the records
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -2414,6 +2424,67 @@ public void testReplayWithTombstone() {
assertNull(context.offsetMetadataManager.offset("foo", "bar", 0));
}
+ @Test
+ public void testReplayTransactionEndMarkerWithCommit() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+
+ // Add pending transactional commit for producer id 5.
+ verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new
OffsetAndMetadata(
+ 100L,
+ OptionalInt.empty(),
+ "small",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ));
+
+ // Add pending transactional commit for producer id 6.
+ verifyTransactionalReplay(context, 6L, "foo", "bar", 1, new
OffsetAndMetadata(
+ 200L,
+ OptionalInt.empty(),
+ "small",
+ context.time.milliseconds(),
+ OptionalLong.empty()
+ ));
+
+ // Replaying an end marker with an unknown producer id should not fail.
Review Comment:
is this to provide idempotency? i'm guessing we can have duplicate requests
coming in
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1900,4 +1908,59 @@ public void testCommitTransactionalOffsets() throws
ExecutionException, Interrup
assertEquals(response, future.get());
}
+
+ @Test
+ public void testCompleteTransaction() throws ExecutionException,
InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+ service.startup(() -> 1);
+
+ when(runtime.scheduleTransactionCompletion(
+ ArgumentMatchers.eq("write-txn-marker"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.eq(100L),
+ ArgumentMatchers.eq((short) 5),
+ ArgumentMatchers.eq(10),
+ ArgumentMatchers.eq(TransactionResult.COMMIT),
+ ArgumentMatchers.eq(Duration.ofMillis(100))
+ )).thenReturn(CompletableFuture.completedFuture(null));
+
+ CompletableFuture<Void> future = service.completeTransaction(
+ new TopicPartition("__consumer_offsets", 0),
+ 100L,
+ (short) 5,
+ 10,
+ TransactionResult.COMMIT,
+ Duration.ofMillis(100)
+ );
+
+ assertNull(future.get());
+ }
+
+ @Test
+ public void testCompleteTransactionWhenNotStarted() throws
ExecutionException, InterruptedException {
Review Comment:
seems we can remove the exceptions
##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -234,6 +237,79 @@ class CoordinatorPartitionWriterTest {
assertEquals(records, receivedRecords)
}
+ @ParameterizedTest
+ @EnumSource(value = classOf[ControlRecordType], names = Array("COMMIT",
"ABORT"))
+ def testWriteTransactionEndMarker(controlRecordType: ControlRecordType):
Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val time = new MockTime()
+ val partitionRecordWriter = new CoordinatorPartitionWriter(
+ replicaManager,
+ new StringKeyValueSerializer(),
+ CompressionType.NONE,
+ time
+ )
+
+ when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps(
+ Collections.emptyMap(),
+ new Properties()
+ )))
+
+ val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
+ ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
+ val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse]
=> Unit] =
+ ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse]
=> Unit])
+
+ when(replicaManager.appendRecords(
+ ArgumentMatchers.eq(0L),
+ ArgumentMatchers.eq(1.toShort),
+ ArgumentMatchers.eq(true),
+ ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
+ recordsCapture.capture(),
+ callbackCapture.capture(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenAnswer(_ => {
+ callbackCapture.getValue.apply(Map(
+ tp -> new PartitionResponse(
+ Errors.NONE,
+ 5,
+ 10,
+ RecordBatch.NO_TIMESTAMP,
+ -1,
+ Collections.emptyList(),
+ ""
+ )
+ ))
+ })
+
+ assertEquals(11, partitionRecordWriter.appendEndTransactionMarker(
+ tp,
+ 100L,
+ 50.toShort,
+ 10,
+ TransactionResult.COMMIT
+ ))
+
+ val batch = recordsCapture.getValue.getOrElse(tp,
+ throw new AssertionError(s"No records for $tp"))
+ assertEquals(1, batch.batches.asScala.toList.size)
+
+ val firstBatch = batch.batches.asScala.head
+ assertEquals(100L, firstBatch.producerId)
+ assertEquals(50.toShort, firstBatch.producerEpoch)
+ assertTrue(firstBatch.isTransactional)
+
+ val receivedRecords = batch.records.asScala.map { record =>
+ ControlRecordType.parse(record.key)
+ }.toList
+
+ assertEquals(List(ControlRecordType.COMMIT), receivedRecords)
Review Comment:
should we use the `controlRecordType` argument here? similarly for
TransactionResult type in L294
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -968,4 +971,34 @@ public void testCleanupGroupMetadata() {
verify(groupMetadataManager,
times(1)).maybeDeleteGroup(eq("group-id"), any());
verify(groupMetadataManager,
times(0)).maybeDeleteGroup(eq("other-group-id"), any());
}
+
+ @ParameterizedTest
+ @EnumSource(value = TransactionResult.class)
+ public void testReplayTransactionEndMarker(TransactionResult result) {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(Time.SYSTEM),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
+ coordinator.replayTransactionEndMarker(
+ 100L,
+ (short) 5,
Review Comment:
will the producer epoch be used in a future PR?
##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##########
@@ -511,4 +544,35 @@ class CoordinatorLoaderImplTest {
new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
}
+
+ private def logReadResult(
+ startOffset: Long,
+ producerId: Long,
+ producerEpoch: Short,
+ controlRecordType: ControlRecordType
+ ): FetchDataInfo = {
+ val fileRecords = mock(classOf[FileRecords])
+ val memoryRecords = MemoryRecords.withEndTransactionMarker(
+ startOffset,
+ 0L,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ producerId,
+ producerEpoch,
+ new EndTransactionMarker(controlRecordType, 0)
+ )
+
+ when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
+
+ val bufferCapture: ArgumentCaptor[ByteBuffer] =
ArgumentCaptor.forClass(classOf[ByteBuffer])
+ when(fileRecords.readInto(
+ bufferCapture.capture(),
+ ArgumentMatchers.anyInt())
+ ).thenAnswer { _ =>
+ val buffer = bufferCapture.getValue
+ buffer.put(memoryRecords.buffer.duplicate)
+ buffer.flip()
+ }
+
+ new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecords)
Review Comment:
is there a reason we return FileRecords instead of MemoryRecords here? side
question: when do we expect FileRecords from `log.read()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]