chirag-wadhwa5 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1699572051


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4020,11 +4012,381 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
     val shareFetchRequest = request.body[ShareFetchRequest]
-    // TODO: Implement the ShareFetchRequest handling
-    requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!isShareGroupProtocolEnabled) {
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      return
+    }
+    val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match {
+      case Some(manager) => manager
+      case None =>
+        // The API is not supported when the SharePartitionManager is not defined on the broker
+        info("SharePartitionManager not defined on the broker")
+        requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+        return
+    }
+
+    val groupId = shareFetchRequest.data.groupId
+
+    // Share Fetch needs permission to perform the READ action on the named group resource (groupId)
+    if(!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      return
+    }
+
+    val memberId = shareFetchRequest.data.memberId
+    val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch
+
+    def isAcknowledgeDataPresentInFetchRequest: Boolean = {
+      shareFetchRequest.data.topics.asScala
+        .flatMap(t => t.partitions().asScala)
+        .exists(partition => partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty)
+    }
+
+    val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest
+    val topicIdNames = metadataCache.topicIdsToNames()
+
+    def isTopicPresent(topicName: String) : Boolean = {
+      metadataCache.contains(topicName)
+    }
+
+    def isPartitionPresent(partition : Int, topicName : String) : Boolean = {
+      metadataCache.getTopicPartitions(topicName).foreach(tp => {
+        if(tp.partition() == partition) {
+          return true
+        }
+      })
+      false
+    }
+
+    def isInvalidShareFetchRequest : Boolean = {
+      // The Initial Share Fetch Request should not Acknowledge any data.
+      if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && isAcknowledgeDataPresent) {
+        return true
+      }
+      // validating the existence of all topics and partitions in the request.
+      shareFetchRequest.data().topics().forEach(topic => {
+        if(!topicIdNames.keySet().contains(topic.topicId)) {
+          return true
+        }
+        if(!isTopicPresent(topicIdNames.get(topic.topicId))) {
+          return true
+        }
+        topic.partitions().forEach(partition => {
+          if (!isPartitionPresent(partition.partitionIndex, topicIdNames.get(topic.topicId))) {
+            return true
+          }
+        })
+      })
+      false
+    }
+
+    val shareFetchData = shareFetchRequest.shareFetchData(topicIdNames)
+    val forgottenTopics = shareFetchRequest.forgottenTopics(topicIdNames)
+
+    val newReqMetadata : ShareFetchMetadata = new ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)
+    var shareFetchContext : ShareFetchContext = null
+
+    // check if the Request is Invalid. If it is, the request is failed directly here.
+    if(isInvalidShareFetchRequest) {
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception))
+      return
+    }
+
+    try {
+      // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here.
+      shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata)
+    } catch {
+      case e: Exception => requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
+        return
+    }
+
+    val erroneousAndValidPartitionData : ErroneousAndValidPartitionData = shareFetchContext.getErroneousAndValidTopicIdPartitions
+    val topicIdPartitionSeq : mutable.Set[TopicIdPartition] = mutable.Set()
+    erroneousAndValidPartitionData.erroneous.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
+    }
+    erroneousAndValidPartitionData.validTopicIdPartitions.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
+    }
+    shareFetchData.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
+    }
+
+    val authorizedTopics = authHelper.filterByAuthorized(
+      request.context,
+      READ,
+      TOPIC,
+      topicIdPartitionSeq
+    )(_.topicPartition.topic)
+
+    // Variable to store the topic partition wise result of fetching.
+    var fetchResult : CompletableFuture[mutable.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] =
+      CompletableFuture.completedFuture(mutable.Map.empty)
+    // Variable to store the topic partition wise result of piggybacked acknowledgements.
+    var acknowledgeResult : CompletableFuture[mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]] =
+      CompletableFuture.completedFuture(mutable.Map.empty)
+
+    // Handling the Acknowledgements from the ShareFetchRequest. If this check is true, we are sure that this is not an
+    // Initial ShareFetch Request, otherwise the request would have been invalid.
+    if(isAcknowledgeDataPresent) {
+      val erroneous = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]()
+      val acknowledgementDataFromRequest = getAcknowledgeBatchesFromShareFetchRequest(request.body[ShareFetchRequest], topicIdNames, erroneous)
+      acknowledgeResult = handleAcknowledgements(
+        acknowledgementDataFromRequest,
+        erroneous,
+        sharePartitionManager,
+        authorizedTopics,
+        groupId,
+        memberId,
+      )
+    }
+
+    // Handling the Fetch from the ShareFetchRequest.
+    fetchResult = handleFetchFromShareFetchRequest(
+      request,
+      erroneousAndValidPartitionData,
+      sharePartitionManager,
+      authorizedTopics
+    )
+
+    def combineShareFetchAndShareAcknowledgeResponses(
+                                                       fetchResult: CompletableFuture[mutable.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]],
+                                                       acknowledgeResult : CompletableFuture[mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]],
+                                                     ) : CompletableFuture[ShareFetchResponse] = {
+
+      fetchResult.thenCombine (acknowledgeResult,
+        (
+          fetchMap : mutable.Map[TopicIdPartition, ShareFetchResponseData.PartitionData],
+          acknowledgeMap : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]
+        ) => {
+        val shareFetchResponse = processShareFetchResponse(fetchMap, request, topicIdNames, shareFetchContext)
+        // The outer map has topicId as the key and the inner map has partitionIndex as the key.
+        val topicPartitionAcknowledgements : mutable.Map[Uuid, mutable.Map[Int, Short]] = mutable.Map()
+        if(acknowledgeMap != null && acknowledgeMap.nonEmpty) {
+          acknowledgeMap.foreach { case (tp, partitionData) =>
+            topicPartitionAcknowledgements.get(tp.topicId) match {
+              case Some(subMap) =>
+                subMap += tp.partition -> partitionData.errorCode
+              case None =>
+                val partitionAcknowledgementsMap : mutable.Map[Int, Short] = mutable.Map()
+                partitionAcknowledgementsMap += tp.partition -> partitionData.errorCode
+                topicPartitionAcknowledgements += tp.topicId -> partitionAcknowledgementsMap
+            }
+          }
+        }
+
+        shareFetchResponse.data.responses.forEach(topic => {
+          val topicId = topic.topicId
+          topicPartitionAcknowledgements.get(topicId) match {
+            case Some(subMap) =>
+              topic.partitions.forEach { partition =>
+                subMap.get(partition.partitionIndex) match {
+                  case Some(value) =>
+                    partition.setAcknowledgeErrorCode(value)
+                    // Delete the element.
+                    subMap.remove(partition.partitionIndex)
+                  case None =>
+                }
+              }
+              // Add the remaining acknowledgements.
+              subMap.foreach { case (partitionIndex, value) =>
+                val fetchPartitionData = new ShareFetchResponseData.PartitionData()
+                  .setPartitionIndex(partitionIndex)
+                  .setErrorCode(Errors.NONE.code)
+                  .setAcknowledgeErrorCode(value)
+                topic.partitions.add(fetchPartitionData)
+              }
+              topicPartitionAcknowledgements.remove(topicId)
+            case None =>
+          }
+        })
+        // Add the remaining acknowledgements.
+        topicPartitionAcknowledgements.foreach{ case(topicId, subMap) =>
+          val topicData = new ShareFetchResponseData.ShareFetchableTopicResponse()
+            .setTopicId(topicId)
+          subMap.foreach { case (partitionIndex, value) =>
+            val fetchPartitionData = new ShareFetchResponseData.PartitionData()
+              .setPartitionIndex(partitionIndex)
+              .setErrorCode(Errors.NONE.code)
+              .setAcknowledgeErrorCode(value)
+            topicData.partitions.add(fetchPartitionData)
+          }
+          shareFetchResponse.data.responses.add(topicData)
+        }
+
+        if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) {
+          sharePartitionManager.releaseAcquiredRecords(groupId, memberId).
+            whenComplete((releaseAcquiredRecordsData, throwable) => {
+              if (throwable != null) {
+                error(s"Release acquired records on share session close with correlation from client ${request.header.clientId}  " +
+                  s"failed with error ${throwable.getMessage}")
+              } else {
+                info(s"Release acquired records on share session close $releaseAcquiredRecordsData succeeded")
+              }
+            })
+        }
+          shareFetchResponse
+      })
+    }
+
+    // Send the response once the future completes.
+    combineShareFetchAndShareAcknowledgeResponses(fetchResult, acknowledgeResult).handle[Unit] {(result, exception) => {
+      if (exception != null) {
+        requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, exception))
+      } else {
+        requestChannel.sendResponse(request, result, fetchOnComplete(request))
+      }
+    }}
+  }
+
+  def handleFetchFromShareFetchRequest(request: RequestChannel.Request,
+                                       erroneousAndValidPartitionData : ErroneousAndValidPartitionData,
+                                       sharePartitionManager : SharePartitionManager,
+                                       authorizedTopics: Set[String]
+                                      ): CompletableFuture[mutable.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] = {
+
+    val erroneous = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData]
+    // Kafka share consumers need READ permission on each partition they are fetching.
+    val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, ShareFetchRequest.SharePartitionData)]
+    erroneousAndValidPartitionData.erroneous.forEach {
+      erroneousData => erroneous += erroneousData
+    }
+    erroneousAndValidPartitionData.validTopicIdPartitions.forEach {
+      validPartitionData => partitionDatas += validPartitionData
+    }
+
+    val interestingWithMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+
+    partitionDatas.foreach { case (topicIdPartition, sharePartitionData) =>
+      if (!authorizedTopics.contains(topicIdPartition.topicPartition.topic))
+        erroneous += topicIdPartition -> ShareFetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
+      else if (!metadataCache.contains(topicIdPartition.topicPartition))
+        erroneous += topicIdPartition -> ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+      else
+        interestingWithMaxBytes.put(topicIdPartition, sharePartitionData.maxBytes)
+    }
+
+    val shareFetchRequest = request.body[ShareFetchRequest]
+
+    val clientId = request.header.clientId
+    val versionId = request.header.apiVersion
+    val groupId = shareFetchRequest.data.groupId
+
+    if (interestingWithMaxBytes.isEmpty) {
+      CompletableFuture.completedFuture(erroneous)
+    } else {
+      // for share fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being
+      // throttled given no bytes were recorded in the recent quota window. Trying to fetch more bytes would result
+      // in a guaranteed throttling potentially blocking consumer progress.
+      val maxQuotaWindowBytes = quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt
+
+      val fetchMaxBytes = Math.min(Math.min(shareFetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
+      val fetchMinBytes = Math.min(shareFetchRequest.minBytes, fetchMaxBytes)
+
+      val clientMetadata: Optional[ClientMetadata] =
+        Optional.of(new DefaultClientMetadata(
+          CommonClientConfigs.DEFAULT_CLIENT_RACK,
+          clientId,
+          request.context.clientAddress,
+          request.context.principal,
+          request.context.listenerName.value))
+
+      // TODO : Change the dummy values to actual values after the required implementations are completed
+      val params = new FetchParams(
+        versionId,
+        FetchRequest.FUTURE_LOCAL_REPLICA_ID,
+        -1,
+        shareFetchRequest.maxWait,
+        fetchMinBytes,
+        fetchMaxBytes,
+        FetchIsolation.HIGH_WATERMARK,
+        clientMetadata
+      )
+
+      val interestingTopicPartitions: util.List[TopicIdPartition] = new util.ArrayList[TopicIdPartition]()
+      val iterator = interestingWithMaxBytes.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        interestingTopicPartitions.add(entry.getKey)
+      }
+
+      // call the share partition manager to fetch messages from the local replica.
+      sharePartitionManager.fetchMessages(
+        groupId,
+        shareFetchRequest.data.memberId,
+        params,
+        interestingTopicPartitions,
+        interestingWithMaxBytes
+      ).thenApply(result => {

Review Comment:
   Thanks for the review. I think this comment was addressing an issue in the previous version of the PR. I pushed a commit later, which I believe resolved all issues related to asynchronous code. If you find any other gaps, please let me know. Thanks!
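   For context, here is a minimal, standalone sketch (not code from the PR; the object name and dummy maps are illustrative only) of the CompletableFuture composition the handler relies on: `thenCombine` merges the fetch and acknowledge results, and `handle` sends exactly one response whether the combined future completes normally or exceptionally.

   ```scala
   import java.util.concurrent.CompletableFuture

   object AsyncCombineSketch extends App {
     // Two independent async results standing in for the fetch and acknowledge futures.
     val fetchResult: CompletableFuture[Map[Int, String]] =
       CompletableFuture.supplyAsync(() => Map(0 -> "records-for-partition-0"))
     val acknowledgeResult: CompletableFuture[Map[Int, Short]] =
       CompletableFuture.supplyAsync(() => Map(0 -> 0.toShort))

     // thenCombine waits for both futures and merges their values into one response;
     // handle runs once the combined future completes, with either a result or an
     // exception, so the "response" is produced exactly once in both cases.
     val done = fetchResult
       .thenCombine(acknowledgeResult,
         (fetch: Map[Int, String], acks: Map[Int, Short]) =>
           s"fetched=${fetch.size} partition(s), ackErrorCodes=${acks.values.mkString(",")}")
       .handle[Unit] { (result, exception) =>
         if (exception != null) println(s"would send error response: $exception")
         else println(s"would send response: $result")
       }
     done.join() // block only so this standalone sketch does not exit before completion
   }
   ```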



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
