lucasbru commented on code in PR #15535:
URL: https://github.com/apache/kafka/pull/15535#discussion_r1537284419
##
core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala:
##
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.{TestInfoUtils, TestUtils}
+import java.util.Properties
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import org.apache.kafka.common.PartitionInfo
+import java.util.stream.Stream
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable
+import org.junit.jupiter.params.provider.CsvSource
+
+/**
+ * Integration tests for the consumer that cover logic related to manual assignment.
+ */
+@Timeout(600)
+class PlaintextConsumerAssignTest extends AbstractConsumerTest {
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testAssignAndCommitAsyncNotCommitted(quorum: String, groupProtocol:
String): Unit = {
+val props = new Properties()
+val consumer = createConsumer(configOverrides = props)
+val producer = createProducer()
+val numRecords = 1
+val startingTimestamp = System.currentTimeMillis()
+val cb = new CountConsumerCommitCallback
+sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
+consumer.assign(List(tp).asJava)
+consumer.commitAsync(cb)
+TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 ||
cb.lastError.isDefined,
+ "Failed to observe commit callback before timeout", waitTimeMs = 1)
+val committedOffset = consumer.committed(Set(tp).asJava)
+assertNotNull(committedOffset)
+// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
+// tp. The committed offset should be null. This is intentional.
+assertNull(committedOffset.get(tp))
+assertTrue(consumer.assignment.contains(tp))
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testAssignAndCommitSyncNotCommitted(quorum: String, groupProtocol:
String): Unit = {
+val props = new Properties()
+val consumer = createConsumer(configOverrides = props)
+val producer = createProducer()
+val numRecords = 1
+val startingTimestamp = System.currentTimeMillis()
+sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
+consumer.assign(List(tp).asJava)
+consumer.commitSync()
+val committedOffset = consumer.committed(Set(tp).asJava)
+assertNotNull(committedOffset)
+// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
+// tp. The committed offset should be null. This is intentional.
+assertNull(committedOffset.get(tp))
+assertTrue(consumer.assignment.contains(tp))
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testAssignAndCommitSyncAllConsumed(quorum: String, groupProtocol:
String): Unit = {
+val numRecords = 1
+
+val producer = createProducer()
+val startingTimestamp = System.currentTimeMillis()
+sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
+
+val props = new Properties()
+val consumer = createConsumer(configOverrides = props)
+consumer.assign(List(tp).asJava)
+consumer.seek(tp, 0)
+consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset =
0, startingTimestamp = startingTimestamp)
+
+consumer.commitSync()
+val committedOffset = consumer.committed(Set(tp).asJava)
+assertNotNull(committedOffset)
+assertNotNull(committedOffset.get(tp))
+assertEquals(numRecords,