chirag-wadhwa5 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1697405173


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4020,11 +4012,381 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
     val shareFetchRequest = request.body[ShareFetchRequest]
-    // TODO: Implement the ShareFetchRequest handling
-    requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!isShareGroupProtocolEnabled) {
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      return
+    }
+    val sharePartitionManager : SharePartitionManager = 
this.sharePartitionManager match {
+      case Some(manager) => manager
+      case None =>
+        // The API is not supported when the SharePartitionManager is not 
defined on the broker
+        info("SharePartitionManager not defined on the broker")
+        requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+        return
+    }
+
+    val groupId = shareFetchRequest.data.groupId
+
+    // Share Fetch needs permission to perform the READ action on the named 
group resource (groupId)
+    if(!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, 
Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      return
+    }
+
+    val memberId = shareFetchRequest.data.memberId
+    val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch
+
+    def isAcknowledgeDataPresentInFetchRequest: Boolean = {
+      shareFetchRequest.data.topics.asScala
+        .flatMap(t => t.partitions().asScala)
+        .exists(partition => partition.acknowledgementBatches != null && 
!partition.acknowledgementBatches.isEmpty)
+    }
+
+    val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest
+    val topicIdNames = metadataCache.topicIdsToNames()
+
+    def isTopicPresent(topicName: String) : Boolean = {
+      metadataCache.contains(topicName)
+    }
+
+    def isPartitionPresent(partition : Int, topicName : String) : Boolean = {
+      metadataCache.getTopicPartitions(topicName).foreach(tp => {
+        if(tp.partition() == partition) {
+          return true
+        }
+      })
+      false
+    }
+
+    def isInvalidShareFetchRequest : Boolean = {
+      // The Initial Share Fetch Request should not Acknowledge any data.
+      if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && 
isAcknowledgeDataPresent) {
+        return true
+      }
+      // validating the existence of all topics and partitions in the request.
+      shareFetchRequest.data().topics().forEach(topic => {
+        if(!topicIdNames.keySet().contains(topic.topicId)) {
+          return true
+        }
+        if(!isTopicPresent(topicIdNames.get(topic.topicId))) {
+          return true
+        }
+        topic.partitions().forEach(partition => {
+          if (!isPartitionPresent(partition.partitionIndex, 
topicIdNames.get(topic.topicId))) {
+            return true
+          }
+        })
+      })
+      false
+    }
+
+    val shareFetchData = shareFetchRequest.shareFetchData(topicIdNames)
+    val forgottenTopics = shareFetchRequest.forgottenTopics(topicIdNames)
+
+    val newReqMetadata : ShareFetchMetadata = new 
ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)
+    var shareFetchContext : ShareFetchContext = null
+
+    // check if the Request is Invalid. If it is, the request is failed 
directly here.
+    if(isInvalidShareFetchRequest) {

Review Comment:
   Thanks for the review. You are correct here. However, the code already has 
that check, so I think this one is not needed at all. I will remove it in the 
next commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to