abbccdda commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r463682515



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4068,6 +4093,58 @@ public void testListOffsetsMetadataNonRetriableErrors() 
throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsPartialResponse() throws Exception {

Review comment:
       Good coverage

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -949,13 +953,27 @@ public void onFailure(RuntimeException e) {
                             leader, tp);
                     partitionsToRetry.add(tp);
                 } else {
-                    partitionDataMap.put(tp, new 
ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch));
+                    int currentLeaderEpoch = 
leaderAndEpoch.epoch.orElse(ListOffsetResponse.UNKNOWN_EPOCH);
+                    partitionDataMap.put(tp, new ListOffsetPartition()
+                            .setPartitionIndex(tp.partition())
+                            .setTimestamp(offset)
+                            .setCurrentLeaderEpoch(currentLeaderEpoch));
                 }
             }
         }
         return regroupPartitionMapByNode(partitionDataMap);
     }
 
+    private static List<ListOffsetTopic> 
toListOffsetTopics(Map<TopicPartition, ListOffsetPartition> timestampsToSearch) 
{

Review comment:
       Let's move this helper into `ListOffsetRequest`

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
##########
@@ -47,96 +42,11 @@
     public static final int CONSUMER_REPLICA_ID = -1;
     public static final int DEBUGGING_REPLICA_ID = -2;
 
-    // top level fields
-    private static final Field.Int32 REPLICA_ID = new Field.Int32("replica_id",
-            "Broker id of the follower. For normal consumers, use -1.");
-    private static final Field.Int8 ISOLATION_LEVEL = new 
Field.Int8("isolation_level",
-            "This setting controls the visibility of transactional records. " +
-                    "Using READ_UNCOMMITTED (isolation_level = 0) makes all 
records visible. With READ_COMMITTED " +
-                    "(isolation_level = 1), non-transactional and COMMITTED 
transactional records are visible. " +
-                    "To be more concrete, READ_COMMITTED returns all data from 
offsets smaller than the current " +
-                    "LSO (last stable offset), and enables the inclusion of 
the list of aborted transactions in the " +
-                    "result, which allows consumers to discard ABORTED 
transactional records");
-    private static final Field.ComplexArray TOPICS = new 
Field.ComplexArray("topics",
-            "Topics to list offsets.");
-
-    // topic level fields
-    private static final Field.ComplexArray PARTITIONS = new 
Field.ComplexArray("partitions",
-            "Partitions to list offsets.");
-
-    // partition level fields
-    private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp",
-            "The target timestamp for the partition.");
-    private static final Field.Int32 MAX_NUM_OFFSETS = new 
Field.Int32("max_num_offsets",
-            "Maximum offsets to return.");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            PARTITION_ID,
-            TIMESTAMP,
-            MAX_NUM_OFFSETS);
-
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-
-    private static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(
-            REPLICA_ID,
-            TOPICS_V0);
-
-    // V1 removes max_num_offsets
-    private static final Field PARTITIONS_V1 = PARTITIONS.withFields(
-            PARTITION_ID,
-            TIMESTAMP);
-
-    private static final Field TOPICS_V1 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V1);
-
-    private static final Schema LIST_OFFSET_REQUEST_V1 = new Schema(
-            REPLICA_ID,
-            TOPICS_V1);
-
-    // V2 adds a field for the isolation level
-    private static final Schema LIST_OFFSET_REQUEST_V2 = new Schema(
-            REPLICA_ID,
-            ISOLATION_LEVEL,
-            TOPICS_V1);
-
-    // V3 bump used to indicate that on quota violation brokers send out 
responses before throttling.
-    private static final Schema LIST_OFFSET_REQUEST_V3 = 
LIST_OFFSET_REQUEST_V2;
-
-    // V4 introduces the current leader epoch, which is used for fencing
-    private static final Field PARTITIONS_V4 = PARTITIONS.withFields(
-            PARTITION_ID,
-            CURRENT_LEADER_EPOCH,
-            TIMESTAMP);
-
-    private static final Field TOPICS_V4 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V4);
-
-    private static final Schema LIST_OFFSET_REQUEST_V4 = new Schema(
-            REPLICA_ID,
-            ISOLATION_LEVEL,
-            TOPICS_V4);
-
-    // V5 bump to include new possible error code (OFFSET_NOT_AVAILABLE)
-    private static final Schema LIST_OFFSET_REQUEST_V5 = 
LIST_OFFSET_REQUEST_V4;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, 
LIST_OFFSET_REQUEST_V2,
-            LIST_OFFSET_REQUEST_V3, LIST_OFFSET_REQUEST_V4, 
LIST_OFFSET_REQUEST_V5};
-    }
-
-    private final int replicaId;
-    private final IsolationLevel isolationLevel;
-    private final Map<TopicPartition, PartitionData> partitionTimestamps;
+    private final ListOffsetRequestData data;
     private final Set<TopicPartition> duplicatePartitions;

Review comment:
       I think this check is redundant and should be removed, otherwise we 
should have it for all general RPCs with topic => partition structure.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
##########
@@ -156,145 +66,98 @@ private Builder(short oldestAllowedVersion,
                         int replicaId,
                         IsolationLevel isolationLevel) {
             super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, 
latestAllowedVersion);
-            this.replicaId = replicaId;
-            this.isolationLevel = isolationLevel;
+            data = new ListOffsetRequestData()
+                    .setIsolationLevel(isolationLevel.id())

Review comment:
       nit: space 4 after `=`

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderForPartitionException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            buildErrorResponse(Errors.forException(e))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
+        }
+      }
+      new 
ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
+    }
+    (responseTopics ++ unauthorizedResponseStatus).toList
+  }
+
+  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): List[ListOffsetTopicResponse] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body[ListOffsetRequest]
+
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+        if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
+          debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
+              s"failed because the partition is duplicated in the request.")
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.INVALID_REQUEST.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+        } else {
+  
+          def buildErrorResponse(e: Errors): ListOffsetPartitionResponse = {
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(e.code)
+              .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+              .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+          }
+  
+          try {
+            val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
+            val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
+            val isolationLevelOpt = if (isClientRequest)
+              Some(offsetRequest.isolationLevel)
+            else
+              None
+  
+            val foundOpt = 
replicaManager.fetchOffsetForTimestamp(topicPartition,
+              partition.timestamp,
+              isolationLevelOpt,
+              if (partition.currentLeaderEpoch == 
ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else 
Optional.of(partition.currentLeaderEpoch),

Review comment:
       What do you mean here? @dajac 

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1831,4 +1831,5 @@ class ReplicaManager(val config: KafkaConfig,
 
     controller.electLeaders(partitions, electionType, electionCallback)
   }
+

Review comment:
       nit: remove unnecessary empty line

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3973,25 +3977,38 @@ void handleResponse(AbstractResponse abstractResponse) {
                     ListOffsetResponse response = (ListOffsetResponse) 
abstractResponse;
                     Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets 
= new HashMap<>();
 
-                    for (Entry<TopicPartition, PartitionData> result : 
response.responseData().entrySet()) {
-                        TopicPartition tp = result.getKey();
-                        PartitionData partitionData = result.getValue();
-
-                        KafkaFutureImpl<ListOffsetsResultInfo> future = 
futures.get(tp);
-                        Errors error = partitionData.error;
-                        OffsetSpec offsetRequestSpec = 
topicPartitionOffsets.get(tp);
-                        if (offsetRequestSpec == null) {
-                            future.completeExceptionally(new 
KafkaException("Unexpected topic partition " + tp + " in broker response!"));
-                        } else if 
(MetadataOperationContext.shouldRefreshMetadata(error)) {
-                            retryTopicPartitionOffsets.put(tp, 
offsetRequestSpec);
-                        } else if (error == Errors.NONE) {
-                            future.complete(new 
ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, 
partitionData.leaderEpoch));
-                        } else {
-                            future.completeExceptionally(error.exception());
+                    for (ListOffsetTopicResponse topic : response.topics()) {
+                        for (ListOffsetPartitionResponse partition : 
topic.partitions()) {
+                            TopicPartition tp = new 
TopicPartition(topic.name(), partition.partitionIndex());
+                            KafkaFutureImpl<ListOffsetsResultInfo> future = 
futures.get(tp);
+                            Errors error = 
Errors.forCode(partition.errorCode());
+                            OffsetSpec offsetRequestSpec = 
topicPartitionOffsets.get(tp);
+                            if (offsetRequestSpec == null) {
+                                log.warn("Server response mentioned unknown 
topic partition {}", tp);
+                            } else if 
(MetadataOperationContext.shouldRefreshMetadata(error)) {
+                                retryTopicPartitionOffsets.put(tp, 
offsetRequestSpec);
+                            } else if (error == Errors.NONE) {
+                                Optional<Integer> leaderEpoch = 
(partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH)
+                                        ? Optional.empty()
+                                        : Optional.of(partition.leaderEpoch());
+                                future.complete(new 
ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch));
+                            } else {
+                                
future.completeExceptionally(error.exception());
+                            }
                         }
                     }
 
-                    if (!retryTopicPartitionOffsets.isEmpty()) {
+                    if (retryTopicPartitionOffsets.isEmpty()) {

Review comment:
       Could we use `completeUnrealizedFutures` here?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -949,13 +953,27 @@ public void onFailure(RuntimeException e) {
                             leader, tp);
                     partitionsToRetry.add(tp);
                 } else {
-                    partitionDataMap.put(tp, new 
ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch));
+                    int currentLeaderEpoch = 
leaderAndEpoch.epoch.orElse(ListOffsetResponse.UNKNOWN_EPOCH);
+                    partitionDataMap.put(tp, new ListOffsetPartition()
+                            .setPartitionIndex(tp.partition())
+                            .setTimestamp(offset)
+                            .setCurrentLeaderEpoch(currentLeaderEpoch));
                 }
             }
         }
         return regroupPartitionMapByNode(partitionDataMap);
     }
 
+    private static List<ListOffsetTopic> 
toListOffsetTopics(Map<TopicPartition, ListOffsetPartition> timestampsToSearch) 
{
+        Map<String, ListOffsetTopic> topics = new HashMap<>();
+        for (Map.Entry<TopicPartition, ListOffsetPartition> entry : 
timestampsToSearch.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            ListOffsetTopic topic = topics.computeIfAbsent(tp.topic(), k -> 
new ListOffsetTopic().setName(tp.topic()));
+            topic.partitions().add(entry.getValue());
+        }
+        return new ArrayList<ListOffsetTopic>(topics.values());

Review comment:
       nit: replace with <>

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1251,15 +1267,28 @@ private ListOffsetRequest createListOffsetRequest(int 
version) {
 
     private ListOffsetResponse createListOffsetResponse(int version) {
         if (version == 0) {
-            Map<TopicPartition, ListOffsetResponse.PartitionData> responseData 
= new HashMap<>();
-            responseData.put(new TopicPartition("test", 0),
-                    new ListOffsetResponse.PartitionData(Errors.NONE, 
asList(100L)));
-            return new ListOffsetResponse(responseData);
+            ListOffsetResponseData data = new ListOffsetResponseData()

Review comment:
       Seems that this data construction logic has been reused in elsewhere 
(`FetcherTest`), we could get a helper like 
   ```
   ListOffsetResponseData getSingletonResponseV0(TopicPartition, Errors, 
OldStyleOffsets);
   ListOffsetResponseData getSingletonResponseV0(TopicPartition, Errors, 
Timestamp, Offset, leaderEpoch);
   ```
   in the ListOffsetResponse to reuse.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
##########
@@ -156,145 +66,98 @@ private Builder(short oldestAllowedVersion,
                         int replicaId,
                         IsolationLevel isolationLevel) {
             super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, 
latestAllowedVersion);
-            this.replicaId = replicaId;
-            this.isolationLevel = isolationLevel;
+            data = new ListOffsetRequestData()
+                    .setIsolationLevel(isolationLevel.id())
+                    .setReplicaId(replicaId);
         }
 
-        public Builder setTargetTimes(Map<TopicPartition, PartitionData> 
partitionTimestamps) {
-            this.partitionTimestamps = partitionTimestamps;
+        public Builder setTargetTimes(List<ListOffsetTopic> topics) {
+            data.setTopics(topics);
             return this;
         }
 
         @Override
         public ListOffsetRequest build(short version) {
-            return new ListOffsetRequest(replicaId, partitionTimestamps, 
isolationLevel, version);
+            return new ListOffsetRequest(version, data);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=ListOffsetRequest")
-               .append(", replicaId=").append(replicaId);
-            if (partitionTimestamps != null) {
-                bld.append(", 
partitionTimestamps=").append(partitionTimestamps);
-            }
-            bld.append(", isolationLevel=").append(isolationLevel);
-            bld.append(")");
-            return bld.toString();
-        }
-    }
-
-    public static final class PartitionData {
-        public final long timestamp;
-        public final int maxNumOffsets; // only supported in v0
-        public final Optional<Integer> currentLeaderEpoch;
-
-        private PartitionData(long timestamp, int maxNumOffsets, 
Optional<Integer> currentLeaderEpoch) {
-            this.timestamp = timestamp;
-            this.maxNumOffsets = maxNumOffsets;
-            this.currentLeaderEpoch = currentLeaderEpoch;
-        }
-
-        // For V0
-        public PartitionData(long timestamp, int maxNumOffsets) {
-            this(timestamp, maxNumOffsets, Optional.empty());
-        }
-
-        public PartitionData(long timestamp, Optional<Integer> 
currentLeaderEpoch) {
-            this(timestamp, 1, currentLeaderEpoch);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (!(obj instanceof PartitionData)) return false;
-            PartitionData other = (PartitionData) obj;
-            return this.timestamp == other.timestamp &&
-                this.currentLeaderEpoch.equals(other.currentLeaderEpoch);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(timestamp, currentLeaderEpoch);
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("{timestamp: ").append(timestamp).
-                    append(", maxNumOffsets: ").append(maxNumOffsets).
-                    append(", currentLeaderEpoch: 
").append(currentLeaderEpoch).
-                    append("}");
-            return bld.toString();
+            return data.toString();
         }
     }
 
     /**
      * Private constructor with a specified version.
      */
-    private ListOffsetRequest(int replicaId,
-                              Map<TopicPartition, PartitionData> targetTimes,
-                              IsolationLevel isolationLevel,
-                              short version) {
+    private ListOffsetRequest(short version, ListOffsetRequestData data) {
         super(ApiKeys.LIST_OFFSETS, version);
-        this.replicaId = replicaId;
-        this.isolationLevel = isolationLevel;
-        this.partitionTimestamps = targetTimes;
+        this.data = data;
         this.duplicatePartitions = Collections.emptySet();
     }
 
     public ListOffsetRequest(Struct struct, short version) {
         super(ApiKeys.LIST_OFFSETS, version);
-        Set<TopicPartition> duplicatePartitions = new HashSet<>();
-        replicaId = struct.get(REPLICA_ID);
-        isolationLevel = struct.hasField(ISOLATION_LEVEL) ?
-                IsolationLevel.forId(struct.get(ISOLATION_LEVEL)) :
-                IsolationLevel.READ_UNCOMMITTED;
-        partitionTimestamps = new HashMap<>();
-        for (Object topicResponseObj : struct.get(TOPICS)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                long timestamp = partitionResponse.get(TIMESTAMP);
-                TopicPartition tp = new TopicPartition(topic, partition);
-
-                int maxNumOffsets = 
partitionResponse.getOrElse(MAX_NUM_OFFSETS, 1);
-                Optional<Integer> currentLeaderEpoch = 
RequestUtils.getLeaderEpoch(partitionResponse, CURRENT_LEADER_EPOCH);
-                PartitionData partitionData = new PartitionData(timestamp, 
maxNumOffsets, currentLeaderEpoch);
-                if (partitionTimestamps.put(tp, partitionData) != null)
+        data = new ListOffsetRequestData(struct, version);
+        duplicatePartitions = new HashSet<>();
+        Set<TopicPartition> partitions = new HashSet<>();
+        for (ListOffsetTopic topic : data.topics()) {
+            for (ListOffsetPartition partition : topic.partitions()) {
+                TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                if (!partitions.add(tp)) {
                     duplicatePartitions.add(tp);
+                }
             }
         }
-        this.duplicatePartitions = duplicatePartitions;
     }
 
     @Override
-    @SuppressWarnings("deprecation")
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = 
new HashMap<>();
         short versionId = version();
-
-        ListOffsetResponse.PartitionData partitionError = versionId == 0 ?
-                new ListOffsetResponse.PartitionData(Errors.forException(e), 
Collections.emptyList()) :
-                new ListOffsetResponse.PartitionData(Errors.forException(e), 
-1L, -1L, Optional.empty());
-        for (TopicPartition partition : partitionTimestamps.keySet()) {
-            responseData.put(partition, partitionError);
+        short errorCode = Errors.forException(e).code();
+
+        List<ListOffsetTopicResponse> responses = new ArrayList<>();
+        for (ListOffsetTopic topic : data.topics()) {
+            ListOffsetTopicResponse topicResponse = new 
ListOffsetTopicResponse().setName(topic.name());
+            List<ListOffsetPartitionResponse> partitions = new ArrayList<>();
+            for (ListOffsetPartition partition : topic.partitions()) {
+                ListOffsetPartitionResponse partitionresponse = new 
ListOffsetPartitionResponse()
+                        .setErrorCode(errorCode)
+                        .setPartitionIndex(partition.partitionIndex());
+                if (versionId == 0) {
+                    
partitionresponse.setOldStyleOffsets(Collections.emptyList());
+                } else {
+                    
partitionresponse.setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+                                     
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP);
+                    if (versionId >= 4) {
+                        
partitionresponse.setLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH);
+                    }
+                }
+                partitions.add(partitionresponse);
+            }
+            topicResponse.setPartitions(partitions);
+            responses.add(topicResponse);
         }
+        ListOffsetResponseData reponseData = new ListOffsetResponseData()

Review comment:
       s/`reponseData`/`responseData`

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1221,28 +1226,39 @@ private DeleteGroupsResponse 
createDeleteGroupsResponse() {
 
     private ListOffsetRequest createListOffsetRequest(int version) {
         if (version == 0) {
-            Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = 
Collections.singletonMap(
-                    new TopicPartition("test", 0),
-                    new ListOffsetRequest.PartitionData(1000000L, 10));
+            ListOffsetTopic topic = new ListOffsetTopic()

Review comment:
       Similar for topic request topic construction, let me know if you think 
we could refactor out a helper like `singletonRequestData(...)` in 
`ListOffsetRequest`

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
##########
@@ -156,145 +66,98 @@ private Builder(short oldestAllowedVersion,
                         int replicaId,
                         IsolationLevel isolationLevel) {
             super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, 
latestAllowedVersion);
-            this.replicaId = replicaId;
-            this.isolationLevel = isolationLevel;
+            data = new ListOffsetRequestData()
+                    .setIsolationLevel(isolationLevel.id())
+                    .setReplicaId(replicaId);
         }
 
-        public Builder setTargetTimes(Map<TopicPartition, PartitionData> 
partitionTimestamps) {
-            this.partitionTimestamps = partitionTimestamps;
+        public Builder setTargetTimes(List<ListOffsetTopic> topics) {
+            data.setTopics(topics);
             return this;
         }
 
         @Override
         public ListOffsetRequest build(short version) {
-            return new ListOffsetRequest(replicaId, partitionTimestamps, 
isolationLevel, version);
+            return new ListOffsetRequest(version, data);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=ListOffsetRequest")
-               .append(", replicaId=").append(replicaId);
-            if (partitionTimestamps != null) {
-                bld.append(", 
partitionTimestamps=").append(partitionTimestamps);
-            }
-            bld.append(", isolationLevel=").append(isolationLevel);
-            bld.append(")");
-            return bld.toString();
-        }
-    }
-
-    public static final class PartitionData {
-        public final long timestamp;
-        public final int maxNumOffsets; // only supported in v0
-        public final Optional<Integer> currentLeaderEpoch;
-
-        private PartitionData(long timestamp, int maxNumOffsets, 
Optional<Integer> currentLeaderEpoch) {
-            this.timestamp = timestamp;
-            this.maxNumOffsets = maxNumOffsets;
-            this.currentLeaderEpoch = currentLeaderEpoch;
-        }
-
-        // For V0
-        public PartitionData(long timestamp, int maxNumOffsets) {
-            this(timestamp, maxNumOffsets, Optional.empty());
-        }
-
-        public PartitionData(long timestamp, Optional<Integer> 
currentLeaderEpoch) {
-            this(timestamp, 1, currentLeaderEpoch);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (!(obj instanceof PartitionData)) return false;
-            PartitionData other = (PartitionData) obj;
-            return this.timestamp == other.timestamp &&
-                this.currentLeaderEpoch.equals(other.currentLeaderEpoch);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(timestamp, currentLeaderEpoch);
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("{timestamp: ").append(timestamp).
-                    append(", maxNumOffsets: ").append(maxNumOffsets).
-                    append(", currentLeaderEpoch: 
").append(currentLeaderEpoch).
-                    append("}");
-            return bld.toString();
+            return data.toString();
         }
     }
 
     /**
      * Private constructor with a specified version.
      */
-    private ListOffsetRequest(int replicaId,
-                              Map<TopicPartition, PartitionData> targetTimes,
-                              IsolationLevel isolationLevel,
-                              short version) {
+    private ListOffsetRequest(short version, ListOffsetRequestData data) {
         super(ApiKeys.LIST_OFFSETS, version);
-        this.replicaId = replicaId;
-        this.isolationLevel = isolationLevel;
-        this.partitionTimestamps = targetTimes;
+        this.data = data;
         this.duplicatePartitions = Collections.emptySet();
     }
 
     public ListOffsetRequest(Struct struct, short version) {
         super(ApiKeys.LIST_OFFSETS, version);
-        Set<TopicPartition> duplicatePartitions = new HashSet<>();
-        replicaId = struct.get(REPLICA_ID);
-        isolationLevel = struct.hasField(ISOLATION_LEVEL) ?
-                IsolationLevel.forId(struct.get(ISOLATION_LEVEL)) :
-                IsolationLevel.READ_UNCOMMITTED;
-        partitionTimestamps = new HashMap<>();
-        for (Object topicResponseObj : struct.get(TOPICS)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                long timestamp = partitionResponse.get(TIMESTAMP);
-                TopicPartition tp = new TopicPartition(topic, partition);
-
-                int maxNumOffsets = 
partitionResponse.getOrElse(MAX_NUM_OFFSETS, 1);
-                Optional<Integer> currentLeaderEpoch = 
RequestUtils.getLeaderEpoch(partitionResponse, CURRENT_LEADER_EPOCH);
-                PartitionData partitionData = new PartitionData(timestamp, 
maxNumOffsets, currentLeaderEpoch);
-                if (partitionTimestamps.put(tp, partitionData) != null)
+        data = new ListOffsetRequestData(struct, version);
+        duplicatePartitions = new HashSet<>();
+        Set<TopicPartition> partitions = new HashSet<>();
+        for (ListOffsetTopic topic : data.topics()) {
+            for (ListOffsetPartition partition : topic.partitions()) {
+                TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                if (!partitions.add(tp)) {
                     duplicatePartitions.add(tp);
+                }
             }
         }
-        this.duplicatePartitions = duplicatePartitions;
     }
 
     @Override
-    @SuppressWarnings("deprecation")
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = 
new HashMap<>();
         short versionId = version();
-
-        ListOffsetResponse.PartitionData partitionError = versionId == 0 ?
-                new ListOffsetResponse.PartitionData(Errors.forException(e), 
Collections.emptyList()) :
-                new ListOffsetResponse.PartitionData(Errors.forException(e), 
-1L, -1L, Optional.empty());
-        for (TopicPartition partition : partitionTimestamps.keySet()) {
-            responseData.put(partition, partitionError);
+        short errorCode = Errors.forException(e).code();
+
+        List<ListOffsetTopicResponse> responses = new ArrayList<>();
+        for (ListOffsetTopic topic : data.topics()) {
+            ListOffsetTopicResponse topicResponse = new 
ListOffsetTopicResponse().setName(topic.name());
+            List<ListOffsetPartitionResponse> partitions = new ArrayList<>();
+            for (ListOffsetPartition partition : topic.partitions()) {
+                ListOffsetPartitionResponse partitionresponse = new 
ListOffsetPartitionResponse()

Review comment:
       s/`partitionresponse`/`partitionResponse`

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/ListOffsetRequestTest.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ListOffsetRequestData;
+import 
org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
+import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
+import org.apache.kafka.common.message.ListOffsetResponseData;
+import 
org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+public class ListOffsetRequestTest {
+
+    @Test
+    public void testDuplicatePartitions() {
+        List<ListOffsetTopic> topics = Arrays.asList(
+                new ListOffsetTopic()
+                    .setName("topic")
+                    .setPartitions(Arrays.asList(
+                            new ListOffsetPartition()
+                                .setPartitionIndex(0),
+                            new ListOffsetPartition()
+                                .setPartitionIndex(0))));
+        ListOffsetRequestData data = new ListOffsetRequestData()
+                .setTopics(topics)
+                .setReplicaId(-1);
+        ListOffsetRequest request = new 
ListOffsetRequest(data.toStruct((short) 0), (short) 0);
+        assertEquals(Collections.singleton(new TopicPartition("topic", 0)), 
request.duplicatePartitions());
+    }
+
+    @Test
+    public void testGetErrorResponse() {

Review comment:
       We could iterate through v1 to v5 here to test every case.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
##########
@@ -156,145 +66,98 @@ private Builder(short oldestAllowedVersion,
                         int replicaId,
                         IsolationLevel isolationLevel) {
             super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, 
latestAllowedVersion);
-            this.replicaId = replicaId;
-            this.isolationLevel = isolationLevel;
+            data = new ListOffsetRequestData()
+                    .setIsolationLevel(isolationLevel.id())
+                    .setReplicaId(replicaId);
         }
 
-        public Builder setTargetTimes(Map<TopicPartition, PartitionData> 
partitionTimestamps) {
-            this.partitionTimestamps = partitionTimestamps;
+        public Builder setTargetTimes(List<ListOffsetTopic> topics) {
+            data.setTopics(topics);
             return this;
         }
 
         @Override
         public ListOffsetRequest build(short version) {
-            return new ListOffsetRequest(replicaId, partitionTimestamps, 
isolationLevel, version);
+            return new ListOffsetRequest(version, data);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=ListOffsetRequest")
-               .append(", replicaId=").append(replicaId);
-            if (partitionTimestamps != null) {
-                bld.append(", 
partitionTimestamps=").append(partitionTimestamps);
-            }
-            bld.append(", isolationLevel=").append(isolationLevel);
-            bld.append(")");
-            return bld.toString();
-        }
-    }
-
-    public static final class PartitionData {
-        public final long timestamp;
-        public final int maxNumOffsets; // only supported in v0
-        public final Optional<Integer> currentLeaderEpoch;
-
-        private PartitionData(long timestamp, int maxNumOffsets, 
Optional<Integer> currentLeaderEpoch) {
-            this.timestamp = timestamp;
-            this.maxNumOffsets = maxNumOffsets;
-            this.currentLeaderEpoch = currentLeaderEpoch;
-        }
-
-        // For V0
-        public PartitionData(long timestamp, int maxNumOffsets) {
-            this(timestamp, maxNumOffsets, Optional.empty());
-        }
-
-        public PartitionData(long timestamp, Optional<Integer> 
currentLeaderEpoch) {
-            this(timestamp, 1, currentLeaderEpoch);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (!(obj instanceof PartitionData)) return false;
-            PartitionData other = (PartitionData) obj;
-            return this.timestamp == other.timestamp &&
-                this.currentLeaderEpoch.equals(other.currentLeaderEpoch);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(timestamp, currentLeaderEpoch);
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("{timestamp: ").append(timestamp).
-                    append(", maxNumOffsets: ").append(maxNumOffsets).
-                    append(", currentLeaderEpoch: 
").append(currentLeaderEpoch).
-                    append("}");
-            return bld.toString();
+            return data.toString();
         }
     }
 
     /**
      * Private constructor with a specified version.
      */
-    private ListOffsetRequest(int replicaId,
-                              Map<TopicPartition, PartitionData> targetTimes,
-                              IsolationLevel isolationLevel,
-                              short version) {
+    private ListOffsetRequest(short version, ListOffsetRequestData data) {
         super(ApiKeys.LIST_OFFSETS, version);
-        this.replicaId = replicaId;
-        this.isolationLevel = isolationLevel;
-        this.partitionTimestamps = targetTimes;
+        this.data = data;
         this.duplicatePartitions = Collections.emptySet();
     }
 
     public ListOffsetRequest(Struct struct, short version) {
         super(ApiKeys.LIST_OFFSETS, version);
-        Set<TopicPartition> duplicatePartitions = new HashSet<>();
-        replicaId = struct.get(REPLICA_ID);
-        isolationLevel = struct.hasField(ISOLATION_LEVEL) ?
-                IsolationLevel.forId(struct.get(ISOLATION_LEVEL)) :
-                IsolationLevel.READ_UNCOMMITTED;
-        partitionTimestamps = new HashMap<>();
-        for (Object topicResponseObj : struct.get(TOPICS)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                long timestamp = partitionResponse.get(TIMESTAMP);
-                TopicPartition tp = new TopicPartition(topic, partition);
-
-                int maxNumOffsets = 
partitionResponse.getOrElse(MAX_NUM_OFFSETS, 1);
-                Optional<Integer> currentLeaderEpoch = 
RequestUtils.getLeaderEpoch(partitionResponse, CURRENT_LEADER_EPOCH);
-                PartitionData partitionData = new PartitionData(timestamp, 
maxNumOffsets, currentLeaderEpoch);
-                if (partitionTimestamps.put(tp, partitionData) != null)
+        data = new ListOffsetRequestData(struct, version);
+        duplicatePartitions = new HashSet<>();
+        Set<TopicPartition> partitions = new HashSet<>();
+        for (ListOffsetTopic topic : data.topics()) {
+            for (ListOffsetPartition partition : topic.partitions()) {
+                TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                if (!partitions.add(tp)) {
                     duplicatePartitions.add(tp);
+                }
             }
         }
-        this.duplicatePartitions = duplicatePartitions;
     }
 
     @Override
-    @SuppressWarnings("deprecation")
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = 
new HashMap<>();
         short versionId = version();
-
-        ListOffsetResponse.PartitionData partitionError = versionId == 0 ?
-                new ListOffsetResponse.PartitionData(Errors.forException(e), 
Collections.emptyList()) :
-                new ListOffsetResponse.PartitionData(Errors.forException(e), 
-1L, -1L, Optional.empty());
-        for (TopicPartition partition : partitionTimestamps.keySet()) {
-            responseData.put(partition, partitionError);
+        short errorCode = Errors.forException(e).code();
+
+        List<ListOffsetTopicResponse> responses = new ArrayList<>();
+        for (ListOffsetTopic topic : data.topics()) {
+            ListOffsetTopicResponse topicResponse = new 
ListOffsetTopicResponse().setName(topic.name());
+            List<ListOffsetPartitionResponse> partitions = new ArrayList<>();
+            for (ListOffsetPartition partition : topic.partitions()) {
+                ListOffsetPartitionResponse partitionresponse = new 
ListOffsetPartitionResponse()
+                        .setErrorCode(errorCode)
+                        .setPartitionIndex(partition.partitionIndex());
+                if (versionId == 0) {
+                    
partitionresponse.setOldStyleOffsets(Collections.emptyList());
+                } else {
+                    
partitionresponse.setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+                                     
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP);
+                    if (versionId >= 4) {
+                        
partitionresponse.setLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH);

Review comment:
       Is this necessary? The leader epoch is -1 by default.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -910,136 +913,163 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderOrFollowerException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderOrFollowerException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context,
+        DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
 
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)).asJava)
+    )
 
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
 
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderOrFollowerException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderOrFollowerException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            buildErrorResponse(Errors.forException(e))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+        }
+      }
+      new 
ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
+    }
+    (responseTopics ++ unauthorizedResponseStatus).toList
+  }
+
+  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): List[ListOffsetTopicResponse] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body[ListOffsetRequest]
+
+    def buildErrorResponse(e: Errors, partition: ListOffsetPartition): 
ListOffsetPartitionResponse = {
+      new ListOffsetPartitionResponse()
+        .setPartitionIndex(partition.partitionIndex)
+        .setErrorCode(e.code)
+        .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+        .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+    }
+
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context,
+        DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, 
partition)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+        if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
+          debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
+              s"failed because the partition is duplicated in the request.")
+          buildErrorResponse(Errors.INVALID_REQUEST, partition)
+        } else {
+
+          try {
+            val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
+            val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
+            val isolationLevelOpt = if (isClientRequest)
+              Some(offsetRequest.isolationLevel)
+            else
+              None
+
+            val foundOpt = 
replicaManager.fetchOffsetForTimestamp(topicPartition,
+              partition.timestamp,
+              isolationLevelOpt,
+              if (partition.currentLeaderEpoch == 
ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else 
Optional.of(partition.currentLeaderEpoch),
+              fetchOnlyFromLeader)
+
+            val response = foundOpt match {

Review comment:
       Redundant braces.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/ListOffsetRequestTest.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ListOffsetRequestData;
+import 
org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
+import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
+import org.apache.kafka.common.message.ListOffsetResponseData;
+import 
org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+public class ListOffsetRequestTest {
+
+    @Test
+    public void testDuplicatePartitions() {
+        List<ListOffsetTopic> topics = Arrays.asList(

Review comment:
       `Arrays.asList` could be replaced with `Collections.singletonList`

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/ListOffsetRequestTest.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ListOffsetRequestData;
+import 
org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
+import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
+import org.apache.kafka.common.message.ListOffsetResponseData;
+import 
org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+public class ListOffsetRequestTest {

Review comment:
       We should also add a test in `MessageTest` for the automated struct




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to