AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404213789


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitAsync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitAsync(cb)
+    TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || 
cb.lastError.isDefined,
+      "Failed to observe commit callback before timeout", waitTimeMs = 10000)
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = {
+    val numRecords = 10000
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0, startingTimestamp = startingTimestamp)
+
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    assertNotNull(committedOffset.get(tp))
+    assertEquals(numRecords, committedOffset.get(tp).offset())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsume(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0, startingTimestamp = startingTimestamp)
+
+    assertEquals(numRecords, consumer.position(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    val offset = 1
+    consumer.seek(tp, offset)
+    consumeAndVerifyRecords(consumer = consumer, numRecords - offset, 
startingOffset = offset,
+      startingKeyAndValueIndex = offset, startingTimestamp = startingTimestamp 
+ offset)
+
+    assertEquals(numRecords, consumer.position(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not implemented in 
consumer group protocol
+  def testAssignAndConsumeWithLeaderChangeValidatingPositions(groupProtocol: 
String): Unit = {
+    val numRecords = 10
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0, startingTimestamp = startingTimestamp)
+
+    // Force leader epoch change to trigger position validation
+    var parts: mutable.Buffer[PartitionInfo] = null
+    while (parts == null)
+      parts = consumer.partitionsFor(tp.topic()).asScala
+    val leader = parts.head.leader().id()
+    this.servers(leader).shutdown()
+    this.servers(leader).startup()
+
+    // Consume after leader change
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
10,
+      startingTimestamp = startingTimestamp)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndFetchCommittedOffsets(groupProtocol: String): Unit = {
+    val numRecords = 100
+    val startingTimestamp = System.currentTimeMillis()
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+    // First consumer consumes and commits offsets
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0,
+      startingTimestamp = startingTimestamp)
+    consumer.commitSync()
+    assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset)
+    // We should see the committed offsets from another consumer
+    val anotherConsumer = createConsumer(configOverrides = props)
+    anotherConsumer.assign(List(tp).asJava)
+    assertEquals(numRecords, 
anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsumeFromCommittedOffsets(groupProtocol: String): Unit = {
+    val producer = createProducer()
+    val numRecords = 100
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords = numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    // Commit offset with first consumer
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1")
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+    val offset = 10
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(offset)))
+      .asJava)
+    assertEquals(offset, consumer.committed(Set(tp).asJava).get(tp).offset)
+    consumer.close()
+
+    // Consume from committed offsets with another consumer in same group
+    val anotherConsumer = createConsumer(configOverrides = props)
+    assertEquals(offset, 
anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
+    anotherConsumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = anotherConsumer, numRecords - offset,
+      startingOffset = offset, startingKeyAndValueIndex = offset,
+      startingTimestamp = startingTimestamp + offset)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndRetrievingCommittedOffsetsMultipleTimes(groupProtocol: 
String): Unit = {
+    val numRecords = 100
+    val startingTimestamp = System.currentTimeMillis()
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+
+    // Consume and commit offsets
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0,
+      startingTimestamp = startingTimestamp)
+    consumer.commitSync()
+
+    // Check committed offsets twice with same consumer
+    assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset)

Review Comment:
   I didn't write this test and there's no comment by the author. I suppose 
this was done specifically to catch a problem and I prefer not to mess around 
with it when I don't understand.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to