philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548593720


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {
+    // This test ensure that the member ID is propagated from the group 
coordinator when the
+    // assignment is received into a subsequent offset commit
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment.size)
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    consumer.seek(tp, 0)
+
+    consumer.commitSync()
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    val numRecords = 10000
+     (0 until numRecords).map { i =>
+      val timestamp = startingTimestamp + i.toLong
+      val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, 
s"key $i".getBytes, s"value $i".getBytes)
+      producer.send(record)
+      record
+    }
+    producer.flush()
+
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    val endOffsets = consumer.endOffsets(Set(tp).asJava)
+    assertEquals(numRecords, endOffsets.get(tp))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   Maybe rename this to `testFetchOffsetsForTime`, which already implies searching at a given 
timestamp.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to