dajac commented on code in PR #14353:
URL: https://github.com/apache/kafka/pull/14353#discussion_r1319545269


##########
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##########
@@ -14,233 +14,532 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package kafka.server
 
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.OffsetFetchResponseData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
-import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, 
OffsetFetchResponse}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.apache.kafka.common.requests.{OffsetFetchRequest, 
OffsetFetchResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
 
-import java.util
-import java.util.Collections.singletonList
+import java.util.Comparator
+import java.util.stream.Collectors
 import scala.jdk.CollectionConverters._
-import java.util.{Collections, Optional, Properties}
-
-class OffsetFetchRequestTest extends BaseRequestTest {
-
-  override def brokerCount: Int = 1
-
-  val brokerId: Integer = 0
-  val offset = 15L
-  val leaderEpoch: Optional[Integer] = Optional.of(3)
-  val metadata = "metadata"
-  val topic = "topic"
-  val groupId = "groupId"
-  val groups: Seq[String] = (1 to 5).map(i => s"group$i")
-  val topics: Seq[String] = (1 to 3).map(i => s"topic$i")
-  val topic1List = singletonList(new TopicPartition(topics(0), 0))
-  val topic1And2List = util.Arrays.asList(
-    new TopicPartition(topics(0), 0),
-    new TopicPartition(topics(1), 0),
-    new TopicPartition(topics(1), 1))
-  val allTopicsList = util.Arrays.asList(
-    new TopicPartition(topics(0), 0),
-    new TopicPartition(topics(1), 0),
-    new TopicPartition(topics(1), 1),
-    new TopicPartition(topics(2), 0),
-    new TopicPartition(topics(2), 1),
-    new TopicPartition(topics(2), 2))
-  val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
-    new util.HashMap[String, util.List[TopicPartition]]()
-  groupToPartitionMap.put(groups(0), topic1List)
-  groupToPartitionMap.put(groups(1), topic1And2List)
-  groupToPartitionMap.put(groups(2), allTopicsList)
-  groupToPartitionMap.put(groups(3), null)
-  groupToPartitionMap.put(groups(4), null)
-
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
-    properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
-    properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-    properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
-    properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
-    properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
-    properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
-    properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetFetchRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true)
   }
 
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    doSetup(testInfo, createOffsetsTopic = false)
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false)
+  }
 
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = true)
   }
 
-  @Test
-  def testOffsetFetchRequestSingleGroup(): Unit = {
-    createTopic(topic)
-
-    val tpList = singletonList(new TopicPartition(topic, 0))
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-    commitOffsets(tpList)
-
-    // testing from version 1 onward since version 0 read offsets from ZK
-    for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion()) {
-      if (version < 8) {
-        val request =
-          if (version < 7) {
-            new OffsetFetchRequest.Builder(
-              groupId, false, tpList, false)
-              .build(version.asInstanceOf[Short])
-          } else {
-            new OffsetFetchRequest.Builder(
-              groupId, false, tpList, true)
-              .build(version.asInstanceOf[Short])
-          }
-        val response = connectAndReceive[OffsetFetchResponse](request)
-        val topicData = response.data().topics().get(0)
-        val partitionData = topicData.partitions().get(0)
-        if (version < 3) {
-          assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, 
response.throttleTimeMs())
-        }
-        verifySingleGroupResponse(version.asInstanceOf[Short],
-          response.error().code(), partitionData.errorCode(), topicData.name(),
-          partitionData.partitionIndex(), partitionData.committedOffset(),
-          partitionData.committedLeaderEpoch(), partitionData.metadata())
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testMultiGroupsOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testMultipleGroupsOffsetFetch(useNewProtocol = true, requireStable = true)
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = 
false)
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = true)
+  }
+
+  private def testSingleGroupOffsetFetch(useNewProtocol: Boolean, 
requireStable: Boolean): Unit = {

Review Comment:
   Sure. I can separate the fetch offsets vs fetch all offsets cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to