abbccdda commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r463682515
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -4068,6 +4093,58 @@ public void testListOffsetsMetadataNonRetriableErrors() throws Exception { } } + @Test + public void testListOffsetsPartialResponse() throws Exception { Review comment: Good coverage ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -949,13 +953,27 @@ public void onFailure(RuntimeException e) { leader, tp); partitionsToRetry.add(tp); } else { - partitionDataMap.put(tp, new ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch)); + int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(ListOffsetResponse.UNKNOWN_EPOCH); + partitionDataMap.put(tp, new ListOffsetPartition() + .setPartitionIndex(tp.partition()) + .setTimestamp(offset) + .setCurrentLeaderEpoch(currentLeaderEpoch)); } } } return regroupPartitionMapByNode(partitionDataMap); } + private static List<ListOffsetTopic> toListOffsetTopics(Map<TopicPartition, ListOffsetPartition> timestampsToSearch) { Review comment: Let's move this helper into `ListOffsetRequest` ########## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ########## @@ -47,96 +42,11 @@ public static final int CONSUMER_REPLICA_ID = -1; public static final int DEBUGGING_REPLICA_ID = -2; - // top level fields - private static final Field.Int32 REPLICA_ID = new Field.Int32("replica_id", - "Broker id of the follower. For normal consumers, use -1."); - private static final Field.Int8 ISOLATION_LEVEL = new Field.Int8("isolation_level", - "This setting controls the visibility of transactional records. " + - "Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED " + - "(isolation_level = 1), non-transactional and COMMITTED transactional records are visible. 
" + - "To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current " + - "LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the " + - "result, which allows consumers to discard ABORTED transactional records"); - private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics", - "Topics to list offsets."); - - // topic level fields - private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions", - "Partitions to list offsets."); - - // partition level fields - private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp", - "The target timestamp for the partition."); - private static final Field.Int32 MAX_NUM_OFFSETS = new Field.Int32("max_num_offsets", - "Maximum offsets to return."); - - private static final Field PARTITIONS_V0 = PARTITIONS.withFields( - PARTITION_ID, - TIMESTAMP, - MAX_NUM_OFFSETS); - - private static final Field TOPICS_V0 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V0); - - private static final Schema LIST_OFFSET_REQUEST_V0 = new Schema( - REPLICA_ID, - TOPICS_V0); - - // V1 removes max_num_offsets - private static final Field PARTITIONS_V1 = PARTITIONS.withFields( - PARTITION_ID, - TIMESTAMP); - - private static final Field TOPICS_V1 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V1); - - private static final Schema LIST_OFFSET_REQUEST_V1 = new Schema( - REPLICA_ID, - TOPICS_V1); - - // V2 adds a field for the isolation level - private static final Schema LIST_OFFSET_REQUEST_V2 = new Schema( - REPLICA_ID, - ISOLATION_LEVEL, - TOPICS_V1); - - // V3 bump used to indicate that on quota violation brokers send out responses before throttling. 
- private static final Schema LIST_OFFSET_REQUEST_V3 = LIST_OFFSET_REQUEST_V2; - - // V4 introduces the current leader epoch, which is used for fencing - private static final Field PARTITIONS_V4 = PARTITIONS.withFields( - PARTITION_ID, - CURRENT_LEADER_EPOCH, - TIMESTAMP); - - private static final Field TOPICS_V4 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V4); - - private static final Schema LIST_OFFSET_REQUEST_V4 = new Schema( - REPLICA_ID, - ISOLATION_LEVEL, - TOPICS_V4); - - // V5 bump to include new possible error code (OFFSET_NOT_AVAILABLE) - private static final Schema LIST_OFFSET_REQUEST_V5 = LIST_OFFSET_REQUEST_V4; - - public static Schema[] schemaVersions() { - return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2, - LIST_OFFSET_REQUEST_V3, LIST_OFFSET_REQUEST_V4, LIST_OFFSET_REQUEST_V5}; - } - - private final int replicaId; - private final IsolationLevel isolationLevel; - private final Map<TopicPartition, PartitionData> partitionTimestamps; + private final ListOffsetRequestData data; private final Set<TopicPartition> duplicatePartitions; Review comment: I think this check is redundant and should be removed, otherwise we should have it for all general RPCs with topic => partition structure. 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ########## @@ -156,145 +66,98 @@ private Builder(short oldestAllowedVersion, int replicaId, IsolationLevel isolationLevel) { super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, latestAllowedVersion); - this.replicaId = replicaId; - this.isolationLevel = isolationLevel; + data = new ListOffsetRequestData() + .setIsolationLevel(isolationLevel.id()) Review comment: nit: space 4 after `=` ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> 
new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else 
- None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls 
should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) case e: Throwable => error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + .setOldStyleOffsets(List[JLong]().asJava) + } + } + new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) + } + (responseTopics ++ unauthorizedResponseStatus).toList + } + + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetTopicResponse] = { + val correlationId = request.header.correlationId + val clientId = request.header.clientId + val offsetRequest = request.body[ListOffsetRequest] + + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val 
responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) + if (offsetRequest.duplicatePartitions.contains(topicPartition)) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + + s"failed because the partition is duplicated in the request.") + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.INVALID_REQUEST.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } else { + + def buildErrorResponse(e: Errors): ListOffsetPartitionResponse = { + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(e.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } + + try { + val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID + val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID + val isolationLevelOpt = if (isClientRequest) + Some(offsetRequest.isolationLevel) + else + None + + val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, + partition.timestamp, + isolationLevelOpt, + if (partition.currentLeaderEpoch == ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch), Review comment: What do you mean here? 
@dajac ########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1831,4 +1831,5 @@ class ReplicaManager(val config: KafkaConfig, controller.electLeaders(partitions, electionType, electionCallback) } + Review comment: nit: remove unnecessary empty line ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3973,25 +3977,38 @@ void handleResponse(AbstractResponse abstractResponse) { ListOffsetResponse response = (ListOffsetResponse) abstractResponse; Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets = new HashMap<>(); - for (Entry<TopicPartition, PartitionData> result : response.responseData().entrySet()) { - TopicPartition tp = result.getKey(); - PartitionData partitionData = result.getValue(); - - KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp); - Errors error = partitionData.error; - OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); - if (offsetRequestSpec == null) { - future.completeExceptionally(new KafkaException("Unexpected topic partition " + tp + " in broker response!")); - } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { - retryTopicPartitionOffsets.put(tp, offsetRequestSpec); - } else if (error == Errors.NONE) { - future.complete(new ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch)); - } else { - future.completeExceptionally(error.exception()); + for (ListOffsetTopicResponse topic : response.topics()) { + for (ListOffsetPartitionResponse partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp); + Errors error = Errors.forCode(partition.errorCode()); + OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); + if (offsetRequestSpec == null) { + log.warn("Server response mentioned unknown topic partition {}", tp); + } else if 
(MetadataOperationContext.shouldRefreshMetadata(error)) { + retryTopicPartitionOffsets.put(tp, offsetRequestSpec); + } else if (error == Errors.NONE) { + Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH) + ? Optional.empty() + : Optional.of(partition.leaderEpoch()); + future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch)); + } else { + future.completeExceptionally(error.exception()); + } } } - if (!retryTopicPartitionOffsets.isEmpty()) { + if (retryTopicPartitionOffsets.isEmpty()) { Review comment: Could we use `completeUnrealizedFutures` here? ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -949,13 +953,27 @@ public void onFailure(RuntimeException e) { leader, tp); partitionsToRetry.add(tp); } else { - partitionDataMap.put(tp, new ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch)); + int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(ListOffsetResponse.UNKNOWN_EPOCH); + partitionDataMap.put(tp, new ListOffsetPartition() + .setPartitionIndex(tp.partition()) + .setTimestamp(offset) + .setCurrentLeaderEpoch(currentLeaderEpoch)); } } } return regroupPartitionMapByNode(partitionDataMap); } + private static List<ListOffsetTopic> toListOffsetTopics(Map<TopicPartition, ListOffsetPartition> timestampsToSearch) { + Map<String, ListOffsetTopic> topics = new HashMap<>(); + for (Map.Entry<TopicPartition, ListOffsetPartition> entry : timestampsToSearch.entrySet()) { + TopicPartition tp = entry.getKey(); + ListOffsetTopic topic = topics.computeIfAbsent(tp.topic(), k -> new ListOffsetTopic().setName(tp.topic())); + topic.partitions().add(entry.getValue()); + } + return new ArrayList<ListOffsetTopic>(topics.values()); Review comment: nit: replace with <> ########## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ########## @@ -1251,15 +1267,28 @@ private ListOffsetRequest 
createListOffsetRequest(int version) { private ListOffsetResponse createListOffsetResponse(int version) { if (version == 0) { - Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); - responseData.put(new TopicPartition("test", 0), - new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L))); - return new ListOffsetResponse(responseData); + ListOffsetResponseData data = new ListOffsetResponseData() Review comment: It seems that this data construction logic is reused elsewhere (`FetcherTest`); we could add a helper like ``` ListOffsetResponseData getSingletonResponseV0(TopicPartition, Errors, OldStyleOffsets); ListOffsetResponseData getSingletonResponseV0(TopicPartition, Errors, Timestamp, Offset, leaderEpoch); ``` in `ListOffsetResponse` for reuse. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ########## @@ -156,145 +66,98 @@ private Builder(short oldestAllowedVersion, int replicaId, IsolationLevel isolationLevel) { super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, latestAllowedVersion); - this.replicaId = replicaId; - this.isolationLevel = isolationLevel; + data = new ListOffsetRequestData() + .setIsolationLevel(isolationLevel.id()) + .setReplicaId(replicaId); } - public Builder setTargetTimes(Map<TopicPartition, PartitionData> partitionTimestamps) { - this.partitionTimestamps = partitionTimestamps; + public Builder setTargetTimes(List<ListOffsetTopic> topics) { + data.setTopics(topics); return this; } @Override public ListOffsetRequest build(short version) { - return new ListOffsetRequest(replicaId, partitionTimestamps, isolationLevel, version); + return new ListOffsetRequest(version, data); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=ListOffsetRequest") - .append(", replicaId=").append(replicaId); - if (partitionTimestamps != null) { - bld.append(", partitionTimestamps=").append(partitionTimestamps); - } - 
bld.append(", isolationLevel=").append(isolationLevel); - bld.append(")"); - return bld.toString(); - } - } - - public static final class PartitionData { - public final long timestamp; - public final int maxNumOffsets; // only supported in v0 - public final Optional<Integer> currentLeaderEpoch; - - private PartitionData(long timestamp, int maxNumOffsets, Optional<Integer> currentLeaderEpoch) { - this.timestamp = timestamp; - this.maxNumOffsets = maxNumOffsets; - this.currentLeaderEpoch = currentLeaderEpoch; - } - - // For V0 - public PartitionData(long timestamp, int maxNumOffsets) { - this(timestamp, maxNumOffsets, Optional.empty()); - } - - public PartitionData(long timestamp, Optional<Integer> currentLeaderEpoch) { - this(timestamp, 1, currentLeaderEpoch); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof PartitionData)) return false; - PartitionData other = (PartitionData) obj; - return this.timestamp == other.timestamp && - this.currentLeaderEpoch.equals(other.currentLeaderEpoch); - } - - @Override - public int hashCode() { - return Objects.hash(timestamp, currentLeaderEpoch); - } - - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("{timestamp: ").append(timestamp). - append(", maxNumOffsets: ").append(maxNumOffsets). - append(", currentLeaderEpoch: ").append(currentLeaderEpoch). - append("}"); - return bld.toString(); + return data.toString(); } } /** * Private constructor with a specified version. 
*/ - private ListOffsetRequest(int replicaId, - Map<TopicPartition, PartitionData> targetTimes, - IsolationLevel isolationLevel, - short version) { + private ListOffsetRequest(short version, ListOffsetRequestData data) { super(ApiKeys.LIST_OFFSETS, version); - this.replicaId = replicaId; - this.isolationLevel = isolationLevel; - this.partitionTimestamps = targetTimes; + this.data = data; this.duplicatePartitions = Collections.emptySet(); } public ListOffsetRequest(Struct struct, short version) { super(ApiKeys.LIST_OFFSETS, version); - Set<TopicPartition> duplicatePartitions = new HashSet<>(); - replicaId = struct.get(REPLICA_ID); - isolationLevel = struct.hasField(ISOLATION_LEVEL) ? - IsolationLevel.forId(struct.get(ISOLATION_LEVEL)) : - IsolationLevel.READ_UNCOMMITTED; - partitionTimestamps = new HashMap<>(); - for (Object topicResponseObj : struct.get(TOPICS)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.get(TOPIC_NAME); - for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) { - Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.get(PARTITION_ID); - long timestamp = partitionResponse.get(TIMESTAMP); - TopicPartition tp = new TopicPartition(topic, partition); - - int maxNumOffsets = partitionResponse.getOrElse(MAX_NUM_OFFSETS, 1); - Optional<Integer> currentLeaderEpoch = RequestUtils.getLeaderEpoch(partitionResponse, CURRENT_LEADER_EPOCH); - PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets, currentLeaderEpoch); - if (partitionTimestamps.put(tp, partitionData) != null) + data = new ListOffsetRequestData(struct, version); + duplicatePartitions = new HashSet<>(); + Set<TopicPartition> partitions = new HashSet<>(); + for (ListOffsetTopic topic : data.topics()) { + for (ListOffsetPartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + if (!partitions.add(tp)) { 
duplicatePartitions.add(tp); + } } } - this.duplicatePartitions = duplicatePartitions; } @Override - @SuppressWarnings("deprecation") public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); short versionId = version(); - - ListOffsetResponse.PartitionData partitionError = versionId == 0 ? - new ListOffsetResponse.PartitionData(Errors.forException(e), Collections.emptyList()) : - new ListOffsetResponse.PartitionData(Errors.forException(e), -1L, -1L, Optional.empty()); - for (TopicPartition partition : partitionTimestamps.keySet()) { - responseData.put(partition, partitionError); + short errorCode = Errors.forException(e).code(); + + List<ListOffsetTopicResponse> responses = new ArrayList<>(); + for (ListOffsetTopic topic : data.topics()) { + ListOffsetTopicResponse topicResponse = new ListOffsetTopicResponse().setName(topic.name()); + List<ListOffsetPartitionResponse> partitions = new ArrayList<>(); + for (ListOffsetPartition partition : topic.partitions()) { + ListOffsetPartitionResponse partitionresponse = new ListOffsetPartitionResponse() + .setErrorCode(errorCode) + .setPartitionIndex(partition.partitionIndex()); + if (versionId == 0) { + partitionresponse.setOldStyleOffsets(Collections.emptyList()); + } else { + partitionresponse.setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP); + if (versionId >= 4) { + partitionresponse.setLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH); + } + } + partitions.add(partitionresponse); + } + topicResponse.setPartitions(partitions); + responses.add(topicResponse); } + ListOffsetResponseData reponseData = new ListOffsetResponseData() Review comment: s/`reponseData`/`responseData` ########## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ########## @@ -1221,28 +1226,39 @@ private DeleteGroupsResponse createDeleteGroupsResponse() { 
private ListOffsetRequest createListOffsetRequest(int version) { if (version == 0) { - Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap( - new TopicPartition("test", 0), - new ListOffsetRequest.PartitionData(1000000L, 10)); + ListOffsetTopic topic = new ListOffsetTopic() Review comment: Similarly for the request topic construction here: let me know if you think we could refactor out a helper like `singletonRequestData(...)` in `ListOffsetRequest` ########## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ########## @@ -156,145 +66,98 @@ private Builder(short oldestAllowedVersion, int replicaId, IsolationLevel isolationLevel) { super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, latestAllowedVersion); - this.replicaId = replicaId; - this.isolationLevel = isolationLevel; + data = new ListOffsetRequestData() + .setIsolationLevel(isolationLevel.id()) + .setReplicaId(replicaId); } - public Builder setTargetTimes(Map<TopicPartition, PartitionData> partitionTimestamps) { - this.partitionTimestamps = partitionTimestamps; + public Builder setTargetTimes(List<ListOffsetTopic> topics) { + data.setTopics(topics); return this; } @Override public ListOffsetRequest build(short version) { - return new ListOffsetRequest(replicaId, partitionTimestamps, isolationLevel, version); + return new ListOffsetRequest(version, data); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=ListOffsetRequest") - .append(", replicaId=").append(replicaId); - if (partitionTimestamps != null) { - bld.append(", partitionTimestamps=").append(partitionTimestamps); - } - bld.append(", isolationLevel=").append(isolationLevel); - bld.append(")"); - return bld.toString(); - } - } - - public static final class PartitionData { - public final long timestamp; - public final int maxNumOffsets; // only supported in v0 - public final Optional<Integer> currentLeaderEpoch; - - private
PartitionData(long timestamp, int maxNumOffsets, Optional<Integer> currentLeaderEpoch) { - this.timestamp = timestamp; - this.maxNumOffsets = maxNumOffsets; - this.currentLeaderEpoch = currentLeaderEpoch; - } - - // For V0 - public PartitionData(long timestamp, int maxNumOffsets) { - this(timestamp, maxNumOffsets, Optional.empty()); - } - - public PartitionData(long timestamp, Optional<Integer> currentLeaderEpoch) { - this(timestamp, 1, currentLeaderEpoch); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof PartitionData)) return false; - PartitionData other = (PartitionData) obj; - return this.timestamp == other.timestamp && - this.currentLeaderEpoch.equals(other.currentLeaderEpoch); - } - - @Override - public int hashCode() { - return Objects.hash(timestamp, currentLeaderEpoch); - } - - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("{timestamp: ").append(timestamp). - append(", maxNumOffsets: ").append(maxNumOffsets). - append(", currentLeaderEpoch: ").append(currentLeaderEpoch). - append("}"); - return bld.toString(); + return data.toString(); } } /** * Private constructor with a specified version. */ - private ListOffsetRequest(int replicaId, - Map<TopicPartition, PartitionData> targetTimes, - IsolationLevel isolationLevel, - short version) { + private ListOffsetRequest(short version, ListOffsetRequestData data) { super(ApiKeys.LIST_OFFSETS, version); - this.replicaId = replicaId; - this.isolationLevel = isolationLevel; - this.partitionTimestamps = targetTimes; + this.data = data; this.duplicatePartitions = Collections.emptySet(); } public ListOffsetRequest(Struct struct, short version) { super(ApiKeys.LIST_OFFSETS, version); - Set<TopicPartition> duplicatePartitions = new HashSet<>(); - replicaId = struct.get(REPLICA_ID); - isolationLevel = struct.hasField(ISOLATION_LEVEL) ? 
- IsolationLevel.forId(struct.get(ISOLATION_LEVEL)) : - IsolationLevel.READ_UNCOMMITTED; - partitionTimestamps = new HashMap<>(); - for (Object topicResponseObj : struct.get(TOPICS)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.get(TOPIC_NAME); - for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) { - Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.get(PARTITION_ID); - long timestamp = partitionResponse.get(TIMESTAMP); - TopicPartition tp = new TopicPartition(topic, partition); - - int maxNumOffsets = partitionResponse.getOrElse(MAX_NUM_OFFSETS, 1); - Optional<Integer> currentLeaderEpoch = RequestUtils.getLeaderEpoch(partitionResponse, CURRENT_LEADER_EPOCH); - PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets, currentLeaderEpoch); - if (partitionTimestamps.put(tp, partitionData) != null) + data = new ListOffsetRequestData(struct, version); + duplicatePartitions = new HashSet<>(); + Set<TopicPartition> partitions = new HashSet<>(); + for (ListOffsetTopic topic : data.topics()) { + for (ListOffsetPartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + if (!partitions.add(tp)) { duplicatePartitions.add(tp); + } } } - this.duplicatePartitions = duplicatePartitions; } @Override - @SuppressWarnings("deprecation") public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); short versionId = version(); - - ListOffsetResponse.PartitionData partitionError = versionId == 0 ? 
- new ListOffsetResponse.PartitionData(Errors.forException(e), Collections.emptyList()) : - new ListOffsetResponse.PartitionData(Errors.forException(e), -1L, -1L, Optional.empty()); - for (TopicPartition partition : partitionTimestamps.keySet()) { - responseData.put(partition, partitionError); + short errorCode = Errors.forException(e).code(); + + List<ListOffsetTopicResponse> responses = new ArrayList<>(); + for (ListOffsetTopic topic : data.topics()) { + ListOffsetTopicResponse topicResponse = new ListOffsetTopicResponse().setName(topic.name()); + List<ListOffsetPartitionResponse> partitions = new ArrayList<>(); + for (ListOffsetPartition partition : topic.partitions()) { + ListOffsetPartitionResponse partitionresponse = new ListOffsetPartitionResponse() Review comment: s/`partitionresponse`/`partitionResponse` ########## File path: clients/src/test/java/org/apache/kafka/common/requests/ListOffsetRequestTest.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListOffsetRequestData; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; +import org.apache.kafka.common.protocol.Errors; +import org.junit.Test; + +public class ListOffsetRequestTest { + + @Test + public void testDuplicatePartitions() { + List<ListOffsetTopic> topics = Arrays.asList( + new ListOffsetTopic() + .setName("topic") + .setPartitions(Arrays.asList( + new ListOffsetPartition() + .setPartitionIndex(0), + new ListOffsetPartition() + .setPartitionIndex(0)))); + ListOffsetRequestData data = new ListOffsetRequestData() + .setTopics(topics) + .setReplicaId(-1); + ListOffsetRequest request = new ListOffsetRequest(data.toStruct((short) 0), (short) 0); + assertEquals(Collections.singleton(new TopicPartition("topic", 0)), request.duplicatePartitions()); + } + + @Test + public void testGetErrorResponse() { Review comment: We could iterate through v1 to v5 here to test every case. 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ########## @@ -156,145 +66,98 @@ private Builder(short oldestAllowedVersion, int replicaId, IsolationLevel isolationLevel) { super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, latestAllowedVersion); - this.replicaId = replicaId; - this.isolationLevel = isolationLevel; + data = new ListOffsetRequestData() + .setIsolationLevel(isolationLevel.id()) + .setReplicaId(replicaId); } - public Builder setTargetTimes(Map<TopicPartition, PartitionData> partitionTimestamps) { - this.partitionTimestamps = partitionTimestamps; + public Builder setTargetTimes(List<ListOffsetTopic> topics) { + data.setTopics(topics); return this; } @Override public ListOffsetRequest build(short version) { - return new ListOffsetRequest(replicaId, partitionTimestamps, isolationLevel, version); + return new ListOffsetRequest(version, data); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=ListOffsetRequest") - .append(", replicaId=").append(replicaId); - if (partitionTimestamps != null) { - bld.append(", partitionTimestamps=").append(partitionTimestamps); - } - bld.append(", isolationLevel=").append(isolationLevel); - bld.append(")"); - return bld.toString(); - } - } - - public static final class PartitionData { - public final long timestamp; - public final int maxNumOffsets; // only supported in v0 - public final Optional<Integer> currentLeaderEpoch; - - private PartitionData(long timestamp, int maxNumOffsets, Optional<Integer> currentLeaderEpoch) { - this.timestamp = timestamp; - this.maxNumOffsets = maxNumOffsets; - this.currentLeaderEpoch = currentLeaderEpoch; - } - - // For V0 - public PartitionData(long timestamp, int maxNumOffsets) { - this(timestamp, maxNumOffsets, Optional.empty()); - } - - public PartitionData(long timestamp, Optional<Integer> currentLeaderEpoch) { - this(timestamp, 1, currentLeaderEpoch); - } - - @Override - public boolean 
equals(Object obj) { - if (!(obj instanceof PartitionData)) return false; - PartitionData other = (PartitionData) obj; - return this.timestamp == other.timestamp && - this.currentLeaderEpoch.equals(other.currentLeaderEpoch); - } - - @Override - public int hashCode() { - return Objects.hash(timestamp, currentLeaderEpoch); - } - - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("{timestamp: ").append(timestamp). - append(", maxNumOffsets: ").append(maxNumOffsets). - append(", currentLeaderEpoch: ").append(currentLeaderEpoch). - append("}"); - return bld.toString(); + return data.toString(); } } /** * Private constructor with a specified version. */ - private ListOffsetRequest(int replicaId, - Map<TopicPartition, PartitionData> targetTimes, - IsolationLevel isolationLevel, - short version) { + private ListOffsetRequest(short version, ListOffsetRequestData data) { super(ApiKeys.LIST_OFFSETS, version); - this.replicaId = replicaId; - this.isolationLevel = isolationLevel; - this.partitionTimestamps = targetTimes; + this.data = data; this.duplicatePartitions = Collections.emptySet(); } public ListOffsetRequest(Struct struct, short version) { super(ApiKeys.LIST_OFFSETS, version); - Set<TopicPartition> duplicatePartitions = new HashSet<>(); - replicaId = struct.get(REPLICA_ID); - isolationLevel = struct.hasField(ISOLATION_LEVEL) ? 
- IsolationLevel.forId(struct.get(ISOLATION_LEVEL)) : - IsolationLevel.READ_UNCOMMITTED; - partitionTimestamps = new HashMap<>(); - for (Object topicResponseObj : struct.get(TOPICS)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.get(TOPIC_NAME); - for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) { - Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.get(PARTITION_ID); - long timestamp = partitionResponse.get(TIMESTAMP); - TopicPartition tp = new TopicPartition(topic, partition); - - int maxNumOffsets = partitionResponse.getOrElse(MAX_NUM_OFFSETS, 1); - Optional<Integer> currentLeaderEpoch = RequestUtils.getLeaderEpoch(partitionResponse, CURRENT_LEADER_EPOCH); - PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets, currentLeaderEpoch); - if (partitionTimestamps.put(tp, partitionData) != null) + data = new ListOffsetRequestData(struct, version); + duplicatePartitions = new HashSet<>(); + Set<TopicPartition> partitions = new HashSet<>(); + for (ListOffsetTopic topic : data.topics()) { + for (ListOffsetPartition partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + if (!partitions.add(tp)) { duplicatePartitions.add(tp); + } } } - this.duplicatePartitions = duplicatePartitions; } @Override - @SuppressWarnings("deprecation") public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); short versionId = version(); - - ListOffsetResponse.PartitionData partitionError = versionId == 0 ? 
- new ListOffsetResponse.PartitionData(Errors.forException(e), Collections.emptyList()) : - new ListOffsetResponse.PartitionData(Errors.forException(e), -1L, -1L, Optional.empty()); - for (TopicPartition partition : partitionTimestamps.keySet()) { - responseData.put(partition, partitionError); + short errorCode = Errors.forException(e).code(); + + List<ListOffsetTopicResponse> responses = new ArrayList<>(); + for (ListOffsetTopic topic : data.topics()) { + ListOffsetTopicResponse topicResponse = new ListOffsetTopicResponse().setName(topic.name()); + List<ListOffsetPartitionResponse> partitions = new ArrayList<>(); + for (ListOffsetPartition partition : topic.partitions()) { + ListOffsetPartitionResponse partitionresponse = new ListOffsetPartitionResponse() + .setErrorCode(errorCode) + .setPartitionIndex(partition.partitionIndex()); + if (versionId == 0) { + partitionresponse.setOldStyleOffsets(Collections.emptyList()); + } else { + partitionresponse.setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP); + if (versionId >= 4) { + partitionresponse.setLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH); Review comment: Is this necessary? The leader epoch is -1 by default. 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -910,136 +913,163 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - 
(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderOrFollowerException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, + DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - 
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)).asJava) + ) - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) try { - val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID - val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - val isolationLevelOpt = if (isClientRequest) - Some(offsetRequest.isolationLevel) - else - None - - val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, - partitionData.timestamp, - isolationLevelOpt, - partitionData.currentLeaderEpoch, - fetchOnlyFromLeader) - - val response = foundOpt match { - case Some(found) => - new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch) - case None => - new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()) - } - (topicPartition, response) + val 
offsets = replicaManager.legacyFetchOffsetsForTimestamp( + topicPartition = topicPartition, + timestamp = partition.timestamp, + maxNumOffsets = partition.maxNumOffsets, + isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, + fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.NONE.code) + .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava) } catch { - // NOTE: These exceptions are special cased since these error messages are typically transient or the client - // would have received a clear exception and there is no value in logging the entire stack trace for the same + // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderOrFollowerException | - _ : UnknownLeaderEpochException | - _ : FencedLeaderEpochException | - _ : KafkaStorageException | - _ : UnsupportedForMessageFormatException) => - debug(s"Offset request with correlation id $correlationId from client $clientId on " + - s"partition $topicPartition failed due to ${e.getMessage}") - buildErrorResponse(Errors.forException(e)) - - // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE - case e: OffsetNotAvailableException => - if(request.header.apiVersion >= 5) { - buildErrorResponse(Errors.forException(e)) - } else { - buildErrorResponse(Errors.LEADER_NOT_AVAILABLE) - } - + _ : KafkaStorageException) => + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + correlationId, clientId, topicPartition, e.getMessage)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) case e: 
Throwable => error("Error while responding to offset request", e) - buildErrorResponse(Errors.forException(e)) + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(Errors.forException(e).code) + } + } + new ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava) + } + (responseTopics ++ unauthorizedResponseStatus).toList + } + + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetTopicResponse] = { + val correlationId = request.header.correlationId + val clientId = request.header.clientId + val offsetRequest = request.body[ListOffsetRequest] + + def buildErrorResponse(e: Errors, partition: ListOffsetPartition): ListOffsetPartitionResponse = { + new ListOffsetPartitionResponse() + .setPartitionIndex(partition.partitionIndex) + .setErrorCode(e.code) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET) + } + + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, + DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, partition)).asJava) + ) + + val responseTopics = authorizedRequestInfo.map { topic => + val responsePartitions = topic.partitions.asScala.map { partition => + val topicPartition = new TopicPartition(topic.name, partition.partitionIndex) + if (offsetRequest.duplicatePartitions.contains(topicPartition)) { + debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + + s"failed because the partition is duplicated in the request.") + buildErrorResponse(Errors.INVALID_REQUEST, partition) + } else { + + try { + val fetchOnlyFromLeader = 
offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID + val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID + val isolationLevelOpt = if (isClientRequest) + Some(offsetRequest.isolationLevel) + else + None + + val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition, + partition.timestamp, + isolationLevelOpt, + if (partition.currentLeaderEpoch == ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch), + fetchOnlyFromLeader) + + val response = foundOpt match { Review comment: Redundant braces. ########## File path: clients/src/test/java/org/apache/kafka/common/requests/ListOffsetRequestTest.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListOffsetRequestData; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; +import org.apache.kafka.common.protocol.Errors; +import org.junit.Test; + +public class ListOffsetRequestTest { + + @Test + public void testDuplicatePartitions() { + List<ListOffsetTopic> topics = Arrays.asList( Review comment: `Arrays.asList` could be replaced with `Collections.singletonList` ########## File path: clients/src/test/java/org/apache/kafka/common/requests/ListOffsetRequestTest.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListOffsetRequestData; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition; +import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic; +import org.apache.kafka.common.message.ListOffsetResponseData; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse; +import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse; +import org.apache.kafka.common.protocol.Errors; +import org.junit.Test; + +public class ListOffsetRequestTest { Review comment: We should also add a test in `MessageTest` for the automated struct ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org