sjhajharia commented on code in PR #18671:
URL: https://github.com/apache/kafka/pull/18671#discussion_r1931659169
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##########
@@ -256,6 +258,18 @@
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffs
boolean requireStable
);
+ /**
+ * Fetch the Share Group Offsets for a given group.
+ *
+ * @param context The request context
+ * @param request The DescribeShareGroupOffsets request.
+ * @return A future yielding the results.
+ */
+ CompletableFuture<ReadShareGroupStateSummaryResponseData>
describeShareGroupOffsets(
Review Comment:
Even I am not the biggest fan of the same. The only reason this needs to be
done is because the `DescribeShareGroupOffsetsRequest/ResponseData` has the
topicName in them while the `ReadShareGroupStateSummaryRequest/ResponseData`
needs the topicId in it.
I could use the topicNametoId and topicIdtoName maps which are present in
KafkaApis for the same conversion. The alternative here would be to pass these
two maps to the GroupCoordinator as well, but that would deviate from the way
all other GC methods are written.
If there is a way the GC can directly have an access to these mappings, it
can be done. Do you think I should pass those two maps to the GC as well?
##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java:
##########
@@ -37,7 +37,7 @@ public class ListShareGroupOffsetsResult {
private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;
- ListShareGroupOffsetsResult(final Map<CoordinatorKey,
KafkaFuture<Map<TopicPartition, Long>>> futures) {
+ public ListShareGroupOffsetsResult(final Map<CoordinatorKey,
KafkaFuture<Map<TopicPartition, Long>>> futures) {
Review Comment:
We would need a public access for the same as the return type for the
`Admin.listShareGroupOffsets()` is ListShareGroupOffsetsResult. Thus, in the
`ShareGroupCommandTest`, when we mock the behaviour of AdminClient, we would
need to create an instance of `ListShareGroupOffsetsResult`. Pls see
`ShareGroupCommandTest` for more details on usage.
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3185,9 +3187,87 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request):
Unit = {
val describeShareGroupOffsetsRequest =
request.body[DescribeShareGroupOffsetsRequest]
- // TODO: Implement the DescribeShareGroupOffsetsRequest handling
- requestHelper.sendMaybeThrottle(request,
describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
- CompletableFuture.completedFuture[Unit](())
+
+ if (!isShareGroupProtocolEnabled) {
+ requestHelper.sendMaybeThrottle(request,
describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else if (!authHelper.authorize(request.context, READ, GROUP,
describeShareGroupOffsetsRequest.data.groupId)) {
+ requestHelper.sendMaybeThrottle(request,
describeShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else {
+ val topicNamesToIds = metadataCache.topicNamesToIds()
+ val topicIdToNames = metadataCache.topicIdsToNames()
+
+ val readStateSummaryData =
getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(
+ describeShareGroupOffsetsRequest.data(),
+ topicNamesToIds
+ )
+ groupCoordinator.describeShareGroupOffsets(
+ request.context,
+ readStateSummaryData,
+ ).handle[Unit] { (response, exception) =>
+ if (exception != null) {
+ requestHelper.sendMaybeThrottle(request,
describeShareGroupOffsetsRequest.getErrorResponse(exception))
+ } else {
+ requestHelper.sendMaybeThrottle(
+ request,
+ new DescribeShareGroupOffsetsResponse(
+
getDescribeShareGroupOffsetsResponseFromReadShareGroupStateSummaryResponse(response,
topicIdToNames)
+ )
+ )
+ }
+ }
+ }
+ }
+
+ private def
getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(describeShareGroupOffsetsRequestData:
DescribeShareGroupOffsetsRequestData,
+
topicNamesId: util.Map[String, Uuid]
+
): ReadShareGroupStateSummaryRequestData = {
+ val readStateSummaryTopics =
describeShareGroupOffsetsRequestData.topics.asScala.map(
+ topic => {
+ val partitions = topic.partitions.asScala.map(
+ partitionIndex => {
+ new PartitionData()
+ .setPartition(partitionIndex)
+ .setLeaderEpoch(0)
+ }
+ ).asJava
+ new ReadStateSummaryData()
+ .setTopicId(topicNamesId.get(topic.topicName()))
+ .setPartitions(partitions)
+ }
+ ).asJava
Review Comment:
They seems to be necessary. Getting rid of them causes type mismatches,
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3185,9 +3187,87 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request):
Unit = {
val describeShareGroupOffsetsRequest =
request.body[DescribeShareGroupOffsetsRequest]
- // TODO: Implement the DescribeShareGroupOffsetsRequest handling
- requestHelper.sendMaybeThrottle(request,
describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
- CompletableFuture.completedFuture[Unit](())
+
+ if (!isShareGroupProtocolEnabled) {
+ requestHelper.sendMaybeThrottle(request,
describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else if (!authHelper.authorize(request.context, READ, GROUP,
describeShareGroupOffsetsRequest.data.groupId)) {
+ requestHelper.sendMaybeThrottle(request,
describeShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else {
+ val topicNamesToIds = metadataCache.topicNamesToIds()
+ val topicIdToNames = metadataCache.topicIdsToNames()
+
+ val readStateSummaryData =
getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(
+ describeShareGroupOffsetsRequest.data(),
+ topicNamesToIds
+ )
+ groupCoordinator.describeShareGroupOffsets(
+ request.context,
+ readStateSummaryData,
+ ).handle[Unit] { (response, exception) =>
+ if (exception != null) {
+ requestHelper.sendMaybeThrottle(request,
describeShareGroupOffsetsRequest.getErrorResponse(exception))
+ } else {
+ requestHelper.sendMaybeThrottle(
+ request,
+ new DescribeShareGroupOffsetsResponse(
+
getDescribeShareGroupOffsetsResponseFromReadShareGroupStateSummaryResponse(response,
topicIdToNames)
+ )
+ )
+ }
+ }
+ }
+ }
+
+ private def
getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(describeShareGroupOffsetsRequestData:
DescribeShareGroupOffsetsRequestData,
+
topicNamesId: util.Map[String, Uuid]
+
): ReadShareGroupStateSummaryRequestData = {
+ val readStateSummaryTopics =
describeShareGroupOffsetsRequestData.topics.asScala.map(
+ topic => {
+ val partitions = topic.partitions.asScala.map(
+ partitionIndex => {
+ new PartitionData()
+ .setPartition(partitionIndex)
+ .setLeaderEpoch(0)
+ }
+ ).asJava
+ new ReadStateSummaryData()
+ .setTopicId(topicNamesId.get(topic.topicName()))
+ .setPartitions(partitions)
+ }
+ ).asJava
+
+ val result = new ReadShareGroupStateSummaryRequestData()
+ .setGroupId(describeShareGroupOffsetsRequestData.groupId())
+ .setTopics(readStateSummaryTopics)
+ result
+ }
+
+ private def
getDescribeShareGroupOffsetsResponseFromReadShareGroupStateSummaryResponse(readShareGroupStateSummaryResponseData:
ReadShareGroupStateSummaryResponseData,
+
topicIdNames: util.Map[Uuid, String]
+
): DescribeShareGroupOffsetsResponseData = {
+ val describeShareGroupOffsetsResponseData =
readShareGroupStateSummaryResponseData.results().asScala.map(
+ readStateSummaryResult => {
+ val partitions = readStateSummaryResult.partitions().asScala.map(
+ partitionResult => {
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partitionResult.partition())
+ .setStartOffset(partitionResult.startOffset())
+ .setLeaderEpoch(partitionResult.stateEpoch())
+ .setErrorCode(partitionResult.errorCode())
+ .setErrorMessage(partitionResult.errorMessage())
+ }
+ ).asJava
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicId(readStateSummaryResult.topicId())
+ .setTopicName(topicIdNames.get(readStateSummaryResult.topicId()))
+ .setPartitions(partitions)
+ }
+ ).asJava
Review Comment:
Same ^
##########
server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java:
##########
@@ -58,4 +59,16 @@ public ReadShareGroupStateSummaryParameters build() {
return new
ReadShareGroupStateSummaryParameters(groupTopicPartitionData);
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) return false;
+ ReadShareGroupStateSummaryParameters that =
(ReadShareGroupStateSummaryParameters) o;
+ return Objects.equals(groupTopicPartitionData,
that.groupTopicPartitionData);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(groupTopicPartitionData);
+ }
Review Comment:
They are required for the `GroupCoordinatorServiceTest.
testDescribeShareGroupOffsetsWithDefaultPersister`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]