chia7712 commented on code in PR #18929: URL: https://github.com/apache/kafka/pull/18929#discussion_r2195573447
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3738,11 +3738,50 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { val alterShareGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest] + val groupId = alterShareGroupOffsetsRequest.data.groupId + if (!isShareGroupProtocolEnabled) { requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) return CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { + requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + } else { + val responseBuilder = new AlterShareGroupOffsetsResponse.Builder() + val authorizedTopicPartitions = new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection() + + alterShareGroupOffsetsRequest.data.topics.forEach(topic => { + val topicError = { + if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName())) { + Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED)) + } else if (!metadataCache.contains(topic.topicName())) { + Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION)) + } else { + None + } + } + topicError match { + case Some(error) => + topic.partitions().forEach(partition => responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(), metadataCache.topicNamesToIds(), error.error)) Review Comment: It is not an issue for now. However, we should pass `error` instead of `error.error` to propagate the custom error message.
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -667,6 +670,56 @@ public void run() { )); } + // Visibility for testing + CompletableFuture<AlterShareGroupOffsetsResponseData> persisterInitialize( + InitializeShareGroupStateParameters request, + AlterShareGroupOffsetsResponseData response + ) { + return persister.initializeState(request) + .handle((result, exp) -> { + if (exp == null) { + if (result.errorCounts().isEmpty()) { + handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), result, new ShareGroupHeartbeatResponseData()); + return response; + } else { + //TODO build new AlterShareGroupOffsetsResponseData for error response + return response; + } + } else { + return buildErrorResponse(request, response, exp); + } + + }); + } + + private AlterShareGroupOffsetsResponseData buildErrorResponse(InitializeShareGroupStateParameters request, AlterShareGroupOffsetsResponseData response, Throwable exp) { + // build new AlterShareGroupOffsetsResponseData for error response + AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData(); + GroupTopicPartitionData<PartitionStateData> gtp = request.groupTopicPartitionData(); + log.error("Unable to initialize share group state for {}, {} while altering share group offsets", gtp.groupId(), gtp.topicsData(), exp); + Errors error = Errors.forException(exp); + data.setErrorCode(error.code()) + .setErrorMessage(error.message()) Review Comment: Should we use the message of `exp` instead of `error.message()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org