[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664918688



##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -1358,17 +1367,222 @@ class AuthorizerIntegrationTest extends 
BaseRequestTest {
 // note there's only one broker, so no need to lookup the group coordinator
 
 // without describe permission on the topic, we shouldn't be able to fetch 
offsets
-val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, 
false, null, false).build()
+val offsetFetchRequest = createOffsetFetchRequestAllPartitions
 var offsetFetchResponse = 
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
-assertEquals(Errors.NONE, offsetFetchResponse.error)
-assertTrue(offsetFetchResponse.responseData.isEmpty)
+assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group))
+assertTrue(offsetFetchResponse.partitionDataMap(group).isEmpty)
 
 // now add describe permission on the topic and verify that the offset can 
be fetched
 addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, DESCRIBE, ALLOW)), topicResource)
 offsetFetchResponse = 
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
-assertEquals(Errors.NONE, offsetFetchResponse.error)
-assertTrue(offsetFetchResponse.responseData.containsKey(tp))
-assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset)
+assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group))
+assertTrue(offsetFetchResponse.partitionDataMap(group).containsKey(tp))
+assertEquals(offset, 
offsetFetchResponse.partitionDataMap(group).get(tp).offset)
+  }
+
+  @Test
+  def testOffsetFetchMultipleGroupsAuthorization(): Unit = {
+val groups = (0 until 5).map(i => s"group$i")
+val groupResources = groups.map(group => new ResourcePattern(GROUP, group, 
LITERAL))
+
+val topic1 = "topic1"
+val topic1List = singletonList(new TopicPartition(topic1, 0))
+val topicOneResource = new ResourcePattern(TOPIC, topic1, LITERAL)
+val topic2 = "topic2"
+val topic1And2List = util.Arrays.asList(
+  new TopicPartition(topic1, 0),
+  new TopicPartition(topic2, 0),
+  new TopicPartition(topic2, 1))
+val topicTwoResource = new ResourcePattern(TOPIC, topic2, LITERAL)
+val topic3 = "topic3"
+val allTopicsList = util.Arrays.asList(
+  new TopicPartition(topic1, 0),
+  new TopicPartition(topic2, 0),
+  new TopicPartition(topic2, 1),
+  new TopicPartition(topic3, 0),
+  new TopicPartition(topic3, 1),
+  new TopicPartition(topic3, 2))
+val topicThreeResource = new ResourcePattern(TOPIC, topic3, LITERAL)
+
+// create group to partition map to build batched offsetFetch request
+val groupToPartitionMap = new util.HashMap[String, 
util.List[TopicPartition]]()
+groupToPartitionMap.put(groups(1), topic1List)

Review comment:
   groups(0) to groups(4)

##
File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
##
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
+import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, 
OffsetFetchResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+import java.util
+import java.util.Collections.singletonList
+import scala.jdk.CollectionConverters._
+import java.util.{Optional, Properties}
+
+class OffsetFetchRequestTest extends BaseRequestTest {
+
+  override def brokerCount: Int = 1
+
+  val brokerId: Integer = 0
+  val offset = 15L
+  val leaderEpoch: Optional[Integer] = Optional.of(3)
+  val metadata = "metadata"
+  val topic = "topic"
+  val groupId = "groupId"
+  val groups: Seq[String] = (0 until 5).map(i => 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664880346



##
File path: clients/src/main/resources/common/message/OffsetFetchResponse.json
##
@@ -30,30 +30,57 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 adds pending offset commit as new error response on partition 
level.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
 { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", 
"ignorable": true,
   "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
-{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": 
"0+", 
+{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": 
"0-7",
   "about": "The responses per topic.", "fields": [
-  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+  { "name": "Name", "type": "string", "versions": "0-7", "entityType": 
"topicName",
 "about": "The topic name." },
-  { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", 
"versions": "0+",
+  { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", 
"versions": "0-7",
 "about": "The responses per partition", "fields": [
-{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
+{ "name": "PartitionIndex", "type": "int32", "versions": "0-7",
   "about": "The partition index." },
-{ "name": "CommittedOffset", "type": "int64", "versions": "0+",
+{ "name": "CommittedOffset", "type": "int64", "versions": "0-7",
   "about": "The committed message offset." },
-{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", 
"default": "-1",
+{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", 
"default": "-1",
   "ignorable": true, "about": "The leader epoch." },
-{ "name": "Metadata", "type": "string", "versions": "0+", 
"nullableVersions": "0+",
+{ "name": "Metadata", "type": "string", "versions": "0-7", 
"nullableVersions": "0-7",
   "about": "The partition metadata." },
-{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+{ "name": "ErrorCode", "type": "int16", "versions": "0-7",
   "about": "The error code, or 0 if there was no error." }
   ]}
 ]},
-{ "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", 
"ignorable": true,
-  "about": "The top-level error code, or 0 if there was no error." }
+{ "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", 
"ignorable": true,
+  "about": "The top-level error code, or 0 if there was no error." },
+{"name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": 
"8+",

Review comment:
   As with the request, should we call this `Groups` rather than 
`GroupIds`?

##
File path: clients/src/main/resources/common/message/OffsetFetchRequest.json
##
@@ -31,19 +31,33 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 is adding the require stable flag.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups at a 
time
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
-{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": 
"groupId",
   "about": "The group to fetch offsets for." },
-{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", 
"nullableVersions": "2+",
+{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": 
"0-7", "nullableVersions": "2-7",
   "about": "Each topic we would like to fetch offsets for, or null to 
fetch offsets for all topics.", "fields": [
-  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+  { "name": "Name", "type": "string", "versions": "0-7", "entityType": 
"topicName",
 "about": "The topic name."},
-  { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
+  { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
 "about": "The partition indexes we would like to fetch offsets for." }
 ]},
+{ "name": "GroupIds", "type": "[]OffsetFetchRequestGroup", "versions": 
"8+",

Review comment:
   Should we call this `Groups` rather than `GroupIds` since it is not just 
the group id?

##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -1358,17 +1367,233 @@ class AuthorizerIntegrationTest extends 
BaseRequestTest {
 // note there's 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664873858



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());
+return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
+}
+
+// Custom collector to filter a single element
+private  Collector toSingleton() {

Review comment:
   You can probably set the list in `OffsetFetchRequestData` directly in 
the test. But let's leave that for a follow-on PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664806203



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());
+return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
+}
+
+// Custom collector to filter a single element
+private  Collector toSingleton() {

Review comment:
   In the same test class, `OffsetFetchRequestTest.scala`, please add a v8 
test with a batched request where the same group appears twice, perhaps with 
different topics. The response should be either an InvalidRequestException 
(if we want to treat it as an error) OR the actual offsets (if we handle the 
request correctly).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664804113



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());

Review comment:
   The helper method `toSingleton()` throws IllegalStateException if the 
list size is greater than one. If a request contains the same group twice, it 
can appear twice in the list.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664798710



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());
+return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
+}
+
+// Custom collector to filter a single element
+private  Collector toSingleton() {

Review comment:
   Can we add a test in the new `kafka.server.OffsetFetchRequestTest`? 
We can do that in a separate PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664796679



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() {
 
 @Override
 public void handle(OffsetFetchResponse response, 
RequestFuture> future) {
-if (response.hasError()) {
-Errors error = response.error();
-log.debug("Offset fetch failed: {}", error.message());
+Errors responseError = 
response.groupLevelError(rebalanceConfig.groupId);
+if (responseError != Errors.NONE) {
+log.debug("Offset fetch failed: {}", responseError.message());
 
-if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
 // just retry
-future.raise(error);
-} else if (error == Errors.NOT_COORDINATOR) {
+future.raise(responseError);
+} else if (responseError == Errors.NOT_COORDINATOR) {
 // re-discover the coordinator and retry
-markCoordinatorUnknown(error);
-future.raise(error);
-} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+markCoordinatorUnknown(responseError);
+future.raise(responseError);
+} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) 
{
 
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
 } else {
-future.raise(new KafkaException("Unexpected error in fetch 
offset response: " + error.message()));
+future.raise(new KafkaException("Unexpected error in fetch 
offset response: " + responseError.message()));
 }
 return;
 }
 
 Set unauthorizedTopics = null;
-Map offsets = new 
HashMap<>(response.responseData().size());
+Map 
responseData =

Review comment:
   ok, let's leave as is.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664795285



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
##
@@ -76,62 +73,169 @@ public void testConstructor() {
 }
 
 for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-OffsetFetchRequest request = builder.build(version);
-assertFalse(request.isAllPartitions());
-assertEquals(groupId, request.groupId());
-assertEquals(partitions, request.partitions());
-
-OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
-assertEquals(Errors.NONE, response.error());
-assertFalse(response.hasError());
-assertEquals(Collections.singletonMap(Errors.NONE, version <= 
(short) 1 ? 3 : 1), response.errorCounts(),
-"Incorrect error count for version " + version);
-
-if (version <= 1) {
-assertEquals(expectedData, response.responseData());
+if (version < 8) {
+builder = new OffsetFetchRequest.Builder(
+group1,
+false,
+partitions,
+false);
+assertFalse(builder.isAllTopicPartitions());
+OffsetFetchRequest request = builder.build(version);
+assertFalse(request.isAllPartitions());
+assertEquals(group1, request.groupId());
+assertEquals(partitions, request.partitions());
+
+OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+assertEquals(Errors.NONE, response.error());
+assertFalse(response.hasError());
+assertEquals(Collections.singletonMap(Errors.NONE, version <= 
(short) 1 ? 3 : 1), response.errorCounts(),
+"Incorrect error count for version " + version);
+
+if (version <= 1) {
+assertEquals(expectedData, response.responseDataV0ToV7());
+}
+
+if (version >= 3) {
+assertEquals(throttleTimeMs, response.throttleTimeMs());
+} else {
+assertEquals(DEFAULT_THROTTLE_TIME, 
response.throttleTimeMs());
+}
+} else {
+builder = new Builder(Collections.singletonMap(group1, 
partitions), false, false);
+OffsetFetchRequest request = builder.build(version);
+Map> groupToPartitionMap =
+request.groupIdsToPartitions();
+Map> groupToTopicMap =
+request.groupIdsToTopics();
+assertFalse(request.isAllPartitionsForGroup(group1));
+assertTrue(groupToPartitionMap.containsKey(group1) && 
groupToTopicMap.containsKey(
+group1));
+assertEquals(partitions, groupToPartitionMap.get(group1));
+OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+assertEquals(Errors.NONE, response.groupLevelError(group1));
+assertFalse(response.groupHasError(group1));
+assertEquals(Collections.singletonMap(Errors.NONE, 1), 
response.errorCounts(),
+"Incorrect error count for version " + version);
+assertEquals(throttleTimeMs, response.throttleTimeMs());
 }
+}
+}
+
+@Test
+public void testConstructorWithMultipleGroups() {
+List topic1Partitions = Arrays.asList(
+new TopicPartition(topicOne, partitionOne),
+new TopicPartition(topicOne, partitionTwo));
+List topic2Partitions = Arrays.asList(
+new TopicPartition(topicTwo, partitionOne),
+new TopicPartition(topicTwo, partitionTwo));
+List topic3Partitions = Arrays.asList(
+new TopicPartition(topicThree, partitionOne),
+new TopicPartition(topicThree, partitionTwo));
+Map> groupToTp = new HashMap<>();
+groupToTp.put(group1, topic1Partitions);
+groupToTp.put(group2, topic2Partitions);
+groupToTp.put(group3, topic3Partitions);
+groupToTp.put(group4, null);
+groupToTp.put(group5, null);
+int throttleTimeMs = 10;
 
-if (version >= 3) {
+for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+if (version >= 8) {
+builder = new Builder(groupToTp, false, false);
+OffsetFetchRequest request = builder.build(version);
+Map> groupToPartitionMap =
+request.groupIdsToPartitions();
+Map> groupToTopicMap =
+request.groupIdsToTopics();
+assertEquals(groupToTp.keySet(), 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664793097



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -78,26 +85,107 @@ boolean isAllTopicPartitions() {
 return this.data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public Builder(Map> 
groupIdToTopicPartitionMap,
+boolean requireStable,
+boolean throwOnFetchStableOffsetsUnsupported) {
+super(ApiKeys.OFFSET_FETCH);
+
+List groups = new ArrayList<>();
+for (Entry> entry : 
groupIdToTopicPartitionMap.entrySet()) {
+String groupName = entry.getKey();
+List tpList = entry.getValue();
+final List topics;
+if (tpList != null) {
+Map 
offsetFetchRequestTopicMap =
+new HashMap<>();
+for (TopicPartition topicPartition : tpList) {
+String topicName = topicPartition.topic();
+OffsetFetchRequestTopics topic = 
offsetFetchRequestTopicMap.getOrDefault(
+topicName, new 
OffsetFetchRequestTopics().setName(topicName));
+
topic.partitionIndexes().add(topicPartition.partition());
+offsetFetchRequestTopicMap.put(topicName, topic);
+}
+topics = new 
ArrayList<>(offsetFetchRequestTopicMap.values());
+} else {
+topics = ALL_TOPIC_PARTITIONS_BATCH;
+}
+groups.add(new OffsetFetchRequestGroup()
+.setGroupId(groupName)
+.setTopics(topics));
+}
+this.data = new OffsetFetchRequestData()
+.setGroupIds(groups)
+.setRequireStable(requireStable);
+this.throwOnFetchStableOffsetsUnsupported = 
throwOnFetchStableOffsetsUnsupported;
+}
+
 @Override
 public OffsetFetchRequest build(short version) {
 if (isAllTopicPartitions() && version < 2) {
 throw new UnsupportedVersionException("The broker only 
supports OffsetFetchRequest " +
 "v" + version + ", but we need v2 or newer to request all 
topic partitions.");
 }
-
+if (data.groupIds().size() > 1 && version < 8) {
+throw new NoBatchedOffsetFetchRequestException("Broker does 
not support"
++ " batching groups for fetch offset request on version " 
+ version);
+}
 if (data.requireStable() && version < 7) {
 if (throwOnFetchStableOffsetsUnsupported) {
 throw new UnsupportedVersionException("Broker unexpectedly 
" +
 "doesn't support requireStable flag on version " + 
version);
 } else {
 log.trace("Fallback the requireStable flag to false as 
broker " +
-  "only supports OffsetFetchRequest version 
{}. Need " +
-  "v7 or newer to enable this feature", 
version);
+"only supports OffsetFetchRequest version {}. Need " +
+"v7 or newer to enable this feature", version);
 
 return new 
OffsetFetchRequest(data.setRequireStable(false), version);
 }
 }
-
+if (version < 8) {
+OffsetFetchRequestData oldDataFormat = null;
+if (!data.groupIds().isEmpty()) {
+OffsetFetchRequestGroup group = data.groupIds().get(0);
+String groupName = group.groupId();
+List topics = group.topics();
+List oldFormatTopics = null;
+if (topics != null) {
+oldFormatTopics = topics
+.stream()
+.map(t ->
+new OffsetFetchRequestTopic()
+.setName(t.name())
+.setPartitionIndexes(t.partitionIndexes()))
+.collect(Collectors.toList());
+}
+oldDataFormat = new OffsetFetchRequestData()
+.setGroupId(groupName)
+.setTopics(oldFormatTopics)
+.setRequireStable(data.requireStable());
+}
+return new OffsetFetchRequest(oldDataFormat == null ? data : 
oldDataFormat, version);
+}
+// version 8 but have used old format of request, convert to 
version 8 of request

Review comment:
   At the moment, the code seems to do:
   ```
   if (version < 8) {
do-conversion-if-necessary
   }
   // 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664785624



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() {
 
 @Override
 public void handle(OffsetFetchResponse response, 
RequestFuture> future) {
-if (response.hasError()) {
-Errors error = response.error();
-log.debug("Offset fetch failed: {}", error.message());
+Errors responseError = 
response.groupLevelError(rebalanceConfig.groupId);

Review comment:
   we can leave as is.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r663488495



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() {
 
 @Override
 public void handle(OffsetFetchResponse response, 
RequestFuture> future) {
-if (response.hasError()) {
-Errors error = response.error();
-log.debug("Offset fetch failed: {}", error.message());
+Errors responseError = 
response.groupLevelError(rebalanceConfig.groupId);

Review comment:
   We could just call this `error`, and then we won't need the remaining 
changes below.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -78,26 +85,107 @@ boolean isAllTopicPartitions() {
 return this.data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public Builder(Map> 
groupIdToTopicPartitionMap,
+boolean requireStable,

Review comment:
   nit: indentation

##
File path: 
clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
##
@@ -662,20 +669,179 @@ public void testOffsetFetchVersions() throws Exception {
   .setErrorCode(Errors.NOT_COORDINATOR.code())
   .setThrottleTimeMs(10);
 for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-OffsetFetchResponseData responseData = response.get();
-if (version <= 1) {
-responseData.setErrorCode(Errors.NONE.code());
+if (version < 8) {

Review comment:
   As before, we should have `if version >= 8` as well.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -87,12 +87,16 @@ public String apiName() {
 Map failed = new HashMap<>();
 List unmapped = new ArrayList<>();
 
-if (response.error() != Errors.NONE) {
-handleError(groupId, response.error(), failed, unmapped);
+Errors responseError = response.groupLevelError(groupId.idValue);
+if (responseError != Errors.NONE) {
+handleError(groupId, responseError, failed, unmapped);
 } else {
 final Map groupOffsetsListing = 
new HashMap<>();
-for (Map.Entry 
entry :
-response.responseData().entrySet()) {
+// if entry for group level response data is null, we are getting 
back an older version
+// of the response

Review comment:
   Is this comment required?

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1255,81 +1256,143 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle an offset fetch request
*/
   def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = {
+val version = request.header.apiVersion
+if (version == 0) {
+  // reading offsets from ZK
+  handleOffsetFetchRequestV0(request)
+} else if (version >= 1 && version <= 7) {
+  // reading offsets from Kafka
+  handleOffsetFetchRequestBetweenV1AndV7(request)
+} else {
+  // batching offset reads for multiple groups starts with version 8 and 
greater
+  handleOffsetFetchRequestV8AndAbove(request)
+}
+  }
+
+  private def handleOffsetFetchRequestV0(request: RequestChannel.Request): 
Unit = {
 val header = request.header
 val offsetFetchRequest = request.body[OffsetFetchRequest]
 
-def partitionByAuthorized(seq: Seq[TopicPartition]): (Seq[TopicPartition], 
Seq[TopicPartition]) =
-  authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
seq)(_.topic)
-
 def createResponse(requestThrottleMs: Int): AbstractResponse = {
   val offsetFetchResponse =
-// reject the request if not authorized to the group
+  // reject the request if not authorized to the group

Review comment:
   nit: revert indentation change

##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -1358,17 +1367,241 @@ class AuthorizerIntegrationTest extends 
BaseRequestTest {
 // note there's only one broker, so no need to lookup the group coordinator
 
 // without describe permission on the topic, we shouldn't be able to fetch 
offsets
-val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, 
false, null, false).build()
+val offsetFetchRequest = createOffsetFetchRequestAllPartitions
 var offsetFetchResponse = 
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
-assertEquals(Errors.NONE, offsetFetchResponse.error)
-assertTrue(offsetFetchResponse.responseData.isEmpty)
+assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group))
+

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-05 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r663488495



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() {
 
 @Override
 public void handle(OffsetFetchResponse response, 
RequestFuture> future) {
-if (response.hasError()) {
-Errors error = response.error();
-log.debug("Offset fetch failed: {}", error.message());
+Errors responseError = 
response.groupLevelError(rebalanceConfig.groupId);

Review comment:
   We could just call this `error`, and then we won't need the remaining 
changes below.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -78,26 +85,107 @@ boolean isAllTopicPartitions() {
 return this.data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public Builder(Map> 
groupIdToTopicPartitionMap,
+boolean requireStable,

Review comment:
   nit: indentation

##
File path: 
clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
##
@@ -662,20 +669,179 @@ public void testOffsetFetchVersions() throws Exception {
   .setErrorCode(Errors.NOT_COORDINATOR.code())
   .setThrottleTimeMs(10);
 for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-OffsetFetchResponseData responseData = response.get();
-if (version <= 1) {
-responseData.setErrorCode(Errors.NONE.code());
+if (version < 8) {

Review comment:
   As before, we should have `if version >= 8` as well.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -87,12 +87,16 @@ public String apiName() {
 Map failed = new HashMap<>();
 List unmapped = new ArrayList<>();
 
-if (response.error() != Errors.NONE) {
-handleError(groupId, response.error(), failed, unmapped);
+Errors responseError = response.groupLevelError(groupId.idValue);
+if (responseError != Errors.NONE) {
+handleError(groupId, responseError, failed, unmapped);
 } else {
 final Map groupOffsetsListing = 
new HashMap<>();
-for (Map.Entry 
entry :
-response.responseData().entrySet()) {
+// if entry for group level response data is null, we are getting 
back an older version
+// of the response

Review comment:
   Is this comment required?

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1255,81 +1256,143 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle an offset fetch request
*/
   def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = {
+val version = request.header.apiVersion
+if (version == 0) {
+  // reading offsets from ZK
+  handleOffsetFetchRequestV0(request)
+} else if (version >= 1 && version <= 7) {
+  // reading offsets from Kafka
+  handleOffsetFetchRequestBetweenV1AndV7(request)
+} else {
+  // batching offset reads for multiple groups starts with version 8 and 
greater
+  handleOffsetFetchRequestV8AndAbove(request)
+}
+  }
+
+  private def handleOffsetFetchRequestV0(request: RequestChannel.Request): 
Unit = {
 val header = request.header
 val offsetFetchRequest = request.body[OffsetFetchRequest]
 
-def partitionByAuthorized(seq: Seq[TopicPartition]): (Seq[TopicPartition], 
Seq[TopicPartition]) =
-  authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
seq)(_.topic)
-
 def createResponse(requestThrottleMs: Int): AbstractResponse = {
   val offsetFetchResponse =
-// reject the request if not authorized to the group
+  // reject the request if not authorized to the group

Review comment:
   nit: revert indentation change

##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -1358,17 +1367,241 @@ class AuthorizerIntegrationTest extends 
BaseRequestTest {
 // note there's only one broker, so no need to lookup the group coordinator
 
 // without describe permission on the topic, we shouldn't be able to fetch 
offsets
-val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, 
false, null, false).build()
+val offsetFetchRequest = createOffsetFetchRequestAllPartitions
 var offsetFetchResponse = 
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
-assertEquals(Errors.NONE, offsetFetchResponse.error)
-assertTrue(offsetFetchResponse.responseData.isEmpty)
+assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group))
+

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-04 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r663485656



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
##
@@ -214,6 +321,10 @@ public Errors error() {
 return responseData;
 }
 
+public Map responseData(String groupId) {

Review comment:
   Yes, sounds good.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-04 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r663485547



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
##
@@ -154,14 +166,88 @@ public OffsetFetchResponse(int throttleTimeMs, Errors 
error, Map errors, Map> responseData) {

Review comment:
   The other constructor without throttle time is for versions which didn't 
have throttle time. For newer constructors that will be used with newer 
versions, we shouldn't need that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-04 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r663485187



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -73,7 +73,10 @@ public String apiName() {
 public OffsetFetchRequest.Builder buildRequest(int coordinatorId, 
Set keys) {
 // Set the flag to false as for admin client request,
 // we don't need to wait for any pending offset state to clear.
-return new OffsetFetchRequest.Builder(groupId.idValue, false, 
partitions, false);

Review comment:
   We can leave the admin client changes for the next PR to keep the 
changes in this one small for 3.0.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-03 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r663350976



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
##
@@ -73,7 +73,10 @@ public String apiName() {
 public OffsetFetchRequest.Builder buildRequest(int coordinatorId, 
Set keys) {
 // Set the flag to false as for admin client request,
 // we don't need to wait for any pending offset state to clear.
-return new OffsetFetchRequest.Builder(groupId.idValue, false, 
partitions, false);

Review comment:
   Given that single groupid is a common pattern, we could just retain the 
old constructor as well.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +319,10 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public List isAllPartitionsForGroup() {

Review comment:
   Name of method suggests we are returning a boolean, but we are returning 
null. Why do we need a public method that always returns null?

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
##
@@ -65,6 +69,8 @@
 
 private final OffsetFetchResponseData data;
 private final Errors error;
+private final Map<String, Errors> groupLevelErrors = new HashMap<>();
+private final Map<String, Map<TopicPartition, PartitionData>> 
groupToPartitionData = new HashMap<>();

Review comment:
   Why are we caching these when we have them in `data`?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1308,29 +1308,41 @@ private OffsetFetchResponseHandler() {
 
 @Override
 public void handle(OffsetFetchResponse response, 
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-if (response.hasError()) {
-Errors error = response.error();
-log.debug("Offset fetch failed: {}", error.message());
+Errors responseError = response.error();
+// check if error is null, if it is we are dealing with v8 response
+if (responseError == null) {

Review comment:
   We can move this to the response object, otherwise it is duplicated from 
above.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -68,36 +74,117 @@ public Builder(String groupId,
 }
 
 this.data = new OffsetFetchRequestData()
-.setGroupId(groupId)
-.setRequireStable(requireStable)
-.setTopics(topics);
+.setGroupId(groupId)
+.setRequireStable(requireStable)
+.setTopics(topics);

Review comment:
   nit: unnecessary indentation change

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1308,29 +1308,41 @@ private OffsetFetchResponseHandler() {
 
 @Override
 public void handle(OffsetFetchResponse response, 
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-if (response.hasError()) {
-Errors error = response.error();
-log.debug("Offset fetch failed: {}", error.message());
+Errors responseError = response.error();
+// check if error is null, if it is we are dealing with v8 response
+if (responseError == null) {
+if (response.groupHasError(rebalanceConfig.groupId)) {
+responseError = 
response.groupLevelError(rebalanceConfig.groupId);
+} else {
+responseError = Errors.NONE;
+}
+}
+if (responseError != Errors.NONE) {
+log.debug("Offset fetch failed: {}", responseError.message());
 
-if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
 // just retry
-future.raise(error);
-} else if (error == Errors.NOT_COORDINATOR) {
+future.raise(responseError);
+} else if (responseError == Errors.NOT_COORDINATOR) {
 // re-discover the coordinator and retry
-markCoordinatorUnknown(error);
-future.raise(error);
-} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+markCoordinatorUnknown(responseError);
+future.raise(responseError);
+} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) 
{
 
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
 } else {
-future.raise(new KafkaException("Unexpected error in