junrao commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1702140987
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4069,6 +4367,203 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + private def getAcknowledgeBatchesFromShareFetchRequest( + shareFetchRequest: ShareFetchRequest, Review Comment: merge with previous line? ########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -142,13 +142,13 @@ public void testNewContextReturnsFinalContext() { new TopicIdPartition(topicId, new TopicPartition("foo", 0)), new ShareFetchRequest.SharePartitionData(topicId, 4000)); assertThrows(InvalidRequestException.class, - () -> sharePartitionManager.newContext("grp", shareFetchData, Collections.emptyList(), new ShareFetchMetadata(Uuid.ZERO_UUID, -1))); + () -> sharePartitionManager.newContext("grp", shareFetchData, Collections.emptyList(), new ShareFetchMetadata(Uuid.ZERO_UUID, -1), true)); Review Comment: `ShareFetchMetadata(Uuid.ZERO_UUID, -1)` could just be `newReqMetadata`? 
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4020,11 +4012,317 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] - // TODO: Implement the ShareFetchRequest handling - requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!isShareGroupProtocolEnabled) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) + return + } + val sharePartitionManagerInstance: SharePartitionManager = sharePartitionManager match { + case Some(manager) => manager + case None => + // The API is not supported when the SharePartitionManager is not defined on the broker + info("Received share fetch request for zookeeper based cluster") + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) + return + } + + val groupId = shareFetchRequest.data.groupId + + // Share Fetch needs permission to perform the READ action on the named group resource (groupId) + if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception)) + return + } + + val memberId = shareFetchRequest.data.memberId + val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch + + def isAcknowledgeDataPresentInFetchRequest: Boolean = { + shareFetchRequest.data.topics.asScala + .flatMap(t => t.partitions().asScala) + .exists(partition => partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty) + } + + val 
isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest + val topicIdNames = metadataCache.topicIdsToNames() + + val shareFetchData = shareFetchRequest.shareFetchData(topicIdNames) + val forgottenTopics = shareFetchRequest.forgottenTopics(topicIdNames) + + val newReqMetadata: ShareFetchMetadata = new ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch) + var shareFetchContext: ShareFetchContext = null + + try { + // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here. + shareFetchContext = sharePartitionManagerInstance.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent) + } catch { + case e: + Exception => requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) + return + } + + val erroneousAndValidPartitionData: ErroneousAndValidPartitionData = shareFetchContext.getErroneousAndValidTopicIdPartitions + val topicIdPartitionSeq: mutable.Set[TopicIdPartition] = mutable.Set() + erroneousAndValidPartitionData.erroneous.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + erroneousAndValidPartitionData.validTopicIdPartitions.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + shareFetchData.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + + // Kafka share consumers need READ permission on each topic they are fetching. + val authorizedTopics = authHelper.filterByAuthorized( + request.context, + READ, + TOPIC, + topicIdPartitionSeq + )(_.topicPartition.topic) + + // Variable to store the topic partition wise result of piggybacked acknowledgements. 
+ var acknowledgeResult: CompletableFuture[Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]] = + CompletableFuture.completedFuture(mutable.Map.empty) + + // Handling the Acknowledgements from the ShareFetchRequest If this check is true, we are sure that this is not an + // Initial ShareFetch Request, otherwise the request would have been invalid. + if (isAcknowledgeDataPresent) { + val erroneous = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]() + val acknowledgementDataFromRequest = getAcknowledgeBatchesFromShareFetchRequest(request.body[ShareFetchRequest], topicIdNames, erroneous) + acknowledgeResult = handleAcknowledgements( + acknowledgementDataFromRequest, + erroneous, + sharePartitionManagerInstance, + authorizedTopics, + groupId, + memberId, + ) + } + + // Handling the Fetch from the ShareFetchRequest. + // Variable to store the topic partition wise result of fetching. + val fetchResult: CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] = + handleFetchFromShareFetchRequest( + request, + erroneousAndValidPartitionData, + sharePartitionManagerInstance, + authorizedTopics + ) + + def combineShareFetchAndShareAcknowledgeResponses(fetchResult: CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]], + acknowledgeResult: CompletableFuture[Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]], + ): CompletableFuture[ShareFetchResponse] = { + + fetchResult.thenCombine(acknowledgeResult, + (fetchMap: Map[TopicIdPartition, ShareFetchResponseData.PartitionData], + acknowledgeMap: Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]) => { + val shareFetchResponse = processShareFetchResponse(fetchMap, request, topicIdNames, shareFetchContext) + // The outer map has topicId as the key and the inner map has partitionIndex as the key. 
+ val topicPartitionAcknowledgements: mutable.Map[Uuid, mutable.Map[Int, Short]] = mutable.Map() + if (acknowledgeMap != null && acknowledgeMap.nonEmpty) { + acknowledgeMap.foreach { case (tp, partitionData) => + topicPartitionAcknowledgements.get(tp.topicId) match { + case Some(subMap) => + subMap += tp.partition -> partitionData.errorCode + case None => + val partitionAcknowledgementsMap: mutable.Map[Int, Short] = mutable.Map() + partitionAcknowledgementsMap += tp.partition -> partitionData.errorCode + topicPartitionAcknowledgements += tp.topicId -> partitionAcknowledgementsMap + } + } + } + + shareFetchResponse.data.responses.forEach{ topic => + val topicId = topic.topicId + topicPartitionAcknowledgements.get(topicId) match { + case Some(subMap) => + topic.partitions.forEach { partition => + subMap.get(partition.partitionIndex) match { + case Some(value) => + partition.setAcknowledgeErrorCode(value) + // Delete the element. + subMap.remove(partition.partitionIndex) + case None => + } + } + // Add the remaining acknowledgements. + subMap.foreach { case (partitionIndex, value) => + val fetchPartitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(value) + topic.partitions.add(fetchPartitionData) + } + topicPartitionAcknowledgements.remove(topicId) + case None => + } + } + // Add the remaining acknowledgements. 
+ topicPartitionAcknowledgements.foreach { case (topicId, subMap) => + val topicData = new ShareFetchResponseData.ShareFetchableTopicResponse() + .setTopicId(topicId) + subMap.foreach { case (partitionIndex, value) => + val fetchPartitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(value) + topicData.partitions.add(fetchPartitionData) + } + shareFetchResponse.data.responses.add(topicData) + } + + if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) { + sharePartitionManagerInstance.releaseAcquiredRecords(groupId, memberId). + whenComplete((releaseAcquiredRecordsData, throwable) => + if (throwable != null) { + error(s"Release acquired records on share session close with correlation from client ${request.header.clientId} " + + s"failed with error ${throwable.getMessage}") + } else { + info(s"Release acquired records on share session close $releaseAcquiredRecordsData succeeded") + } + ) + } + shareFetchResponse + }) + } + + // Send the response once the future completes. 
+ combineShareFetchAndShareAcknowledgeResponses(fetchResult, acknowledgeResult).handle[Unit] {(result, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, exception)) + } else { + requestChannel.sendResponse(request, result, onFetchComplete(request)) + } + } + } + + // Visible for Testing + def handleFetchFromShareFetchRequest(request: RequestChannel.Request, + erroneousAndValidPartitionData: ErroneousAndValidPartitionData, + sharePartitionManagerInstance: SharePartitionManager, + authorizedTopics: Set[String] + ): CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] = { + + val erroneous = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData] + erroneousAndValidPartitionData.erroneous.forEach { erroneousData => erroneous += erroneousData } + + val interestedWithMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] + + erroneousAndValidPartitionData.validTopicIdPartitions.forEach { case (topicIdPartition, sharePartitionData) => + if (!authorizedTopics.contains(topicIdPartition.topicPartition.topic)) + erroneous += topicIdPartition -> ShareFetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED) + else if (!metadataCache.contains(topicIdPartition.topicPartition)) + erroneous += topicIdPartition -> ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + else + interestedWithMaxBytes.put(topicIdPartition, sharePartitionData.maxBytes) + } + + val shareFetchRequest = request.body[ShareFetchRequest] + + val clientId = request.header.clientId + val versionId = request.header.apiVersion + val groupId = shareFetchRequest.data.groupId + + if (interestedWithMaxBytes.isEmpty) { + CompletableFuture.completedFuture(erroneous) + } else { + // for share fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being + // 
throttled given no bytes were recorded in the recent quota window. Trying to fetch more bytes would result + // in a guaranteed throttling potentially blocking consumer progress. + val maxQuotaWindowBytes = quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt + + val fetchMaxBytes = Math.min(Math.min(shareFetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes) + val fetchMinBytes = Math.min(shareFetchRequest.minBytes, fetchMaxBytes) + + val clientMetadata: Optional[ClientMetadata] = + Optional.of(new DefaultClientMetadata( + CommonClientConfigs.DEFAULT_CLIENT_RACK, + clientId, + request.context.clientAddress, + request.context.principal, + request.context.listenerName.value)) + + val params = new FetchParams( + versionId, + FetchRequest.CONSUMER_REPLICA_ID, + -1, + shareFetchRequest.maxWait, + fetchMinBytes, + fetchMaxBytes, + FetchIsolation.HIGH_WATERMARK, + clientMetadata + ) + + // call the share partition manager to fetch messages from the local replica. 
+ sharePartitionManagerInstance.fetchMessages( + groupId, + shareFetchRequest.data.memberId, + params, + interestedWithMaxBytes + ).thenApply{ result => + val combinedResult = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData] + result.asScala.foreach { case (tp, data) => + combinedResult += (tp -> data) + } + erroneous.foreach { case (tp, data) => + combinedResult += (tp -> data) + } + combinedResult.toMap + } + } + } + + // Visible for Testing + def handleAcknowledgements(acknowledgementData: mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]], + erroneous: mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], Review Comment: indentation ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4069,6 +4367,203 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + private def getAcknowledgeBatchesFromShareFetchRequest( + shareFetchRequest: ShareFetchRequest, + topicIdNames: util.Map[Uuid, String], + erroneous: mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], + ): mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = { + + val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]() + shareFetchRequest.data().topics().forEach{ topic => + if (!topicIdNames.containsKey(topic.topicId)) { + topic.partitions.forEach{ partition: ShareFetchRequestData.FetchPartition => + val topicIdPartition = new TopicIdPartition( + topic.topicId, + new TopicPartition(null, partition.partitionIndex)) + erroneous += + topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + } + } else { + topic.partitions().forEach { partition => + val topicIdPartition = new TopicIdPartition( + topic.topicId(), + new TopicPartition(topicIdNames.get(topic.topicId()), partition.partitionIndex()) + ) + val acknowledgeBatches = new 
util.ArrayList[ShareAcknowledgementBatch]() + partition.acknowledgementBatches().forEach{ batch => + acknowledgeBatches.add(new ShareAcknowledgementBatch( + batch.firstOffset(), + batch.lastOffset(), + batch.acknowledgeTypes() + )) + } + acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches + } + } + } + acknowledgeBatchesMap + } + + private def validateAcknowledgementBatches(acknowledgementDataFromRequest: mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]], + erroneous: mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData] + ): mutable.Set[TopicIdPartition] = { + val erroneousTopicIdPartitions: mutable.Set[TopicIdPartition] = mutable.Set.empty[TopicIdPartition] + + acknowledgementDataFromRequest.foreach { case (tp: TopicIdPartition, acknowledgeBatches: util.List[ShareAcknowledgementBatch]) => + var prevEndOffset = -1L + var isErroneous = false + acknowledgeBatches.forEach { batch => + if (!isErroneous) { + if (batch.firstOffset > batch.lastOffset) { + erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST) + erroneousTopicIdPartitions.add(tp) + isErroneous = true + } else if (batch.firstOffset < prevEndOffset) { + erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST) + erroneousTopicIdPartitions.add(tp) + isErroneous = true + } else if (batch.acknowledgeTypes == null || batch.acknowledgeTypes.isEmpty) { + erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST) + erroneousTopicIdPartitions.add(tp) + isErroneous = true + } else if (batch.acknowledgeTypes.size() > 1 && batch.lastOffset - batch.firstOffset != batch.acknowledgeTypes.size() - 1) { + erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST) + erroneousTopicIdPartitions.add(tp) + isErroneous = true + } else if (batch.acknowledgeTypes.stream().anyMatch(ackType => ackType < 0 || ackType > 3)) { + erroneous += tp -> 
ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST) + erroneousTopicIdPartitions.add(tp) + isErroneous = true + } else { + prevEndOffset = batch.lastOffset + } + } + } + } + + erroneousTopicIdPartitions + } + + // the callback for processing a share fetch response. + private def processShareFetchResponse(responsePartitionData: Map[TopicIdPartition, ShareFetchResponseData.PartitionData], + request: RequestChannel.Request, + topicIdNames: util.Map[Uuid, String], + shareFetchContext: ShareFetchContext): ShareFetchResponse = { + Review Comment: extra new line ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4020,11 +4012,317 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] - // TODO: Implement the ShareFetchRequest handling - requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!isShareGroupProtocolEnabled) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) + return + } + val sharePartitionManagerInstance: SharePartitionManager = sharePartitionManager match { + case Some(manager) => manager + case None => + // The API is not supported when the SharePartitionManager is not defined on the broker + info("Received share fetch request for zookeeper based cluster") + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) + return + } + + val groupId = shareFetchRequest.data.groupId + + // Share Fetch needs permission to perform the READ action on the named group resource (groupId) + if (!authHelper.authorize(request.context, READ, 
GROUP, groupId)) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception)) + return + } + + val memberId = shareFetchRequest.data.memberId + val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch + + def isAcknowledgeDataPresentInFetchRequest: Boolean = { + shareFetchRequest.data.topics.asScala + .flatMap(t => t.partitions().asScala) + .exists(partition => partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty) + } + + val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest + val topicIdNames = metadataCache.topicIdsToNames() + + val shareFetchData = shareFetchRequest.shareFetchData(topicIdNames) + val forgottenTopics = shareFetchRequest.forgottenTopics(topicIdNames) + + val newReqMetadata: ShareFetchMetadata = new ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch) + var shareFetchContext: ShareFetchContext = null + + try { + // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here. + shareFetchContext = sharePartitionManagerInstance.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent) + } catch { + case e: + Exception => requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) Review Comment: This should be `case e: Exception => requestHelper.sendMaybeThrottle(request, ...`, i.e. keep `case e: Exception =>` together on one line instead of splitting it after `case e:`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org