dajac commented on code in PR #14353: URL: https://github.com/apache/kafka/pull/14353#discussion_r1319545269
########## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ########## @@ -14,233 +14,532 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.server +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance +import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.OffsetFetchResponseData import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData -import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} +import org.apache.kafka.common.requests.{OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith -import java.util -import java.util.Collections.singletonList +import java.util.Comparator +import java.util.stream.Collectors import scala.jdk.CollectionConverters._ -import java.util.{Collections, Optional, Properties} - -class OffsetFetchRequestTest extends BaseRequestTest { - - override def brokerCount: Int = 1 - - val brokerId: Integer = 0 - val offset = 15L - val leaderEpoch: Optional[Integer] = Optional.of(3) - val metadata = "metadata" - val topic = "topic" - val groupId = "groupId" - val groups: Seq[String] = (1 to 5).map(i => s"group$i") - val topics: Seq[String] = (1 to 3).map(i => s"topic$i") - val 
topic1List = singletonList(new TopicPartition(topics(0), 0)) - val topic1And2List = util.Arrays.asList( - new TopicPartition(topics(0), 0), - new TopicPartition(topics(1), 0), - new TopicPartition(topics(1), 1)) - val allTopicsList = util.Arrays.asList( - new TopicPartition(topics(0), 0), - new TopicPartition(topics(1), 0), - new TopicPartition(topics(1), 1), - new TopicPartition(topics(2), 0), - new TopicPartition(topics(2), 1), - new TopicPartition(topics(2), 2)) - val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] = - new util.HashMap[String, util.List[TopicPartition]]() - groupToPartitionMap.put(groups(0), topic1List) - groupToPartitionMap.put(groups(1), topic1And2List) - groupToPartitionMap.put(groups(2), allTopicsList) - groupToPartitionMap.put(groups(3), null) - groupToPartitionMap.put(groups(4), null) - - override def brokerPropertyOverrides(properties: Properties): Unit = { - properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) - properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") - properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") - properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") - properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") - properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") - properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true") + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = 
"group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true) } - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { - doSetup(testInfo, createOffsetsTopic = false) + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false) + } - TestUtils.createOffsetsTopic(zkClient, servers) + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSingleGroupOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + 
testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = true) } - @Test - def testOffsetFetchRequestSingleGroup(): Unit = { - createTopic(topic) - - val tpList = singletonList(new TopicPartition(topic, 0)) - consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) - commitOffsets(tpList) - - // testing from version 1 onward since version 0 read offsets from ZK - for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion()) { - if (version < 8) { - val request = - if (version < 7) { - new OffsetFetchRequest.Builder( - groupId, false, tpList, false) - .build(version.asInstanceOf[Short]) - } else { - new OffsetFetchRequest.Builder( - groupId, false, tpList, true) - .build(version.asInstanceOf[Short]) - } - val response = connectAndReceive[OffsetFetchResponse](request) - val topicData = response.data().topics().get(0) - val partitionData = topicData.partitions().get(0) - if (version < 3) { - assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()) - } - verifySingleGroupResponse(version.asInstanceOf[Short], - response.error().code(), partitionData.errorCode(), topicData.name(), - partitionData.partitionIndex(), partitionData.committedOffset(), - partitionData.committedLeaderEpoch(), partitionData.metadata()) + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testMultiGroupsOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testMultipleGroupsOffsetFetch(useNewProtocol = true, requireStable = true) + 
} + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = false) + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testMultiGroupsOffsetFetchWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = true) + } + + private def testSingleGroupOffsetFetch(useNewProtocol: Boolean, requireStable: Boolean): Unit = { Review Comment: Sure. I can separate the fetch offsets vs fetch all offsets cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org