apoorvmittal10 commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1665990669
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4004,6 +4471,99 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + + def getAcknowledgeBatchesFromShareFetchRequest( + shareFetchRequest : ShareFetchRequest, + topicNames : util.Map[Uuid, String], + erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], + ) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = { + + val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]() + shareFetchRequest.data().topics().forEach ( topic => { + + if(!topicNames.asScala.contains(topic.topicId)) { Review Comment: I am not sure how costly the `asScala` conversion on a Map is, but it should have some cost, so do you want to have the conversion inside the forEach loop? ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4390,6 +4401,2215 @@ class KafkaApisTest extends Logging { assertEquals("broker2", node.host) } + private def expectedAcquiredRecords(firstOffset : Long, lastOffset : Long, deliveryCount : Int) : util.List[AcquiredRecords] = { + val acquiredRecordsList : util.List[AcquiredRecords] = new util.ArrayList() + acquiredRecordsList.add(new AcquiredRecords() + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setDeliveryCount(deliveryCount.toShort)) + acquiredRecordsList + } + + private def memoryRecordsBuilder(numOfRecords : Int, startOffset : Long) : MemoryRecordsBuilder = { + + val buffer: ByteBuffer = ByteBuffer.allocate(1024) + val compression: Compression = Compression.of(CompressionType.NONE).build() + val timestampType: TimestampType = TimestampType.CREATE_TIME + + val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, compression, timestampType, startOffset) + for (i <- 0 until numOfRecords) { + builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomBytes(10), TestUtils.randomBytes(10)) + } + builder + } + + 
private def memoryRecords(numOfRecords : Int, startOffset : Long) : MemoryRecords = { + memoryRecordsBuilder(numOfRecords, startOffset).build() + } + + @Test + def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val shareSessionEpoch = 0 + + val records = memoryRecords(10, 0) + + val sharePartitionManager : SharePartitionManager = mock(classOf[SharePartitionManager]) Review Comment: I see we are using mock here, why not to define it on the top itself? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4004,6 +4471,99 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + + def getAcknowledgeBatchesFromShareFetchRequest( Review Comment: should it be `private def`? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3955,11 +3960,473 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] - // TODO: Implement the ShareFetchRequest handling - requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. 
+ requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return + } else if (!config.isShareGroupEnabled) { + // The API is not supported when the "share" rebalance protocol has not been set explicitly. + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return + } + val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match { + case Some(manager) => manager + case None => throw new IllegalStateException("ShareFetchRequest received but SharePartitionManager is not initialized") + } + + val groupId = shareFetchRequest.data.groupId + val memberId = shareFetchRequest.data.memberId + val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch + + def isAcknowledgeDataPresentInFetchRequest() : Boolean = { + var isAcknowledgeDataPresent = false + shareFetchRequest.data.topics.forEach ( topic => { + breakable{ + topic.partitions.forEach ( partition => { + if (partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty) { + isAcknowledgeDataPresent = true + break + } else { + isAcknowledgeDataPresent = false + } + }) + } + }) + isAcknowledgeDataPresent + } + + val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest() + + def isInvalidShareFetchRequest() : Boolean = { + // The Initial Share Fetch Request should not Acknowledge any data. + if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && isAcknowledgeDataPresent) { + return true + } + false + } + + val topicNames = metadataCache.topicIdsToNames() Review Comment: Does `topicIdNames` seems better? It was hard to relate later in the code what this variable holds, seems more like just name of topics. 
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4004,6 +4471,99 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + + def getAcknowledgeBatchesFromShareFetchRequest( + shareFetchRequest : ShareFetchRequest, + topicNames : util.Map[Uuid, String], + erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], + ) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = { + + val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]() + shareFetchRequest.data().topics().forEach ( topic => { + + if(!topicNames.asScala.contains(topic.topicId)) { + topic.partitions.forEach((partition: ShareFetchRequestData.FetchPartition) => { + val topicIdPartition = new TopicIdPartition( + topic.topicId, + new TopicPartition(null, partition.partitionIndex)) + erroneous += + topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + }) + } + else { + topic.partitions().forEach ( partition => { + val topicIdPartition = new TopicIdPartition( + topic.topicId(), + new TopicPartition(topicNames.get(topic.topicId()), partition.partitionIndex()) + ) + var exceptionThrown = false + val acknowledgeBatches = new util.ArrayList[ShareAcknowledgementBatch]() + breakable{ + partition.acknowledgementBatches().forEach( batch => { + try { + acknowledgeBatches.add(new ShareAcknowledgementBatch( + batch.firstOffset(), + batch.lastOffset(), + batch.acknowledgeTypes() + )) + } catch { + case e : IllegalArgumentException => + exceptionThrown = true + erroneous += topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.forException(e)) + break + } Review Comment: What exception is being thrown? 
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4004,6 +4471,99 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + + def getAcknowledgeBatchesFromShareFetchRequest( + shareFetchRequest : ShareFetchRequest, + topicNames : util.Map[Uuid, String], + erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], + ) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = { + + val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]() + shareFetchRequest.data().topics().forEach ( topic => { + + if(!topicNames.asScala.contains(topic.topicId)) { + topic.partitions.forEach((partition: ShareFetchRequestData.FetchPartition) => { + val topicIdPartition = new TopicIdPartition( + topic.topicId, + new TopicPartition(null, partition.partitionIndex)) + erroneous += + topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + }) + } + else { + topic.partitions().forEach ( partition => { + val topicIdPartition = new TopicIdPartition( + topic.topicId(), + new TopicPartition(topicNames.get(topic.topicId()), partition.partitionIndex()) + ) Review Comment: Do we validate somewhere that the partition index exists for the topic, i.e. what if a client requests partition 5 when only 4 partitions exist for the topic? ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -153,6 +162,7 @@ class KafkaApisTest extends Logging { enableForwarding: Boolean = false, configRepository: ConfigRepository = new MockConfigRepository(), raftSupport: Boolean = false, + sharePartitionManager : SharePartitionManager = sharePartitionManager, Review Comment: This change will not be needed if we use a mock, which we should. 
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3955,11 +3960,473 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] - // TODO: Implement the ShareFetchRequest handling - requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return + } else if (!config.isShareGroupEnabled) { + // The API is not supported when the "share" rebalance protocol has not been set explicitly. 
+ requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + return + } + val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match { + case Some(manager) => manager + case None => throw new IllegalStateException("ShareFetchRequest received but SharePartitionManager is not initialized") + } + + val groupId = shareFetchRequest.data.groupId + val memberId = shareFetchRequest.data.memberId + val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch + + def isAcknowledgeDataPresentInFetchRequest() : Boolean = { + var isAcknowledgeDataPresent = false + shareFetchRequest.data.topics.forEach ( topic => { + breakable{ + topic.partitions.forEach ( partition => { + if (partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty) { + isAcknowledgeDataPresent = true + break + } else { + isAcknowledgeDataPresent = false + } + }) + } + }) + isAcknowledgeDataPresent + } + + val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest() + + def isInvalidShareFetchRequest() : Boolean = { + // The Initial Share Fetch Request should not Acknowledge any data. 
+ if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && isAcknowledgeDataPresent) { + return true + } + false + } + + val topicNames = metadataCache.topicIdsToNames() + val shareFetchData = shareFetchRequest.shareFetchData(topicNames) + val forgottenTopics = shareFetchRequest.forgottenTopics(topicNames) + + val newReqMetadata : ShareFetchMetadata = new ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch) + var shareFetchContext : ShareFetchContext = null + + var shareFetchResponse : ShareFetchResponse = null + + def updateConversionStats(send: Send): Unit = { + send match { + case send: MultiRecordsSend if send.recordConversionStats != null => + send.recordConversionStats.asScala.toMap.foreach { + case (tp, stats) => updateRecordConversionStats(request, tp, stats) + } + case send: NetworkSend => + updateConversionStats(send.send()) + case _ => + } + } + + // check if the Request is Invalid. If it is, the request is failed directly here. + if(isInvalidShareFetchRequest()) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception)) + CompletableFuture.completedFuture[Unit](()) + return + } + + try { + // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here. + shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata) + } catch { + case e: Exception => requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) + CompletableFuture.completedFuture[Unit](()) + return + } + + // Variable to store any error thrown while the handling piggybacked acknowledgements. + var acknowledgeError : Errors = Errors.NONE + // Variable to store the topic partition wise result of piggybacked acknowledgements. 
+ var acknowledgeResult = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]() + + val erroneousAndValidPartitionData : ErroneousAndValidPartitionData = shareFetchContext.getErroneousAndValidTopicIdPartitions + val topicIdPartitionSeq : mutable.Set[TopicIdPartition] = mutable.Set() + erroneousAndValidPartitionData.erroneous.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + erroneousAndValidPartitionData.validTopicIdPartitions.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + shareFetchData.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + + val authorizedTopics = authHelper.filterByAuthorized( + request.context, + READ, + TOPIC, + topicIdPartitionSeq + )(_.topicPartition.topic) + + // Handling the Acknowledgements from the ShareFetchRequest If this check is true, we are sure that this is not an + // Initial ShareFetch Request, otherwise the request would have been invalid. + if(isAcknowledgeDataPresent) { + if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { + acknowledgeError = Errors.GROUP_AUTHORIZATION_FAILED + } else { + acknowledgeResult = handleAcknowledgements(request, topicNames, sharePartitionManager, authorizedTopics, groupId, memberId, true) + } + } + + // Handling the Fetch from the ShareFetchRequest. 
+ try { + shareFetchResponse = handleFetchFromShareFetchRequest( + request, + erroneousAndValidPartitionData, + topicNames, + sharePartitionManager, + shareFetchContext, + authorizedTopics + ) + } catch { + case throwable : Throwable => + debug(s"Share fetch request with correlation from client ${request.header.clientId} " + + s"failed with error ${throwable.getMessage}") + requestHelper.handleError(request, throwable) + return + } + + def combineShareFetchAndShareAcknowledgeResponses( + shareFetchResponse: ShareFetchResponse, + acknowledgeResult : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], + acknowledgeError : Errors + ) : ShareFetchResponse = { + + // The outer map has topicId as the key and the inner map has partitionIndex as the key. + val topicPartitionAcknowledgements : mutable.Map[Uuid, mutable.Map[Int, Short]] = mutable.Map() + if(acknowledgeResult != null && acknowledgeResult.nonEmpty) { + acknowledgeResult.asJava.forEach { (tp, partitionData) => + topicPartitionAcknowledgements.get(tp.topicId) match { + case Some(subMap) => + subMap += tp.partition -> partitionData.errorCode + case None => + val partitionAcknowledgementsMap : mutable.Map[Int, Short] = mutable.Map() + partitionAcknowledgementsMap += tp.partition -> partitionData.errorCode + topicPartitionAcknowledgements += tp.topicId -> partitionAcknowledgementsMap + } + } + } + + shareFetchResponse.data.responses.forEach(topic => { + val topicId = topic.topicId + topicPartitionAcknowledgements.get(topicId) match { + case Some(subMap) => + topic.partitions.forEach { partition => + subMap.get(partition.partitionIndex) match { + case Some(value) => + val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value + partition.setAcknowledgeErrorCode(ackErrorCode) + // Delete the element. + subMap.remove(partition.partitionIndex) + case None => + } + } + // Add the remaining acknowledgements. 
+ subMap.foreach { case (partitionIndex, value) => + val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value + val fetchPartitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(ackErrorCode) + topic.partitions.add(fetchPartitionData) + } + topicPartitionAcknowledgements.remove(topicId) + case None => + } + }) + // Add the remaining acknowledgements. + topicPartitionAcknowledgements.foreach{ case(topicId, subMap) => + val topicData = new ShareFetchResponseData.ShareFetchableTopicResponse() + .setTopicId(topicId) + subMap.foreach { case (partitionIndex, value) => + val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value + val fetchPartitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(ackErrorCode) + topicData.partitions.add(fetchPartitionData) + } + shareFetchResponse.data.responses.add(topicData) + } + + if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) { + sharePartitionManager.releaseAcquiredRecords(groupId, memberId). + whenComplete((releaseAcquiredRecordsData, throwable) => { + if (throwable != null) { + debug(s"Release acquired records on share session close with correlation from client ${request.header.clientId} " + Review Comment: Should it be in `error`? 
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4390,6 +4401,2215 @@ class KafkaApisTest extends Logging { assertEquals("broker2", node.host) } + private def expectedAcquiredRecords(firstOffset : Long, lastOffset : Long, deliveryCount : Int) : util.List[AcquiredRecords] = { + val acquiredRecordsList : util.List[AcquiredRecords] = new util.ArrayList() + acquiredRecordsList.add(new AcquiredRecords() + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setDeliveryCount(deliveryCount.toShort)) + acquiredRecordsList + } + + private def memoryRecordsBuilder(numOfRecords : Int, startOffset : Long) : MemoryRecordsBuilder = { + + val buffer: ByteBuffer = ByteBuffer.allocate(1024) + val compression: Compression = Compression.of(CompressionType.NONE).build() + val timestampType: TimestampType = TimestampType.CREATE_TIME + + val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, compression, timestampType, startOffset) + for (i <- 0 until numOfRecords) { + builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomBytes(10), TestUtils.randomBytes(10)) + } + builder + } Review Comment: These methods seem to be the same as those defined in `SharePartitionTest`; shall we move them to common test utils (in Java)? 
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4004,6 +4488,99 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + + def getAcknowledgeBatchesFromShareFetchRequest( + shareFetchRequest : ShareFetchRequest, + topicNames : util.Map[Uuid, String], + erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], + ) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = { + + val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]() + shareFetchRequest.data().topics().forEach ( topic => { + + if(!topicNames.asScala.contains(topic.topicId)) { + topic.partitions.forEach((partition: ShareFetchRequestData.FetchPartition) => { + val topicIdPartition = new TopicIdPartition( + topic.topicId, + new TopicPartition(null, partition.partitionIndex)) + erroneous += + topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + }) + } + else { + topic.partitions().forEach ( partition => { + val topicIdPartition = new TopicIdPartition( + topic.topicId(), + new TopicPartition(topicNames.get(topic.topicId()), partition.partitionIndex()) + ) + var exceptionThrown = false + val acknowledgeBatches = new util.ArrayList[ShareAcknowledgementBatch]() + breakable{ + partition.acknowledgementBatches().forEach( batch => { + try { + acknowledgeBatches.add(new ShareAcknowledgementBatch( + batch.firstOffset(), + batch.lastOffset(), + batch.acknowledgeTypes() + )) + } catch { + case e : IllegalArgumentException => + exceptionThrown = true + erroneous += topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.forException(e)) + break + } + }) + } + if(!exceptionThrown && acknowledgeBatches.size() > 0) { + acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches + } + }) + } + }) + acknowledgeBatchesMap + } + + def validateAcknowledgementBatches( + 
acknowledgementDataFromRequest : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]], + erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData] + ) : mutable.Set[TopicIdPartition] = { + val erroneousTopicIdPartitions: mutable.Set[TopicIdPartition] = mutable.Set.empty[TopicIdPartition] + acknowledgementDataFromRequest.foreach{ case (tp : TopicIdPartition, acknowledgeBatches : util.List[ShareAcknowledgementBatch]) => + var prevEndOffset = -1L + breakable { + acknowledgeBatches.forEach(batch => { + if (batch.firstOffset > batch.lastOffset) { Review Comment: Yeah, it's much more readable this way, I agree. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -132,12 +138,15 @@ class KafkaApisTest extends Logging { private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager, clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None) private val fetchManager: FetchManager = mock(classOf[FetchManager]) + private val sharePartitionManager : SharePartitionManager = + new SharePartitionManager(replicaManager, Time.SYSTEM, new ShareSessionCache(1000, 100), 30000, 5, 200, NoOpShareStatePersister.getInstance()) Review Comment: Wouldn't it be better to mock `sharePartitionManager`? ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4390,6 +4401,2215 @@ class KafkaApisTest extends Logging { assertEquals("broker2", node.host) } + private def expectedAcquiredRecords(firstOffset : Long, lastOffset : Long, deliveryCount : Int) : util.List[AcquiredRecords] = { Review Comment: nit: would it be better to have these methods defined later, where they are used in the tests? 
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4390,6 +4401,2215 @@ class KafkaApisTest extends Logging { assertEquals("broker2", node.host) } + private def expectedAcquiredRecords(firstOffset : Long, lastOffset : Long, deliveryCount : Int) : util.List[AcquiredRecords] = { + val acquiredRecordsList : util.List[AcquiredRecords] = new util.ArrayList() + acquiredRecordsList.add(new AcquiredRecords() + .setFirstOffset(firstOffset) + .setLastOffset(lastOffset) + .setDeliveryCount(deliveryCount.toShort)) + acquiredRecordsList + } + + private def memoryRecordsBuilder(numOfRecords : Int, startOffset : Long) : MemoryRecordsBuilder = { + + val buffer: ByteBuffer = ByteBuffer.allocate(1024) + val compression: Compression = Compression.of(CompressionType.NONE).build() + val timestampType: TimestampType = TimestampType.CREATE_TIME + + val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, compression, timestampType, startOffset) + for (i <- 0 until numOfRecords) { + builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomBytes(10), TestUtils.randomBytes(10)) + } + builder + } + + private def memoryRecords(numOfRecords : Int, startOffset : Long) : MemoryRecords = { + memoryRecordsBuilder(numOfRecords, startOffset).build() + } + + @Test + def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId : Uuid = Uuid.ZERO_UUID + + val shareSessionEpoch = 0 + + val records = memoryRecords(10, 0) + + val sharePartitionManager : SharePartitionManager = mock(classOf[SharePartitionManager]) + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, 
new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) Review Comment: Is `asJava` required? Isn't it already a java API? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org