lianetm commented on code in PR #15612:
URL: https://github.com/apache/kafka/pull/15612#discussion_r1541452355


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##########
@@ -0,0 +1,320 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.test.MockConsumerInterceptor
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.time.Duration
+import java.util
+import java.util.Optional
+import java.util.stream.Stream
+import scala.jdk.CollectionConverters._
+
+/**
+ * Integration tests for the consumer that covers the logic related to 
committing offsets.
+ */
+@Timeout(600)
+class PlaintextConsumerCommitTest extends AbstractConsumerTest {
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAutoCommitOnClose(quorum: String, groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+    val consumer = createConsumer()
+
+    val numRecords = 10000
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
+
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    // should auto-commit sought positions before closing
+    consumer.seek(tp, 300)
+    consumer.seek(tp2, 500)
+    consumer.close()
+
+    // now we should see the committed positions from another consumer
+    val anotherConsumer = createConsumer()
+    assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAutoCommitOnCloseAfterWakeup(quorum: String, groupProtocol: String): 
Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+    val consumer = createConsumer()
+
+    val numRecords = 10000
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
+
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    // should auto-commit sought positions before closing
+    consumer.seek(tp, 300)
+    consumer.seek(tp2, 500)
+
+    // wakeup the consumer before closing to simulate trying to break a poll
+    // loop from another thread
+    consumer.wakeup()
+    consumer.close()
+
+    // now we should see the committed positions from another consumer
+    val anotherConsumer = createConsumer()
+    assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitMetadata(quorum: String, groupProtocol: String): Unit = {
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+
+    // sync commit
+    val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
+    consumer.commitSync(Map((tp, syncMetadata)).asJava)
+    assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+    // async commit
+    val asyncMetadata = new OffsetAndMetadata(10, "bar")
+    sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata)))
+    assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+    // handle null metadata
+    val nullMetadata = new OffsetAndMetadata(5, null)
+    consumer.commitSync(Map(tp -> nullMetadata).asJava)
+    assertEquals(nullMetadata, consumer.committed(Set(tp).asJava).get(tp))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAsyncCommit(quorum: String, groupProtocol: String): Unit = {
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+
+    val callback = new CountConsumerCommitCallback
+    val count = 5
+
+    for (i <- 1 to count)
+      consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava, 
callback)
+
+    TestUtils.pollUntilTrue(consumer, () => callback.successCount >= count || 
callback.lastError.isDefined,
+      "Failed to observe commit callback before timeout", waitTimeMs = 10000)
+
+    assertEquals(None, callback.lastError)
+    assertEquals(count, callback.successCount)
+    assertEquals(new OffsetAndMetadata(count), 
consumer.committed(Set(tp).asJava).get(tp))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAutoCommitIntercept(quorum: String, groupProtocol: String): Unit = {
+    val topic2 = "topic2"
+    createTopic(topic2, 2, brokerCount)
+
+    // produce records
+    val numRecords = 100
+    val testProducer = createProducer(keySerializer = new StringSerializer, 
valueSerializer = new StringSerializer)
+    (0 until numRecords).map { i =>
+      testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key 
$i", s"value $i"))
+    }.foreach(_.get)
+
+    // create consumer with interceptor
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+    this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
"org.apache.kafka.test.MockConsumerInterceptor")
+    val testConsumer = createConsumer(keyDeserializer = new 
StringDeserializer, valueDeserializer = new StringDeserializer)
+    val rebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]) = {
+        // keep partitions paused in this test so that we can verify the 
commits based on specific seeks
+        testConsumer.pause(partitions)
+      }
+
+      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]) = {}
+    }
+    changeConsumerSubscriptionAndValidateAssignment(testConsumer, List(topic), 
Set(tp, tp2), rebalanceListener)
+    testConsumer.seek(tp, 10)
+    testConsumer.seek(tp2, 20)
+
+    // change subscription to trigger rebalance
+    val commitCountBeforeRebalance = 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
+    changeConsumerSubscriptionAndValidateAssignment(testConsumer,
+      List(topic, topic2),
+      Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 
1)),
+      rebalanceListener)
+
+    // after rebalancing, we should have reset to the committed positions
+    assertEquals(10, testConsumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(20, testConsumer.committed(Set(tp2).asJava).get(tp2).offset)
+
+    // In both CLASSIC and CONSUMER protocols, interceptors are executed in 
poll and close.
+    // However, in the CONSUMER protocol, the assignment may be changed 
outside of a poll, so
+    // we need to poll once to ensure the interceptor is called.
+    if (groupProtocol.toUpperCase == GroupProtocol.CONSUMER.name) {
+      testConsumer.poll(Duration.ZERO);
+    }
+
+    assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > 
commitCountBeforeRebalance)
+
+    // verify commits are intercepted on close
+    val commitCountBeforeClose = 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
+    testConsumer.close()
+    assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > 
commitCountBeforeClose)
+    testProducer.close()
+
+    // cleanup
+    MockConsumerInterceptor.resetCounters()
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitSpecifiedOffsets(quorum: String, groupProtocol: String): Unit 
= {
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 5, tp)
+    sendRecords(producer, numRecords = 7, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    val pos1 = consumer.position(tp)
+    val pos2 = consumer.position(tp2)
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(3L))).asJava)
+    assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertNull(consumer.committed(Set(tp2).asJava).get(tp2))
+
+    // Positions should not change
+    assertEquals(pos1, consumer.position(tp))
+    assertEquals(pos2, consumer.position(tp2))
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(5L))).asJava)
+    assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(5, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+
+    // Using async should pick up the committed changes after commit completes
+    sendAndAwaitAsyncCommit(consumer, Some(Map(tp2 -> new 
OffsetAndMetadata(7L))))
+    assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAutoCommitOnRebalance(quorum: String, groupProtocol: String): Unit = 
{
+    val topic2 = "topic2"
+    createTopic(topic2, 2, brokerCount)
+
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+    val consumer = createConsumer()
+
+    val numRecords = 10000
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
+
+    val rebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]) = {
+        // keep partitions paused in this test so that we can verify the 
commits based on specific seeks
+        consumer.pause(partitions)
+      }
+
+      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]) = {}
+    }
+
+    consumer.subscribe(List(topic).asJava, rebalanceListener)
+
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    consumer.seek(tp, 300)
+    consumer.seek(tp2, 500)
+
+    // change subscription to trigger rebalance
+    consumer.subscribe(List(topic, topic2).asJava, rebalanceListener)
+
+    val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new 
TopicPartition(topic2, 1))
+    awaitAssignment(consumer, newAssignment)
+
+    // after rebalancing, we should have reset to the committed positions
+    assertEquals(300, consumer.committed(Set(tp).asJava).get(tp).offset)
+    assertEquals(500, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {
+    // This test ensure that the member ID is propagated from the group 
coordinator when the
+    // assignment is received into a subsequent offset commit
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment.size)
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    consumer.seek(tp, 0)
+
+    consumer.commitSync()
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testPositionAndCommit(quorum: String, groupProtocol: String): Unit = {
+    val producer = createProducer()
+    var startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords = 5, tp, startingTimestamp = 
startingTimestamp)
+
+    val topicPartition = new TopicPartition(topic, 15)
+    val consumer = createConsumer()
+    
assertNull(consumer.committed(Set(topicPartition).asJava).get(topicPartition))
+
+    // position() on a partition that we aren't subscribed to throws an 
exception
+    assertThrows(classOf[IllegalStateException], () => 
consumer.position(topicPartition))
+
+    consumer.assign(List(tp).asJava)
+
+    assertEquals(0L, consumer.position(tp), "position() on a partition that we 
are subscribed to should reset the offset")
+    consumer.commitSync()
+    assertEquals(0L, consumer.committed(Set(tp).asJava).get(tp).offset)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 0, startingTimestamp = startingTimestamp)
+    assertEquals(5L, consumer.position(tp), "After consuming 5 records, 
position should be 5")
+    consumer.commitSync()
+    assertEquals(5L, consumer.committed(Set(tp).asJava).get(tp).offset, 
"Committed offset should be returned")
+
+    startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords = 1, tp, startingTimestamp = 
startingTimestamp)
+
+    // another consumer in the same group should get the same position
+    val otherConsumer = createConsumer()
+    otherConsumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
+  }
+
+  def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: 
Consumer[K, V],
+                                                            topicsToSubscribe: 
List[String],
+                                                            
expectedAssignment: Set[TopicPartition],
+                                                            rebalanceListener: 
ConsumerRebalanceListener): Unit = {
+    consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
+    awaitAssignment(consumer, expectedAssignment)
+  }
+}
+
+object PlaintextConsumerCommitTest {
+
+  def getTestQuorumAndGroupProtocolParametersAll: Stream[Arguments] =
+    BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll()
+}

Review Comment:
   Only addition, the rest of the file is exact copy-paste from 
PlainTextConsumerTest



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to