apoorvmittal10 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1665990669


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4004,6 +4471,99 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }
 
+
+  def getAcknowledgeBatchesFromShareFetchRequest(
+                                                  shareFetchRequest : ShareFetchRequest,
+                                                  topicNames : util.Map[Uuid, String],
+                                                  erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
+                                                ) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = {
+
+    val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]()
+    shareFetchRequest.data().topics().forEach ( topic => {
+
+      if(!topicNames.asScala.contains(topic.topicId)) {

Review Comment:
   I am not sure how costly the `asScala` API on a Map is, but it should have some cost, so do you want to do the conversion inside the forEach loop?
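
   For instance, a minimal sketch of avoiding it: since `topicNames` is already a `java.util.Map`, `containsKey` would skip the per-iteration conversion entirely:

   ```scala
   shareFetchRequest.data().topics().forEach(topic => {
     // Query the Java map directly instead of calling asScala on every topic.
     if (!topicNames.containsKey(topic.topicId)) {
       // ... build the UNKNOWN_TOPIC_ID partition responses as above
     }
   })
   ```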



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4390,6 +4401,2215 @@ class KafkaApisTest extends Logging {
     assertEquals("broker2", node.host)
   }
 
+  private def expectedAcquiredRecords(firstOffset : Long, lastOffset : Long, deliveryCount : Int) : util.List[AcquiredRecords] = {
+    val acquiredRecordsList : util.List[AcquiredRecords] = new util.ArrayList()
+    acquiredRecordsList.add(new AcquiredRecords()
+      .setFirstOffset(firstOffset)
+      .setLastOffset(lastOffset)
+      .setDeliveryCount(deliveryCount.toShort))
+    acquiredRecordsList
+  }
+
+  private def memoryRecordsBuilder(numOfRecords : Int, startOffset : Long) : MemoryRecordsBuilder = {
+
+    val buffer: ByteBuffer = ByteBuffer.allocate(1024)
+    val compression: Compression = Compression.of(CompressionType.NONE).build()
+    val timestampType: TimestampType = TimestampType.CREATE_TIME
+
+    val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, compression, timestampType, startOffset)
+    for (i <- 0 until numOfRecords) {
+      builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomBytes(10), TestUtils.randomBytes(10))
+    }
+    builder
+  }
+
+  private def memoryRecords(numOfRecords : Int, startOffset : Long) : MemoryRecords = {
+    memoryRecordsBuilder(numOfRecords, startOffset).build()
+  }
+
+  @Test
+  def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val shareSessionEpoch = 0
+
+    val records = memoryRecords(10, 0)
+
+    val sharePartitionManager : SharePartitionManager = mock(classOf[SharePartitionManager])

Review Comment:
   I see we are using a mock here, so why not define it at the top itself?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4004,6 +4471,99 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }
 
+
+  def getAcknowledgeBatchesFromShareFetchRequest(

Review Comment:
   should it be `private def`?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3955,11 +3960,473 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
     val shareFetchRequest = request.body[ShareFetchRequest]
-    // TODO: Implement the ShareFetchRequest handling
-    requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    } else if (!config.isShareGroupEnabled) {
+      // The API is not supported when the "share" rebalance protocol has not been set explicitly.
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    }
+    val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match {
+      case Some(manager) => manager
+      case None => throw new IllegalStateException("ShareFetchRequest received but SharePartitionManager is not initialized")
+    }
+
+    val groupId = shareFetchRequest.data.groupId
+    val memberId = shareFetchRequest.data.memberId
+    val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch
+
+    def isAcknowledgeDataPresentInFetchRequest() : Boolean = {
+      var isAcknowledgeDataPresent = false
+      shareFetchRequest.data.topics.forEach ( topic => {
+        breakable{
+          topic.partitions.forEach ( partition => {
+            if (partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty) {
+              isAcknowledgeDataPresent = true
+              break
+            } else {
+              isAcknowledgeDataPresent = false
+            }
+          })
+        }
+      })
+      isAcknowledgeDataPresent
+    }
+
+    val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest()
+
+    def isInvalidShareFetchRequest() : Boolean = {
+      // The Initial Share Fetch Request should not Acknowledge any data.
+      if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && isAcknowledgeDataPresent) {
+        return true
+      }
+      false
+    }
+
+    val topicNames = metadataCache.topicIdsToNames()

Review Comment:
   Does `topicIdNames` seem better? It was hard to relate later in the code what this variable holds; it seems more like just names of topics.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4004,6 +4471,99 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }
 
+
+  def getAcknowledgeBatchesFromShareFetchRequest(
+                                                  shareFetchRequest : ShareFetchRequest,
+                                                  topicNames : util.Map[Uuid, String],
+                                                  erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
+                                                ) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = {
+
+    val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]()
+    shareFetchRequest.data().topics().forEach ( topic => {
+
+      if(!topicNames.asScala.contains(topic.topicId)) {
+        topic.partitions.forEach((partition: ShareFetchRequestData.FetchPartition) => {
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId,
+            new TopicPartition(null, partition.partitionIndex))
+          erroneous +=
+            topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+        })
+      }
+      else {
+        topic.partitions().forEach ( partition => {
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId(),
+            new TopicPartition(topicNames.get(topic.topicId()), partition.partitionIndex())
+          )
+          var exceptionThrown = false
+          val acknowledgeBatches = new util.ArrayList[ShareAcknowledgementBatch]()
+          breakable{
+            partition.acknowledgementBatches().forEach( batch => {
+              try {
+                acknowledgeBatches.add(new ShareAcknowledgementBatch(
+                  batch.firstOffset(),
+                  batch.lastOffset(),
+                  batch.acknowledgeTypes()
+                ))
+              } catch {
+                case e : IllegalArgumentException =>
+                  exceptionThrown = true
+                  erroneous += topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.forException(e))
+                  break
+              }

Review Comment:
   What exception is being thrown?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4004,6 +4471,99 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }
 
+
+  def getAcknowledgeBatchesFromShareFetchRequest(
+                                                  shareFetchRequest : ShareFetchRequest,
+                                                  topicNames : util.Map[Uuid, String],
+                                                  erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
+                                                ) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = {
+
+    val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]()
+    shareFetchRequest.data().topics().forEach ( topic => {
+
+      if(!topicNames.asScala.contains(topic.topicId)) {
+        topic.partitions.forEach((partition: ShareFetchRequestData.FetchPartition) => {
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId,
+            new TopicPartition(null, partition.partitionIndex))
+          erroneous +=
+            topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+        })
+      }
+      else {
+        topic.partitions().forEach ( partition => {
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId(),
+            new TopicPartition(topicNames.get(topic.topicId()), partition.partitionIndex())
+          )

Review Comment:
   Do we validate somewhere that the partition index exists for the topic, i.e. what if the client requests partition 5 when there are only 4 partitions for the topic?
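
   One possible shape for such a guard, sketched assuming `metadataCache.numPartitions(topic)` (returning `Option[Int]`) is usable here and that `UNKNOWN_TOPIC_OR_PARTITION` is the appropriate error:

   ```scala
   // Hypothetical range check before building the acknowledgement batches.
   val topicName = topicNames.get(topic.topicId())
   metadataCache.numPartitions(topicName) match {
     case Some(n) if partition.partitionIndex() >= n =>
       // Partition index is out of range, e.g. partition 5 of a 4-partition topic.
       erroneous += topicIdPartition ->
         ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
     case _ => // index is in range (or metadata is unavailable); proceed as above
   }
   ```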



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -153,6 +162,7 @@ class KafkaApisTest extends Logging {
                       enableForwarding: Boolean = false,
                      configRepository: ConfigRepository = new MockConfigRepository(),
                       raftSupport: Boolean = false,
+                      sharePartitionManager : SharePartitionManager = sharePartitionManager,

Review Comment:
   This change will not be needed if we use a mock, which we should.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3955,11 +3960,473 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
     val shareFetchRequest = request.body[ShareFetchRequest]
-    // TODO: Implement the ShareFetchRequest handling
-    requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    } else if (!config.isShareGroupEnabled) {
+      // The API is not supported when the "share" rebalance protocol has not been set explicitly.
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    }
+    val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match {
+      case Some(manager) => manager
+      case None => throw new IllegalStateException("ShareFetchRequest received but SharePartitionManager is not initialized")
+    }
+
+    val groupId = shareFetchRequest.data.groupId
+    val memberId = shareFetchRequest.data.memberId
+    val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch
+
+    def isAcknowledgeDataPresentInFetchRequest() : Boolean = {
+      var isAcknowledgeDataPresent = false
+      shareFetchRequest.data.topics.forEach ( topic => {
+        breakable{
+          topic.partitions.forEach ( partition => {
+            if (partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty) {
+              isAcknowledgeDataPresent = true
+              break
+            } else {
+              isAcknowledgeDataPresent = false
+            }
+          })
+        }
+      })
+      isAcknowledgeDataPresent
+    }
+
+    val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest()
+
+    def isInvalidShareFetchRequest() : Boolean = {
+      // The Initial Share Fetch Request should not Acknowledge any data.
+      if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && isAcknowledgeDataPresent) {
+        return true
+      }
+      false
+    }
+
+    val topicNames = metadataCache.topicIdsToNames()
+    val shareFetchData = shareFetchRequest.shareFetchData(topicNames)
+    val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames)
+
+    val newReqMetadata : ShareFetchMetadata = new ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)
+    var shareFetchContext : ShareFetchContext = null
+
+    var shareFetchResponse : ShareFetchResponse = null
+
+    def updateConversionStats(send: Send): Unit = {
+      send match {
+        case send: MultiRecordsSend if send.recordConversionStats != null =>
+          send.recordConversionStats.asScala.toMap.foreach {
+            case (tp, stats) => updateRecordConversionStats(request, tp, stats)
+          }
+        case send: NetworkSend =>
+          updateConversionStats(send.send())
+        case _ =>
+      }
+    }
+
+    // check if the Request is Invalid. If it is, the request is failed directly here.
+    if(isInvalidShareFetchRequest()) {
+      requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception))
+      CompletableFuture.completedFuture[Unit](())
+      return
+    }
+
+    try {
+      // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here.
+      shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata)
+    } catch {
+      case e: Exception => requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
+        CompletableFuture.completedFuture[Unit](())
+        return
+    }
+
+    // Variable to store any error thrown while the handling piggybacked acknowledgements.
+    var acknowledgeError : Errors = Errors.NONE
+    // Variable to store the topic partition wise result of piggybacked acknowledgements.
+    var acknowledgeResult = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]()
+
+    val erroneousAndValidPartitionData : ErroneousAndValidPartitionData = shareFetchContext.getErroneousAndValidTopicIdPartitions
+    val topicIdPartitionSeq : mutable.Set[TopicIdPartition] = mutable.Set()
+    erroneousAndValidPartitionData.erroneous.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
+    }
+    erroneousAndValidPartitionData.validTopicIdPartitions.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
+    }
+    shareFetchData.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
+    }
+
+    val authorizedTopics = authHelper.filterByAuthorized(
+      request.context,
+      READ,
+      TOPIC,
+      topicIdPartitionSeq
+    )(_.topicPartition.topic)
+
+    // Handling the Acknowledgements from the ShareFetchRequest. If this check is true, we are sure that this is not an
+    // Initial ShareFetch Request, otherwise the request would have been invalid.
+    if(isAcknowledgeDataPresent) {
+      if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+        acknowledgeError = Errors.GROUP_AUTHORIZATION_FAILED
+      } else {
+        acknowledgeResult = handleAcknowledgements(request, topicNames, sharePartitionManager, authorizedTopics, groupId, memberId, true)
+      }
+    }
+
+    // Handling the Fetch from the ShareFetchRequest.
+    try {
+      shareFetchResponse = handleFetchFromShareFetchRequest(
+        request,
+        erroneousAndValidPartitionData,
+        topicNames,
+        sharePartitionManager,
+        shareFetchContext,
+        authorizedTopics
+      )
+    } catch {
+      case throwable : Throwable =>
+        debug(s"Share fetch request with correlation from client 
${request.header.clientId}  " +
+          s"failed with error ${throwable.getMessage}")
+        requestHelper.handleError(request, throwable)
+        return
+    }
+
+    def combineShareFetchAndShareAcknowledgeResponses(
+                                                       shareFetchResponse: ShareFetchResponse,
+                                                       acknowledgeResult : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
+                                                       acknowledgeError : Errors
+                                                     ) : ShareFetchResponse = {
+
+      // The outer map has topicId as the key and the inner map has partitionIndex as the key.
+      val topicPartitionAcknowledgements : mutable.Map[Uuid, mutable.Map[Int, Short]] = mutable.Map()
+      if(acknowledgeResult != null && acknowledgeResult.nonEmpty) {
+        acknowledgeResult.asJava.forEach { (tp, partitionData) =>
+          topicPartitionAcknowledgements.get(tp.topicId) match {
+            case Some(subMap) =>
+              subMap += tp.partition -> partitionData.errorCode
+            case None =>
+              val partitionAcknowledgementsMap : mutable.Map[Int, Short] = mutable.Map()
+              partitionAcknowledgementsMap += tp.partition -> partitionData.errorCode
+              topicPartitionAcknowledgements += tp.topicId -> partitionAcknowledgementsMap
+          }
+        }
+      }
+
+      shareFetchResponse.data.responses.forEach(topic => {
+        val topicId = topic.topicId
+        topicPartitionAcknowledgements.get(topicId) match {
+          case Some(subMap) =>
+            topic.partitions.forEach { partition =>
+              subMap.get(partition.partitionIndex) match {
+                case Some(value) =>
+                  val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value
+                  partition.setAcknowledgeErrorCode(ackErrorCode)
+                  // Delete the element.
+                  subMap.remove(partition.partitionIndex)
+                case None =>
+              }
+            }
+            // Add the remaining acknowledgements.
+            subMap.foreach { case (partitionIndex, value) =>
+              val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value
+              val fetchPartitionData = new ShareFetchResponseData.PartitionData()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(Errors.NONE.code)
+                .setAcknowledgeErrorCode(ackErrorCode)
+              topic.partitions.add(fetchPartitionData)
+            }
+            topicPartitionAcknowledgements.remove(topicId)
+          case None =>
+        }
+      })
+      // Add the remaining acknowledgements.
+      topicPartitionAcknowledgements.foreach{ case(topicId, subMap) =>
+        val topicData = new ShareFetchResponseData.ShareFetchableTopicResponse()
+          .setTopicId(topicId)
+        subMap.foreach { case (partitionIndex, value) =>
+          val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value
+          val fetchPartitionData = new ShareFetchResponseData.PartitionData()
+            .setPartitionIndex(partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(ackErrorCode)
+          topicData.partitions.add(fetchPartitionData)
+        }
+        shareFetchResponse.data.responses.add(topicData)
+      }
+
+      if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) {
+        sharePartitionManager.releaseAcquiredRecords(groupId, memberId).
+          whenComplete((releaseAcquiredRecordsData, throwable) => {
+            if (throwable != null) {
+              debug(s"Release acquired records on share session close with 
correlation from client ${request.header.clientId}  " +

Review Comment:
   Should it be in `error`?
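
   i.e., keeping the same message but at error level, something like:

   ```scala
   error(s"Release acquired records on share session close with correlation from client ${request.header.clientId} " +
     s"failed with error ${throwable.getMessage}")
   ```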



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4390,6 +4401,2215 @@ class KafkaApisTest extends Logging {
     assertEquals("broker2", node.host)
   }
 
+  private def expectedAcquiredRecords(firstOffset : Long, lastOffset : Long, deliveryCount : Int) : util.List[AcquiredRecords] = {
+    val acquiredRecordsList : util.List[AcquiredRecords] = new util.ArrayList()
+    acquiredRecordsList.add(new AcquiredRecords()
+      .setFirstOffset(firstOffset)
+      .setLastOffset(lastOffset)
+      .setDeliveryCount(deliveryCount.toShort))
+    acquiredRecordsList
+  }
+
+  private def memoryRecordsBuilder(numOfRecords : Int, startOffset : Long) : MemoryRecordsBuilder = {
+
+    val buffer: ByteBuffer = ByteBuffer.allocate(1024)
+    val compression: Compression = Compression.of(CompressionType.NONE).build()
+    val timestampType: TimestampType = TimestampType.CREATE_TIME
+
+    val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, compression, timestampType, startOffset)
+    for (i <- 0 until numOfRecords) {
+      builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomBytes(10), TestUtils.randomBytes(10))
+    }
+    builder
+  }

Review Comment:
   Seems the methods are the same as defined in `SharePartitionTest`; shall we move them to common test utils (in Java)?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4004,6 +4488,99 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }
 
+
+  def getAcknowledgeBatchesFromShareFetchRequest(
+                                                  shareFetchRequest : ShareFetchRequest,
+                                                  topicNames : util.Map[Uuid, String],
+                                                  erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
+                                                ) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = {
+
+    val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]()
+    shareFetchRequest.data().topics().forEach ( topic => {
+
+      if(!topicNames.asScala.contains(topic.topicId)) {
+        topic.partitions.forEach((partition: ShareFetchRequestData.FetchPartition) => {
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId,
+            new TopicPartition(null, partition.partitionIndex))
+          erroneous +=
+            topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+        })
+      }
+      else {
+        topic.partitions().forEach ( partition => {
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId(),
+            new TopicPartition(topicNames.get(topic.topicId()), partition.partitionIndex())
+          )
+          var exceptionThrown = false
+          val acknowledgeBatches = new util.ArrayList[ShareAcknowledgementBatch]()
+          breakable{
+            partition.acknowledgementBatches().forEach( batch => {
+              try {
+                acknowledgeBatches.add(new ShareAcknowledgementBatch(
+                  batch.firstOffset(),
+                  batch.lastOffset(),
+                  batch.acknowledgeTypes()
+                ))
+              } catch {
+                case e : IllegalArgumentException =>
+                  exceptionThrown = true
+                  erroneous += topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.forException(e))
+                  break
+              }
+            })
+          }
+          if(!exceptionThrown && acknowledgeBatches.size() > 0) {
+            acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches
+          }
+        })
+      }
+    })
+    acknowledgeBatchesMap
+  }
+
+  def validateAcknowledgementBatches(
+                                      acknowledgementDataFromRequest : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]],
+                                      erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]
+                                    ) : mutable.Set[TopicIdPartition] = {
+    val erroneousTopicIdPartitions: mutable.Set[TopicIdPartition] = mutable.Set.empty[TopicIdPartition]
+    acknowledgementDataFromRequest.foreach{ case (tp : TopicIdPartition, acknowledgeBatches : util.List[ShareAcknowledgementBatch]) =>
+      var prevEndOffset = -1L
+      breakable {
+        acknowledgeBatches.forEach(batch => {
+          if (batch.firstOffset > batch.lastOffset) {

Review Comment:
   Yeah, it's much more readable this way, I agree.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -132,12 +138,15 @@ class KafkaApisTest extends Logging {
  private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager,
    clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None)
   private val fetchManager: FetchManager = mock(classOf[FetchManager])
+  private val sharePartitionManager : SharePartitionManager =
+    new SharePartitionManager(replicaManager, Time.SYSTEM, new ShareSessionCache(1000, 100), 30000, 5, 200, NoOpShareStatePersister.getInstance())

Review Comment:
   Would it not be better to mock `sharePartitionManager`?
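
   For example, a sketch mirroring the `fetchManager` mock just above, which individual tests could then stub with `when(...).thenReturn(...)`:

   ```scala
   // Class-level Mockito mock instead of constructing a real SharePartitionManager.
   private val sharePartitionManager: SharePartitionManager = mock(classOf[SharePartitionManager])
   ```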



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4390,6 +4401,2215 @@ class KafkaApisTest extends Logging {
     assertEquals("broker2", node.host)
   }
 
+  private def expectedAcquiredRecords(firstOffset : Long, lastOffset : Long, deliveryCount : Int) : util.List[AcquiredRecords] = {

Review Comment:
   nit: would it be better to have these methods defined later, where they are used in the tests?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4390,6 +4401,2215 @@ class KafkaApisTest extends Logging {
     assertEquals("broker2", node.host)
   }
 
+  private def expectedAcquiredRecords(firstOffset : Long, lastOffset : Long, deliveryCount : Int) : util.List[AcquiredRecords] = {
+    val acquiredRecordsList : util.List[AcquiredRecords] = new util.ArrayList()
+    acquiredRecordsList.add(new AcquiredRecords()
+      .setFirstOffset(firstOffset)
+      .setLastOffset(lastOffset)
+      .setDeliveryCount(deliveryCount.toShort))
+    acquiredRecordsList
+  }
+
+  private def memoryRecordsBuilder(numOfRecords : Int, startOffset : Long) : MemoryRecordsBuilder = {
+
+    val buffer: ByteBuffer = ByteBuffer.allocate(1024)
+    val compression: Compression = Compression.of(CompressionType.NONE).build()
+    val timestampType: TimestampType = TimestampType.CREATE_TIME
+
+    val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, compression, timestampType, startOffset)
+    for (i <- 0 until numOfRecords) {
+      builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomBytes(10), TestUtils.randomBytes(10))
+    }
+    builder
+  }
+
+  private def memoryRecords(numOfRecords : Int, startOffset : Long) : MemoryRecords = {
+    memoryRecordsBuilder(numOfRecords, startOffset).build()
+  }
+
+  @Test
+  def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val shareSessionEpoch = 0
+
+    val records = memoryRecords(10, 0)
+
+    val sharePartitionManager : SharePartitionManager = mock(classOf[SharePartitionManager])
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+                  new ShareFetchResponseData.PartitionData()
+                    .setErrorCode(Errors.NONE.code)
+                    .setAcknowledgeErrorCode(Errors.NONE.code)
+                    .setRecords(records)
+                    .setAcquiredRecords(new util.ArrayList(List(
+                      new ShareFetchResponseData.AcquiredRecords()
+                        .setFirstOffset(0)
+                        .setLastOffset(9)
+                        .setDeliveryCount(1)
+                      ).asJava))

Review Comment:
   Is `asJava` required? Isn't it already a Java API?
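
   The `asJava` seems needed because `List(...)` here is a Scala list, but the wrapping could be avoided altogether with a plain Java list, e.g. (a sketch):

   ```scala
   .setAcquiredRecords(util.List.of(
     new ShareFetchResponseData.AcquiredRecords()
       .setFirstOffset(0)
       .setLastOffset(9)
       .setDeliveryCount(1)))
   ```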


