philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1548593720
########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer2.close() } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = { + // This test ensure that the member ID is propagated from the group coordinator when the + // assignment is received into a subsequent offset commit + val consumer = createConsumer() + assertEquals(0, consumer.assignment.size) + consumer.subscribe(List(topic).asJava) + awaitAssignment(consumer, Set(tp, tp2)) + + consumer.seek(tp, 0) + + consumer.commitSync() + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testEndOffsets(quorum: String, groupProtocol: String): Unit = { + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + val numRecords = 10000 + (0 until numRecords).map { i => + val timestamp = startingTimestamp + i.toLong + val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, s"key $i".getBytes, s"value $i".getBytes) + producer.send(record) + record + } + producer.flush() + + val consumer = createConsumer() + consumer.subscribe(List(topic).asJava) + awaitAssignment(consumer, Set(tp, tp2)) + + val endOffsets = consumer.endOffsets(Set(tp).asJava) + assertEquals(numRecords, endOffsets.get(tp)) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = { Review Comment: Maybe rename this to `testFindOffsetsForTime`, which already implies searching at a given timestamp. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org