AndrewJSchofield commented on code in PR #17775: URL: https://github.com/apache/kafka/pull/17775#discussion_r1844139747
########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +/** + * The result of the {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call. + * <p> + * The API of this class is evolving, see {@link Admin} for details. + */ [email protected] +public class ListShareGroupOffsetsResult { + + private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures; + + public ListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures) { + this.futures = futures; + } + + /** + * Return a future which yields all Map<String, Map<TopicPartition, Long> objects, if requests for all the groups succeed. + */ + public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( + nil -> { + Map<String, Map<TopicPartition, Long>> offsets = new HashMap<>(futures.size()); + futures.forEach((key, future) -> { + try { + offsets.put(key, future.get()); + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, since the KafkaFuture#allOf already ensured + // that all the futures completed successfully. + throw new RuntimeException(e); + } + }); + return offsets; + }); + } + + /** + * Return a future which yields a map of topic partitions to offsets for the specified group. + */ + public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String groupId) { + KafkaFutureImpl<Map<TopicPartition, Long>> future = new KafkaFutureImpl<>(); + if (futures.containsKey(groupId)) + return futures.get(groupId); + else + future.completeExceptionally(new IllegalArgumentException("Group ID not found: " + groupId)); Review Comment: I would make this throw `IllegalArgumentException` rather than complete the future exceptionally. See `ListConsumerGroupOffsetsResult`.partitionsToOffsetAndMetadata`. ########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +/** + * The result of the {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call. + * <p> + * The API of this class is evolving, see {@link Admin} for details. + */ [email protected] +public class ListShareGroupOffsetsResult { + + private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures; + + public ListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures) { Review Comment: This method should be package-private, not public. Otherwise it becomes part of the public interface and will also end up in the javadoc. ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ########## @@ -478,6 +481,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> readState(RequestConte }); } + @Override + public CompletableFuture<ReadShareGroupStateSummaryResponseData> readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData request) { + String groupId = request.groupId(); + // A map to store the futures for each topicId and partition. + Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new HashMap<>(); + + // Send an empty response if topic data is empty + if (isEmpty(request.topics())) { + log.error("Topic Data is empty: {}", request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + + // Send an empty response if partition data is empty for any topic + for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData topicData : request.topics()) { + if (isEmpty(topicData.partitions())) { + log.error("Partition Data for topic {} is empty: {}", topicData.topicId(), request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + } + + // Send an empty response if groupId is invalid + if (isGroupIdEmpty(groupId)) { + log.error("Group id must be specified and non-empty: {}", request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + + // Send an empty response if the coordinator is not active + if (!isActive.get()) { + return CompletableFuture.completedFuture( + generateErrorReadStateSummaryResponse( + request, + Errors.COORDINATOR_NOT_AVAILABLE, + "Share coordinator is not available." + ) + ); + } + + // The request received here could have multiple keys of structure group:topic:partition. However, + // the readState method in ShareCoordinatorShard expects a single key in the request. Hence, we will + // be looping over the keys below and constructing new ReadShareGroupStateRequestData objects to pass + // onto the shard method. + + request.topics().forEach(topicData -> { + Uuid topicId = topicData.topicId(); + topicData.partitions().forEach(partitionData -> { + // Request object containing information of a single topic partition + ReadShareGroupStateSummaryRequestData requestForCurrentPartition = new ReadShareGroupStateSummaryRequestData() + .setGroupId(groupId) + .setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() Review Comment: List.of ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ########## @@ -478,6 +481,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> readState(RequestConte }); } + @Override + public CompletableFuture<ReadShareGroupStateSummaryResponseData> readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData request) { + String groupId = request.groupId(); + // A map to store the futures for each topicId and partition. + Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new HashMap<>(); + + // Send an empty response if topic data is empty + if (isEmpty(request.topics())) { + log.error("Topic Data is empty: {}", request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + + // Send an empty response if partition data is empty for any topic + for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData topicData : request.topics()) { + if (isEmpty(topicData.partitions())) { + log.error("Partition Data for topic {} is empty: {}", topicData.topicId(), request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + } + + // Send an empty response if groupId is invalid + if (isGroupIdEmpty(groupId)) { + log.error("Group id must be specified and non-empty: {}", request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + + // Send an empty response if the coordinator is not active + if (!isActive.get()) { + return CompletableFuture.completedFuture( + generateErrorReadStateSummaryResponse( + request, + Errors.COORDINATOR_NOT_AVAILABLE, + "Share coordinator is not available." + ) + ); + } + + // The request received here could have multiple keys of structure group:topic:partition. However, + // the readState method in ShareCoordinatorShard expects a single key in the request. Hence, we will Review Comment: "summary" ########## share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -730,6 +733,147 @@ protected RPCType rpcType() { } } + public class ReadStateSummaryHandler extends PersisterStateManagerHandler { + private final int leaderEpoch; + private final CompletableFuture<ReadShareGroupStateSummaryResponse> result; + private final BackoffManager readStateBackoff; + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + long backoffMs, + long backoffMaxMs, + int maxRPCRetryAttempts, + Consumer<ClientResponse> onCompleteCallback + ) { + super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts); + this.leaderEpoch = leaderEpoch; + this.result = result; + this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); + } + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + Consumer<ClientResponse> onCompleteCallback + ) { + this( + groupId, + topicId, + partition, + leaderEpoch, + result, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_FIND_COORD_ATTEMPTS, + onCompleteCallback + ); + } + + @Override + protected String name() { + return "ReadStateSummaryHandler"; + } + + @Override + protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> requestBuilder() { + throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed."); + } + + @Override + protected boolean isResponseForRequest(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY; + } + + @Override + protected void handleRequestResponse(ClientResponse response) { + log.debug("Read state summary response received - {}", response); + readStateBackoff.incrementAttempt(); + + ReadShareGroupStateSummaryResponse combinedResponse = (ReadShareGroupStateSummaryResponse) response.responseBody(); + for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult readStateResult : combinedResponse.data().results()) { + if (readStateResult.topicId().equals(partitionKey().topicId())) { + Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionStateData = + readStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition()) + .findFirst(); + + if (partitionStateData.isPresent()) { + Errors error = Errors.forCode(partitionStateData.get().errorCode()); + switch (error) { + case NONE: + readStateBackoff.resetAttempts(); + ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult( + partitionKey().topicId(), + Collections.singletonList(partitionStateData.get()) + ); + this.result.complete(new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData() + .setResults(Collections.singletonList(result)))); + return; + + // check retriable errors + case COORDINATOR_NOT_AVAILABLE: + case COORDINATOR_LOAD_IN_PROGRESS: + case NOT_COORDINATOR: + log.warn("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message()); + if (!readStateBackoff.canAttempt()) { + log.error("Exhausted max retries for read state RPC for key {} without success.", partitionKey()); + readStateSummaryErrorReponse(error, new Exception("Exhausted max retries to complete read state RPC without success.")); + return; + } + super.resetCoordinatorNode(); + timer.add(new PersisterTimerTask(readStateBackoff.backOff(), this)); + return; + + default: + log.error("Unable to perform read state RPC for key {}: {}", partitionKey(), error.message()); + readStateSummaryErrorReponse(error, null); + return; + } + } + } + } + + // no response found specific topic partition + IllegalStateException exception = new IllegalStateException( + "Failed to read state for share partition " + partitionKey() + ); + readStateSummaryErrorReponse(Errors.forException(exception), exception); + } + + protected void readStateSummaryErrorReponse(Errors error, Exception exception) { + this.result.complete(new ReadShareGroupStateSummaryResponse( + ReadShareGroupStateSummaryResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " + + (exception == null ? error.message() : exception.getMessage())))); + } + + @Override + protected void findCoordinatorErrorResponse(Errors error, Exception exception) { + this.result.complete(new ReadShareGroupStateSummaryResponse( + ReadShareGroupStateSummaryResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in read state RPC. " + Review Comment: "summary" ########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +/** + * The result of the {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call. + * <p> + * The API of this class is evolving, see {@link Admin} for details. + */ [email protected] +public class ListShareGroupOffsetsResult { + + private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures; + + public ListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures) { + this.futures = futures; + } + + /** + * Return a future which yields all Map<String, Map<TopicPartition, Long> objects, if requests for all the groups succeed. + */ + public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( + nil -> { + Map<String, Map<TopicPartition, Long>> offsets = new HashMap<>(futures.size()); + futures.forEach((key, future) -> { Review Comment: It would be a bit easier to see what's going on if `key` was renamed to `groupId` since that's what the key is here. ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java: ########## @@ -81,6 +83,14 @@ public interface ShareCoordinator { */ CompletableFuture<ReadShareGroupStateResponseData> readState(RequestContext context, ReadShareGroupStateRequestData request); + /** + * Handle read share state call Review Comment: "summary" ########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +/** + * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}. + * <p> + * The API of this class is evolving, see {@link Admin} for details. + */ [email protected] +public class ListShareGroupOffsetsSpec { + + private Collection<TopicPartition> topicPartitions; + + public ListShareGroupOffsetsSpec() { + topicPartitions = new ArrayList<>(); + } + + /** + * Set the topic partitions whose offsets are to be listed for a share group. + */ + ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) { + this.topicPartitions = topicPartitions; + return this; + } + + /** + * Returns the topic partitions whose offsets are to be listed for a share group. + */ + Collection<TopicPartition> topicPartitions() { + return topicPartitions; + } +} Review Comment: Please implement equals, hashCode and toString. ########## clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java: ########## @@ -64,4 +67,46 @@ public static ReadShareGroupStateSummaryResponse parse(ByteBuffer buffer, short new ReadShareGroupStateSummaryResponseData(new ByteBufferAccessor(buffer), version) ); } + + public static ReadShareGroupStateSummaryResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) { + return new ReadShareGroupStateSummaryResponseData().setResults( + Collections.singletonList(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partitionId) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage))))); + } + + public static ReadShareGroupStateSummaryResponseData.PartitionResult toErrorResponsePartitionResult(int partitionId, Errors error, String errorMessage) { + return new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partitionId) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage); + } + + public static ReadShareGroupStateSummaryResponseData toResponseData( + Uuid topicId, + int partition, + long startOffset, + int stateEpoch + ) { + return new ReadShareGroupStateSummaryResponseData() + .setResults(Collections.singletonList( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(Collections.singletonList( Review Comment: Because we've recently dropped Java 8, you can now use `List.of` instead of `Collections.singletonList`. ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ########## @@ -478,6 +481,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> readState(RequestConte }); } + @Override + public CompletableFuture<ReadShareGroupStateSummaryResponseData> readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData request) { + String groupId = request.groupId(); + // A map to store the futures for each topicId and partition. + Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new HashMap<>(); + + // Send an empty response if topic data is empty + if (isEmpty(request.topics())) { + log.error("Topic Data is empty: {}", request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + + // Send an empty response if partition data is empty for any topic + for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData topicData : request.topics()) { + if (isEmpty(topicData.partitions())) { + log.error("Partition Data for topic {} is empty: {}", topicData.topicId(), request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + } + + // Send an empty response if groupId is invalid + if (isGroupIdEmpty(groupId)) { + log.error("Group id must be specified and non-empty: {}", request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + + // Send an empty response if the coordinator is not active + if (!isActive.get()) { + return CompletableFuture.completedFuture( + generateErrorReadStateSummaryResponse( + request, + Errors.COORDINATOR_NOT_AVAILABLE, + "Share coordinator is not available." + ) + ); + } + + // The request received here could have multiple keys of structure group:topic:partition. However, + // the readState method in ShareCoordinatorShard expects a single key in the request. Hence, we will + // be looping over the keys below and constructing new ReadShareGroupStateRequestData objects to pass Review Comment: "Summary" ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java: ########## @@ -435,6 +438,60 @@ public ReadShareGroupStateResponseData readState(ReadShareGroupStateRequestData return ReadShareGroupStateResponse.toResponseData(topicId, partition, offsetValue.startOffset(), offsetValue.stateEpoch(), stateBatches); } + /** + * This method finds the ShareSnapshotValue record corresponding to the requested topic partition from the + * in-memory state of coordinator shard, the shareStateMap. + * <p> + * This method as called by the ShareCoordinatorService will be provided with + * the request data which covers only key i.e. group1:topic1:partition1. The implementation + * below was done keeping this in mind. + * + * @param request - ReadShareGroupStateSummaryRequestData for a single key + * @param offset - offset to read from the __share_group_state topic partition + * @return CoordinatorResult(records, response) + */ + public ReadShareGroupStateSummaryResponseData readStateSummary(ReadShareGroupStateSummaryRequestData request, Long offset) { + // records to read (with the key of snapshot type), response to caller + // only one key will be there in the request by design + Optional<ReadShareGroupStateSummaryResponseData> error = maybeGetReadStateSummaryError(request, offset); + if (error.isPresent()) { + return error.get(); + } + + Uuid topicId = request.topics().get(0).topicId(); + int partition = request.topics().get(0).partitions().get(0).partition(); + int leaderEpoch = request.topics().get(0).partitions().get(0).leaderEpoch(); + + SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partition); + + if (!shareStateMap.containsKey(coordinatorKey)) { + return ReadShareGroupStateSummaryResponse.toResponseData( + topicId, + partition, + PartitionFactory.UNINITIALIZED_START_OFFSET, + PartitionFactory.DEFAULT_STATE_EPOCH + ); + } + + ShareGroupOffset offsetValue = shareStateMap.get(coordinatorKey, offset); + + if (offsetValue == null) { + // Returning an error response as the snapshot value was not found + return ReadShareGroupStateSummaryResponse.toErrorResponseData( + topicId, + partition, + Errors.UNKNOWN_SERVER_ERROR, + "Data not found for topic {}, partition {} for group {}, in the in-memory state of share coordinator" + ); + } + + // Updating the leader map with the new leader epoch + leaderEpochMap.put(coordinatorKey, leaderEpoch); Review Comment: I would expect that the leader epoch is not known in this case. Since the call comes from the group coordinator and it is the not the partition leader, I think it's going to specify -1 as the epoch and the update of the leader epoch map should not occur. ########## share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -730,6 +733,147 @@ protected RPCType rpcType() { } } + public class ReadStateSummaryHandler extends PersisterStateManagerHandler { + private final int leaderEpoch; + private final CompletableFuture<ReadShareGroupStateSummaryResponse> result; + private final BackoffManager readStateBackoff; + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + long backoffMs, + long backoffMaxMs, + int maxRPCRetryAttempts, + Consumer<ClientResponse> onCompleteCallback + ) { + super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts); + this.leaderEpoch = leaderEpoch; + this.result = result; + this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); + } + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + Consumer<ClientResponse> onCompleteCallback + ) { + this( + groupId, + topicId, + partition, + leaderEpoch, + result, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_FIND_COORD_ATTEMPTS, + onCompleteCallback + ); + } + + @Override + protected String name() { + return "ReadStateSummaryHandler"; + } + + @Override + protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> requestBuilder() { + throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed."); + } + + @Override + protected boolean isResponseForRequest(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY; + } + + @Override + protected void handleRequestResponse(ClientResponse response) { + log.debug("Read state summary response received - {}", response); + readStateBackoff.incrementAttempt(); + + ReadShareGroupStateSummaryResponse combinedResponse = (ReadShareGroupStateSummaryResponse) response.responseBody(); + for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult readStateResult : combinedResponse.data().results()) { + if (readStateResult.topicId().equals(partitionKey().topicId())) { + Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionStateData = + readStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition()) + .findFirst(); + + if (partitionStateData.isPresent()) { + Errors error = Errors.forCode(partitionStateData.get().errorCode()); + switch (error) { + case NONE: + readStateBackoff.resetAttempts(); + ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult( + partitionKey().topicId(), + Collections.singletonList(partitionStateData.get()) Review Comment: List.of ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ########## @@ -478,6 +481,113 @@ public CompletableFuture<ReadShareGroupStateResponseData> readState(RequestConte }); } + @Override + public CompletableFuture<ReadShareGroupStateSummaryResponseData> readStateSummary(RequestContext context, ReadShareGroupStateSummaryRequestData request) { + String groupId = request.groupId(); + // A map to store the futures for each topicId and partition. + Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateSummaryResponseData>>> futureMap = new HashMap<>(); + + // Send an empty response if topic data is empty + if (isEmpty(request.topics())) { + log.error("Topic Data is empty: {}", request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + + // Send an empty response if partition data is empty for any topic + for (ReadShareGroupStateSummaryRequestData.ReadStateSummaryData topicData : request.topics()) { + if (isEmpty(topicData.partitions())) { + log.error("Partition Data for topic {} is empty: {}", topicData.topicId(), request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + } + + // Send an empty response if groupId is invalid + if (isGroupIdEmpty(groupId)) { + log.error("Group id must be specified and non-empty: {}", request); + return CompletableFuture.completedFuture( + new ReadShareGroupStateSummaryResponseData() + ); + } + + // Send an empty response if the coordinator is not active + if (!isActive.get()) { + return CompletableFuture.completedFuture( + generateErrorReadStateSummaryResponse( + request, + Errors.COORDINATOR_NOT_AVAILABLE, + "Share coordinator is not available." + ) + ); + } + + // The request received here could have multiple keys of structure group:topic:partition. However, + // the readState method in ShareCoordinatorShard expects a single key in the request. Hence, we will + // be looping over the keys below and constructing new ReadShareGroupStateRequestData objects to pass + // onto the shard method. + + request.topics().forEach(topicData -> { + Uuid topicId = topicData.topicId(); + topicData.partitions().forEach(partitionData -> { + // Request object containing information of a single topic partition + ReadShareGroupStateSummaryRequestData requestForCurrentPartition = new ReadShareGroupStateSummaryRequestData() + .setGroupId(groupId) + .setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId) + .setPartitions(Collections.singletonList(partitionData)))); + SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition()); + // Scheduling a runtime read operation to read share partition state from the coordinator in memory state + CompletableFuture<ReadShareGroupStateSummaryResponseData> future = runtime.scheduleReadOperation( + "read-share-group-state-summary", + topicPartitionFor(coordinatorKey), + (coordinator, offset) -> coordinator.readStateSummary(requestForCurrentPartition, offset) + ).exceptionally(exception -> handleOperationException( + "read-share-group-state", Review Comment: "summary" ########## share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -730,6 +733,147 @@ protected RPCType rpcType() { } } + public class ReadStateSummaryHandler extends PersisterStateManagerHandler { + private final int leaderEpoch; + private final CompletableFuture<ReadShareGroupStateSummaryResponse> result; + private final BackoffManager readStateBackoff; + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + long backoffMs, + long backoffMaxMs, + int maxRPCRetryAttempts, + Consumer<ClientResponse> onCompleteCallback + ) { + super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts); + this.leaderEpoch = leaderEpoch; + this.result = result; + this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); + } + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + Consumer<ClientResponse> onCompleteCallback + ) { + this( + groupId, + topicId, + partition, + leaderEpoch, + result, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_FIND_COORD_ATTEMPTS, + onCompleteCallback + ); + } + + @Override + protected String name() { + return "ReadStateSummaryHandler"; + } + + @Override + protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> requestBuilder() { + throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed."); + } + + @Override + protected boolean isResponseForRequest(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY; + } + + @Override + protected void handleRequestResponse(ClientResponse response) { + log.debug("Read state summary response received - {}", response); + readStateBackoff.incrementAttempt(); + + ReadShareGroupStateSummaryResponse combinedResponse = (ReadShareGroupStateSummaryResponse) response.responseBody(); + for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult readStateResult : combinedResponse.data().results()) { + if (readStateResult.topicId().equals(partitionKey().topicId())) { + Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionStateData = + readStateResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition()) + .findFirst(); + + if (partitionStateData.isPresent()) { + Errors error = Errors.forCode(partitionStateData.get().errorCode()); + switch (error) { + case NONE: + readStateBackoff.resetAttempts(); + ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult( + partitionKey().topicId(), + Collections.singletonList(partitionStateData.get()) + ); + this.result.complete(new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData() + .setResults(Collections.singletonList(result)))); + return; + + // check retriable errors + case COORDINATOR_NOT_AVAILABLE: + case COORDINATOR_LOAD_IN_PROGRESS: + case NOT_COORDINATOR: + log.warn("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message()); Review Comment: "summary" ########## share/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java: ########## @@ -278,9 +279,92 @@ public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGro * @return A completable future of ReadShareGroupStateSummaryResult */ public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters request) throws IllegalArgumentException { - throw new RuntimeException("not implemented"); + validate(request); + GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = request.groupTopicPartitionData(); + String groupId = gtp.groupId(); + Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateSummaryResponse>>> futureMap = new HashMap<>(); + List<PersisterStateManager.ReadStateSummaryHandler> handlers = new ArrayList<>(); + + gtp.topicsData().forEach(topicData -> { + topicData.partitions().forEach(partitionData -> { + CompletableFuture<ReadShareGroupStateSummaryResponse> future = futureMap + .computeIfAbsent(topicData.topicId(), k -> new HashMap<>()) + .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>()); + + handlers.add( + stateManager.new ReadStateSummaryHandler( + groupId, + topicData.topicId(), + partitionData.partition(), + partitionData.leaderEpoch(), + future, + null) + ); + }); + }); + + for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) { + stateManager.enqueue(handler); + } + + // Combine all futures into a single CompletableFuture<Void> + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( + handlers.stream() + .map(PersisterStateManager.ReadStateSummaryHandler::result) + .toArray(CompletableFuture[]::new)); + + // Transform the combined CompletableFuture<Void> into CompletableFuture<ReadShareGroupStateResult> + return combinedFuture.thenApply(v -> readSummaryResponsesToResult(futureMap)); } + /** + * Takes in a list of COMPLETED futures and combines the results, + * taking care of errors if any, into a single ReadShareGroupStateSummaryResult + * @param futureMap - HashMap of {topic -> {part -> future}} + * @return Object representing combined result of type ReadShareGroupStateSummaryResult + */ + // visible for testing + ReadShareGroupStateSummaryResult readSummaryResponsesToResult( + Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateSummaryResponse>>> futureMap + ) { + List<TopicData<PartitionStateErrorData>> topicsData = futureMap.keySet().stream() + .map(topicId -> { Review Comment: nit: I think your line continuation indentation is 8 here, but the source file uses 4. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
