rreddy-22 commented on code in PR #14353: URL: https://github.com/apache/kafka/pull/14353#discussion_r1319088896
########## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ########## @@ -14,233 +14,532 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.server +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance +import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.OffsetFetchResponseData import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData -import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} +import org.apache.kafka.common.requests.{OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith -import java.util -import java.util.Collections.singletonList +import java.util.Comparator +import java.util.stream.Collectors import scala.jdk.CollectionConverters._ -import java.util.{Collections, Optional, Properties} - -class OffsetFetchRequestTest extends BaseRequestTest { - - override def brokerCount: Int = 1 - - val brokerId: Integer = 0 - val offset = 15L - val leaderEpoch: Optional[Integer] = Optional.of(3) - val metadata = "metadata" - val topic = "topic" - val groupId = "groupId" - val groups: Seq[String] = (1 to 5).map(i => s"group$i") - val topics: Seq[String] = (1 to 3).map(i => s"topic$i") - val 
topic1List = singletonList(new TopicPartition(topics(0), 0)) - val topic1And2List = util.Arrays.asList( - new TopicPartition(topics(0), 0), - new TopicPartition(topics(1), 0), - new TopicPartition(topics(1), 1)) - val allTopicsList = util.Arrays.asList( - new TopicPartition(topics(0), 0), - new TopicPartition(topics(1), 0), - new TopicPartition(topics(1), 1), - new TopicPartition(topics(2), 0), - new TopicPartition(topics(2), 1), - new TopicPartition(topics(2), 2)) - val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] = - new util.HashMap[String, util.List[TopicPartition]]() - groupToPartitionMap.put(groups(0), topic1List) - groupToPartitionMap.put(groups(1), topic1And2List) - groupToPartitionMap.put(groups(2), allTopicsList) - groupToPartitionMap.put(groups(3), null) - groupToPartitionMap.put(groups(4), null) - - override def brokerPropertyOverrides(properties: Properties): Unit = { - properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) - properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") - properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") - properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") - properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") - properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") - properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true") + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = 
"group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true) } - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { - doSetup(testInfo, createOffsetsTopic = false) + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false) + } - TestUtils.createOffsetsTopic(zkClient, servers) + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + 
testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = true) } - @Test - def testOffsetFetchRequestSingleGroup(): Unit = { - createTopic(topic) - - val tpList = singletonList(new TopicPartition(topic, 0)) - consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) - commitOffsets(tpList) - - // testing from version 1 onward since version 0 read offsets from ZK - for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion()) { - if (version < 8) { - val request = - if (version < 7) { - new OffsetFetchRequest.Builder( - groupId, false, tpList, false) - .build(version.asInstanceOf[Short]) - } else { - new OffsetFetchRequest.Builder( - groupId, false, tpList, true) - .build(version.asInstanceOf[Short]) - } - val response = connectAndReceive[OffsetFetchResponse](request) - val topicData = response.data().topics().get(0) - val partitionData = topicData.partitions().get(0) - if (version < 3) { - assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()) - } - verifySingleGroupResponse(version.asInstanceOf[Short], - response.error().code(), partitionData.errorCode(), topicData.name(), - partitionData.partitionIndex(), partitionData.committedOffset(), - partitionData.committedLeaderEpoch(), partitionData.metadata()) + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testMultiGroupsOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testMultipleGroupsOffsetFetch(useNewProtocol = true, requireStable = true) + 
} + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = false) + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = true) + } + + private def testSingleGroupOffsetFetch(useNewProtocol: Boolean, requireStable: Boolean): Unit = { + if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + val admin = cluster.createAdminClient() + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. 
+ TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = if (cluster.isKRaftTest) { + cluster.asInstanceOf[RaftClusterInstance].brokers.collect(Collectors.toList[KafkaBroker]).asScala } else { - val request = new OffsetFetchRequest.Builder( - Map(groupId -> tpList).asJava, false, false) - .build(version.asInstanceOf[Short]) - val response = connectAndReceive[OffsetFetchResponse](request) - val groupData = response.data().groups().get(0) - val topicData = groupData.topics().get(0) - val partitionData = topicData.partitions().get(0) - verifySingleGroupResponse(version.asInstanceOf[Short], - groupData.errorCode(), partitionData.errorCode(), topicData.name(), - partitionData.partitionIndex(), partitionData.committedOffset(), - partitionData.committedLeaderEpoch(), partitionData.metadata()) + cluster.asInstanceOf[ZkClusterInstance].servers.collect(Collectors.toList[KafkaBroker]).asScala } + ) + + // Create the topic. + TestUtils.createTopicWithAdminRaw( + admin = admin, + topic = "foo", + numPartitions = 3 + ) + + // Join the consumer group. + val (memberId, memberEpoch) = if (useNewProtocol) { + // Note that we heartbeat only once to join the group and assume + // that the test will complete within the session timeout. + joinConsumerGroupWithNewProtocol("grp") + } else { + // Note that we don't heartbeat and assume that the test will + // complete within the session timeout. + joinConsumerGroupWithOldProtocol("grp") } - } - @Test - def testOffsetFetchRequestAllOffsetsSingleGroup(): Unit = { - createTopic(topic) - - val tpList = singletonList(new TopicPartition(topic, 0)) - consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) - commitOffsets(tpList) - - // Testing from version 2 onward since version 0 and do not support - // fetching all offsets. 
- for (version <- 2 to ApiKeys.OFFSET_FETCH.latestVersion()) { - if (version < 8) { - val request = new OffsetFetchRequest.Builder( - groupId, - false, - null, - version >= 7 - ).build(version.toShort) - - val response = connectAndReceive[OffsetFetchResponse](request) - assertEquals(Errors.NONE, response.error()) - val topicData = response.data.topics().get(0) - val partitionData = topicData.partitions().get(0) - if (version < 3) { - assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()) + // Commit offsets. + for (partitionId <- 0 to 2) { + commitOffset( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + topic = "foo", + partition = partitionId, + offset = 100L + partitionId, + expectedError = Errors.NONE, + version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) + ) + } + + // Start from version 1 because version 0 goes to ZK. + for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) { + // Fetch with partitions. + assertEquals( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("grp") + .setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(100L), + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(1) + .setCommittedOffset(101L), + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(5) + .setCommittedOffset(-1L) + ).asJava) + ).asJava), + fetchOffsets( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + partitions = List( + new TopicPartition("foo", 0), + new TopicPartition("foo", 1), + new TopicPartition("foo", 5) // This one does not exist. 
+ ), + requireStable = requireStable, + version = version.toShort + ) + ) + + assertEquals( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("unknown") + .setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(-1L), + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(1) + .setCommittedOffset(-1L), + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(5) + .setCommittedOffset(-1L) + ).asJava) + ).asJava), + fetchOffsets( + groupId = "unknown", + memberId = memberId, + memberEpoch = memberEpoch, + partitions = List( + new TopicPartition("foo", 0), + new TopicPartition("foo", 1), + new TopicPartition("foo", 5) // This one does not exist. + ), + requireStable = requireStable, + version = version.toShort + ) + ) + + if (useNewProtocol && version >= 9) { + assertEquals( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("grp") + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code), + fetchOffsets( + groupId = "grp", + memberId = "", + memberEpoch = memberEpoch, + partitions = List.empty, + requireStable = requireStable, + version = version.toShort + ) + ) + + assertEquals( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("grp") + .setErrorCode(Errors.STALE_MEMBER_EPOCH.code), + fetchOffsets( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch + 1, + partitions = List.empty, + requireStable = requireStable, + version = version.toShort + ) + ) + } + + // Fetch all partitions. Starting from version 2 as it was not + // supported before. 
+ if (version >= 2) { + assertEquals( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("grp") + .setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(100L), + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(1) + .setCommittedOffset(101L), + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(2) + .setCommittedOffset(102L) + ).asJava) + ).asJava), + fetchOffsets( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + partitions = null, + requireStable = requireStable, + version = version.toShort + ) + ) + + assertEquals( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("unknown"), + fetchOffsets( + groupId = "unknown", + memberId = memberId, + memberEpoch = memberEpoch, + partitions = null, + requireStable = requireStable, + version = version.toShort + ) + ) + + if (useNewProtocol && version >= 9) { + assertEquals( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("grp") + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code), + fetchOffsets( + groupId = "grp", + memberId = "", + memberEpoch = memberEpoch, + partitions = null, + requireStable = requireStable, + version = version.toShort + ) + ) + + assertEquals( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("grp") + .setErrorCode(Errors.STALE_MEMBER_EPOCH.code), + fetchOffsets( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch + 1, + partitions = null, + requireStable = requireStable, + version = version.toShort + ) + ) } - verifySingleGroupResponse(version.asInstanceOf[Short], - response.error().code(), partitionData.errorCode(), topicData.name(), - partitionData.partitionIndex(), partitionData.committedOffset(), - partitionData.committedLeaderEpoch(), 
partitionData.metadata()) - } else { - val request = new OffsetFetchRequest.Builder( - Collections.singletonMap(groupId, null), - false, - false - ).build(version.toShort) - - val response = connectAndReceive[OffsetFetchResponse](request) - assertEquals(Errors.NONE, response.groupLevelError(groupId)) - val groupData = response.data().groups().get(0) - val topicData = groupData.topics().get(0) - val partitionData = topicData.partitions().get(0) - verifySingleGroupResponse(version.asInstanceOf[Short], - groupData.errorCode(), partitionData.errorCode(), topicData.name(), - partitionData.partitionIndex(), partitionData.committedOffset(), - partitionData.committedLeaderEpoch(), partitionData.metadata()) + } } } - @Test - def testOffsetFetchRequestWithMultipleGroups(): Unit = { - createTopic(topics(0)) - createTopic(topics(1), numPartitions = 2) - createTopic(topics(2), numPartitions = 3) - - // create 5 consumers to commit offsets so we can fetch them later - val partitionMap = groupToPartitionMap.asScala.map(e => (e._1, Option(e._2).getOrElse(allTopicsList))) - groups.foreach { groupId => - consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) - commitOffsets(partitionMap(groupId)) + private def testMultipleGroupsOffsetFetch(useNewProtocol: Boolean, requireStable: Boolean): Unit = { + if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + val admin = cluster.createAdminClient() + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = if (cluster.isKRaftTest) { + cluster.asInstanceOf[RaftClusterInstance].brokers.collect(Collectors.toList[KafkaBroker]).asScala + } else { + cluster.asInstanceOf[ZkClusterInstance].servers.collect(Collectors.toList[KafkaBroker]).asScala + } + ) + + // Create the topic. 
+ TestUtils.createTopicWithAdminRaw( + admin = admin, + topic = "foo", + numPartitions = 3 + ) + + // Create groups and commits offsets. + List("grp-0", "grp-1", "grp-2").foreach { groupId => + val (memberId, memberEpoch) = if (useNewProtocol) { + joinConsumerGroupWithNewProtocol(groupId) + } else { + joinConsumerGroupWithOldProtocol(groupId) + } + + for (partitionId <- 0 to 2) { + commitOffset( + groupId = groupId, + memberId = memberId, + memberEpoch = memberEpoch, + topic = "foo", + partition = partitionId, + offset = 100L + partitionId, + expectedError = Errors.NONE, + version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) + ) + } } - for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) { - val request = new OffsetFetchRequest.Builder(groupToPartitionMap, false, false) - .build(version.asInstanceOf[Short]) - val response = connectAndReceive[OffsetFetchResponse](request) - response.data.groups.asScala.map(_.groupId).foreach( groupId => - verifyResponse(response.groupLevelError(groupId), response.partitionDataMap(groupId), partitionMap(groupId)) + // Start from version 8 because older versions do not support + // fetch offsets for multiple groups. + for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) { + assertEquals( + List( + // Fetch foo-0, foo-1 and foo-5. Review Comment: Since we're already checking the offset fetch for foo-0 and foo-1 in the next fetchAllPartitions block, did we want to check a mix of existing and non-existent partitions here? Also, can we add a comment noting that this fetches the non-existent partition foo-5? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org