smjn commented on code in PR #20826:
URL: https://github.com/apache/kafka/pull/20826#discussion_r2493691282


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -13765,6 +13899,73 @@ class KafkaApisTest extends Logging {
     assertEquals(alterShareGroupOffsetsResponseData, response.data)
   }
 
+  @ParameterizedTest
+  @CsvSource(value = Array("1,true,true", "1,false,true", "2,true,false", 
"2,false,true"))
+  def testValidateAcknowledgementBatchesForRenew(version: Short, isRenew: 
Boolean, shouldFail: Boolean): Unit = {
+    kafkaApis = createKafkaApis()
+    val tp = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("topic", 0))
+    val ackMap = mutable.Map(tp -> util.List.of(new 
ShareAcknowledgementBatch(0, 0, util.List.of(AcknowledgeType.RENEW.id))))
+    val erroneous:mutable.Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData] = mutable.Map()
+    val errorSet = kafkaApis.validateAcknowledgementBatches(ackMap, erroneous, 
version, isRenewAck = isRenew)
+    if (shouldFail) {
+      assertEquals(1, errorSet.size, s"expected error topic partition, 
version=${version}, isRenew=${isRenew}")
+      assertTrue(errorSet.contains(tp), s"error topic partition mismatch, 
version=${version}, isRenew=${isRenew}")
+    } else {
+      assertEquals(0, errorSet.size, s"unexpected error topic partition, 
version=${version}, isRenew=${isRenew}")
+    }
+  }
+
+  @Test
+  def testHandleShareFetchRenewInvalidRequest(): Unit = {
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    val groupId = "group"
+    val memberId = Uuid.randomUuid()
+    val testPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"test-user")
+    val testClientAddress = InetAddress.getByName("192.168.1.100")
+    val testClientId = "test-client-id"
+    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), 
any())).thenReturn(
+      new FinalContext()
+    )
+
+    val shareFetchRequestData = new ShareFetchRequestData()
+      .setGroupId(groupId)
+      .setMemberId(memberId.toString)
+      .setShareSessionEpoch(0)
+      .setIsRenewAck(true)
+      .setMinBytes(10)
+      .setMaxBytes(20)
+      .setMaxRecords(30)
+      .setMaxWaitMs(40)
+      .setTopics(new 
ShareFetchRequestData.FetchTopicCollection(util.List.of(new 
ShareFetchRequestData.FetchTopic()
+        .setTopicId(topicId)
+        .setPartitions(new 
ShareFetchRequestData.FetchPartitionCollection(util.List.of(
+          new ShareFetchRequestData.FetchPartition()
+            .setAcknowledgementBatches(util.List.of(new AcknowledgementBatch()
+              .setFirstOffset(0)
+              .setLastOffset(0)
+              .setAcknowledgeTypes(util.List.of(AcknowledgeType.RENEW.id))))
+            .setPartitionIndex(partitionIndex)
+        ).iterator))
+      ).iterator))
+
+    val shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+
+    // Create request with custom principal and client address to test quota 
tags
+    val requestHeader = new RequestHeader(shareFetchRequest.apiKey, 
shareFetchRequest.version, testClientId, 0)
+    val request = buildRequest(shareFetchRequest, testPrincipal, 
testClientAddress,
+      ListenerName.forSecurityProtocol(SecurityProtocol.SSL), 
fromPrivilegedListener = false, Some(requestHeader), requestChannelMetrics)
+
+    val kafkaApis = createKafkaApis()
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode)

Review Comment:
   I was planning to do this initially but
   
   There is a shortcoming in `Errors` class where the message contained in `e` 
is swallowed (`Errors.forException(e)`) in 
`ShareFetchRequest.getErrorResponse`. Error responses from other requests have 
the same issue as well.
   ErrorCode is way to go.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to