This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new e076d2b KAFKA-13045: Adding a test for batched offsetFetch requests
with one group repeating (#11000)
e076d2b is described below
commit e076d2b32bec0db190796b9ce3c9541a8ea0999d
Author: Sanjana Kaundinya <[email protected]>
AuthorDate: Sat Jul 10 00:43:55 2021 -0700
KAFKA-13045: Adding a test for batched offsetFetch requests with one group
repeating (#11000)
Reviewers: Rajini Sivaram <[email protected]>
---
.../kafka/api/AuthorizerIntegrationTest.scala | 58 ++------
.../unit/kafka/server/OffsetFetchRequestTest.scala | 147 ++++++++++-----------
2 files changed, 84 insertions(+), 121 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index fd5f12c..36cdc51 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1422,40 +1422,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest
{
val offset = 15L
val leaderEpoch: Optional[Integer] = Optional.of(1)
val metadata = "metadata"
- val topicOneOffsets = topic1List.asScala.map {
- tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
- }.toMap.asJava
- val topicOneAndTwoOffsets = topic1And2List.asScala.map {
- tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
- }.toMap.asJava
- val allTopicOffsets = allTopicsList.asScala.map {
- tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
- }.toMap.asJava
- // create 5 consumers to commit offsets so we can fetch them later
-
- def commitOffsets(tpList: util.List[TopicPartition],
- offsets: util.Map[TopicPartition, OffsetAndMetadata]):
Unit = {
+ def commitOffsets(tpList: util.List[TopicPartition]): Unit = {
val consumer = createConsumer()
consumer.assign(tpList)
+ val offsets = tpList.asScala.map{
+ tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+ }.toMap.asJava
consumer.commitSync(offsets)
consumer.close()
}
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0))
- commitOffsets(topic1List, topicOneOffsets)
-
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(1))
- commitOffsets(topic1And2List, topicOneAndTwoOffsets)
-
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(2))
- commitOffsets(allTopicsList, allTopicOffsets)
-
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(3))
- commitOffsets(allTopicsList, allTopicOffsets)
-
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(4))
- commitOffsets(allTopicsList, allTopicOffsets)
+ // create 5 consumers to commit offsets so we can fetch them later
+ val partitionMap = groupToPartitionMap.asScala.map(e => (e._1,
Option(e._2).getOrElse(allTopicsList)))
+ groups.foreach { groupId =>
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+ commitOffsets(partitionMap(groupId))
+ }
removeAllClientAcls()
@@ -1548,24 +1531,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// from the offsetFetch response
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WildcardHost, DESCRIBE, ALLOW)), topicResources(2))
offsetFetchResponse =
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
- offsetFetchResponse.data().groups().forEach(g =>
- g.groupId() match {
- case "group1" =>
- verifyResponse(offsetFetchResponse.groupLevelError(groups(0)),
offsetFetchResponse
- .partitionDataMap(groups(0)), topic1List)
- case "group2" =>
- verifyResponse(offsetFetchResponse.groupLevelError(groups(1)),
offsetFetchResponse
- .partitionDataMap(groups(1)), topic1And2List)
- case "group3" =>
- verifyResponse(offsetFetchResponse.groupLevelError(groups(2)),
offsetFetchResponse
- .partitionDataMap(groups(2)), allTopicsList)
- case "group4" =>
- verifyResponse(offsetFetchResponse.groupLevelError(groups(3)),
offsetFetchResponse
- .partitionDataMap(groups(3)), allTopicsList)
- case "group5" =>
- verifyResponse(offsetFetchResponse.groupLevelError(groups(4)),
offsetFetchResponse
- .partitionDataMap(groups(4)), allTopicsList)
- })
+ offsetFetchResponse.data.groups.asScala.map(_.groupId).foreach( groupId =>
+ verifyResponse(offsetFetchResponse.groupLevelError(groupId),
offsetFetchResponse.partitionDataMap(groupId), partitionMap(groupId))
+ )
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index ea5064b..5e821fb 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -20,6 +20,7 @@ package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
+import
org.apache.kafka.common.message.OffsetFetchRequestData.{OffsetFetchRequestGroup,
OffsetFetchRequestTopics}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest,
OffsetFetchResponse}
@@ -42,6 +43,26 @@ class OffsetFetchRequestTest extends BaseRequestTest {
val topic = "topic"
val groupId = "groupId"
val groups: Seq[String] = (1 to 5).map(i => s"group$i")
+ val topics: Seq[String] = (1 to 3).map(i => s"topic$i")
+ val topic1List = singletonList(new TopicPartition(topics(0), 0))
+ val topic1And2List = util.Arrays.asList(
+ new TopicPartition(topics(0), 0),
+ new TopicPartition(topics(1), 0),
+ new TopicPartition(topics(1), 1))
+ val allTopicsList = util.Arrays.asList(
+ new TopicPartition(topics(0), 0),
+ new TopicPartition(topics(1), 0),
+ new TopicPartition(topics(1), 1),
+ new TopicPartition(topics(2), 0),
+ new TopicPartition(topics(2), 1),
+ new TopicPartition(topics(2), 2))
+ val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
+ new util.HashMap[String, util.List[TopicPartition]]()
+ groupToPartitionMap.put(groups(0), topic1List)
+ groupToPartitionMap.put(groups(1), topic1And2List)
+ groupToPartitionMap.put(groups(2), allTopicsList)
+ groupToPartitionMap.put(groups(3), null)
+ groupToPartitionMap.put(groups(4), null)
override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
@@ -64,12 +85,8 @@ class OffsetFetchRequestTest extends BaseRequestTest {
createTopic(topic)
val tpList = singletonList(new TopicPartition(topic, 0))
- val topicOffsets = tpList.asScala.map{
- tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
- }.toMap.asJava
-
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- commitOffsets(tpList, topicOffsets)
+ commitOffsets(tpList)
// testing from version 1 onward since version 0 read offsets from ZK
for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion()) {
@@ -112,84 +129,60 @@ class OffsetFetchRequestTest extends BaseRequestTest {
@Test
def testOffsetFetchRequestWithMultipleGroups(): Unit = {
-
- val topic1 = "topic1"
- val topic1List = singletonList(new TopicPartition(topic1, 0))
- val topic2 = "topic2"
- val topic1And2List = util.Arrays.asList(
- new TopicPartition(topic1, 0),
- new TopicPartition(topic2, 0),
- new TopicPartition(topic2, 1))
- val topic3 = "topic3"
- val allTopicsList = util.Arrays.asList(
- new TopicPartition(topic1, 0),
- new TopicPartition(topic2, 0),
- new TopicPartition(topic2, 1),
- new TopicPartition(topic3, 0),
- new TopicPartition(topic3, 1),
- new TopicPartition(topic3, 2))
-
- // create group to partition map to build batched offsetFetch request
- val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
- new util.HashMap[String, util.List[TopicPartition]]()
- groupToPartitionMap.put(groups(0), topic1List)
- groupToPartitionMap.put(groups(1), topic1And2List)
- groupToPartitionMap.put(groups(2), allTopicsList)
- groupToPartitionMap.put(groups(3), null)
- groupToPartitionMap.put(groups(4), null)
-
- createTopic(topic1)
- createTopic(topic2, numPartitions = 2)
- createTopic(topic3, numPartitions = 3)
-
- val topicOneOffsets = topic1List.asScala.map{
- tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
- }.toMap.asJava
- val topicOneAndTwoOffsets = topic1And2List.asScala.map{
- tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
- }.toMap.asJava
- val allTopicOffsets = allTopicsList.asScala.map{
- tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
- }.toMap.asJava
+ createTopic(topics(0))
+ createTopic(topics(1), numPartitions = 2)
+ createTopic(topics(2), numPartitions = 3)
// create 5 consumers to commit offsets so we can fetch them later
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(0))
- commitOffsets(topic1List, topicOneOffsets)
-
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(1))
- commitOffsets(topic1And2List, topicOneAndTwoOffsets)
+ val partitionMap = groupToPartitionMap.asScala.map(e => (e._1,
Option(e._2).getOrElse(allTopicsList)))
+ groups.foreach { groupId =>
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+ commitOffsets(partitionMap(groupId))
+ }
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(2))
- commitOffsets(allTopicsList, allTopicOffsets)
+ for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) {
+ val request = new OffsetFetchRequest.Builder(groupToPartitionMap,
false, false)
+ .build(version.asInstanceOf[Short])
+ val response = connectAndReceive[OffsetFetchResponse](request)
+ response.data.groups.asScala.map(_.groupId).foreach( groupId =>
+ verifyResponse(response.groupLevelError(groupId),
response.partitionDataMap(groupId), partitionMap(groupId))
+ )
+ }
+ }
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(3))
- commitOffsets(allTopicsList, allTopicOffsets)
+ @Test
+ def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {
+ createTopic(topics(0))
+ createTopic(topics(1), numPartitions = 2)
+ createTopic(topics(2), numPartitions = 3)
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groups(4))
- commitOffsets(allTopicsList, allTopicOffsets)
+ // create 5 consumers to commit offsets so we can fetch them later
+ val partitionMap = groupToPartitionMap.asScala.map(e => (e._1,
Option(e._2).getOrElse(allTopicsList)))
+ groups.foreach { groupId =>
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+ commitOffsets(partitionMap(groupId))
+ }
for (version <- 8 to ApiKeys.OFFSET_FETCH.latestVersion()) {
- val request = new OffsetFetchRequest.Builder(groupToPartitionMap,
false, false)
+ val request = new OffsetFetchRequest.Builder(groupToPartitionMap, false,
false)
.build(version.asInstanceOf[Short])
+ val requestGroups = request.data().groups()
+ requestGroups.add(
+ // add the same group as before with different topic partitions
+ new OffsetFetchRequestGroup()
+ .setGroupId(groups(2))
+ .setTopics(singletonList(
+ new OffsetFetchRequestTopics()
+ .setName(topics(0))
+ .setPartitionIndexes(singletonList(0)))))
+ request.data().setGroups(requestGroups)
val response = connectAndReceive[OffsetFetchResponse](request)
- response.data().groups().forEach(g =>
- g.groupId() match {
- case "group1" =>
- verifyResponse(response.groupLevelError(groups(0)),
- response.partitionDataMap(groups(0)), topic1List)
- case "group2" =>
- verifyResponse(response.groupLevelError(groups(1)),
- response.partitionDataMap(groups(1)), topic1And2List)
- case "group3" =>
- verifyResponse(response.groupLevelError(groups(2)),
- response.partitionDataMap(groups(2)), allTopicsList)
- case "group4" =>
- verifyResponse(response.groupLevelError(groups(3)),
- response.partitionDataMap(groups(3)), allTopicsList)
- case "group5" =>
- verifyResponse(response.groupLevelError(groups(4)),
- response.partitionDataMap(groups(4)), allTopicsList)
- })
+ response.data.groups.asScala.map(_.groupId).foreach( groupId =>
+ if (groupId == "group3") // verify that the response gives back the
latest changed topic partition list
+ verifyResponse(response.groupLevelError(groupId),
response.partitionDataMap(groupId), topic1List)
+ else
+ verifyResponse(response.groupLevelError(groupId),
response.partitionDataMap(groupId), partitionMap(groupId))
+ )
}
}
@@ -227,10 +220,12 @@ class OffsetFetchRequestTest extends BaseRequestTest {
topicList.forEach(t => verifyPartitionData(partitionData.get(t)))
}
- private def commitOffsets(tpList: util.List[TopicPartition],
- offsets: util.Map[TopicPartition,
OffsetAndMetadata]): Unit = {
+ private def commitOffsets(tpList: util.List[TopicPartition]): Unit = {
val consumer = createConsumer()
consumer.assign(tpList)
+ val offsets = tpList.asScala.map{
+ tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+ }.toMap.asJava
consumer.commitSync(offsets)
consumer.close()
}