chb2ab commented on code in PR #14444:
URL: https://github.com/apache/kafka/pull/14444#discussion_r1387252814


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2610,6 +2610,73 @@ class KafkaApisTest {
     }
   }
 
+  @Test
+  def testProduceResponseMetadataLookupErrorOnNotLeaderOrFollower(): Unit = {
+    val topic = "topic"
+    metadataCache = mock(classOf[ZkMetadataCache])
+
+    for (version <- 10 to ApiKeys.PRODUCE.latestVersion) {
+
+      reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, 
requestChannel, txnCoordinator)
+
+      val responseCallback: ArgumentCaptor[Map[TopicPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+
+      val tp = new TopicPartition(topic, 0)
+
+      val produceRequest = ProduceRequest.forCurrentMagic(new 
ProduceRequestData()
+        .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
+          Collections.singletonList(new ProduceRequestData.TopicProduceData()
+            .setName(tp.topic).setPartitionData(Collections.singletonList(
+            new ProduceRequestData.PartitionProduceData()
+              .setIndex(tp.partition)
+              .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord("test".getBytes))))))
+            .iterator))
+        .setAcks(1.toShort)
+        .setTimeoutMs(5000))
+        .build(version.toShort)
+      val request = buildRequest(produceRequest)
+
+      when(replicaManager.appendRecords(anyLong,
+        anyShort,
+        ArgumentMatchers.eq(false),
+        ArgumentMatchers.eq(AppendOrigin.CLIENT),
+        any(),
+        responseCallback.capture(),
+        any(),
+        any(),
+        any(),
+        any(),
+        any())
+      ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new 
PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
+
+      when(replicaManager.getPartitionOrError(tp)).thenAnswer(_ => 
Left(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+
+      
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+        any[Long])).thenReturn(0)
+      when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+        any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+      when(metadataCache.contains(tp)).thenAnswer(_ => true)
+      when(metadataCache.getPartitionInfo(tp.topic(), 
tp.partition())).thenAnswer(_ => Option.empty)
+      when(metadataCache.getAliveBrokerNode(any(), 
any())).thenReturn(Option.empty)
+
+      createKafkaApis().handleProduceRequest(request, 
RequestLocal.withThreadConfinedCaching)
+
+      val response = verifyNoThrottling[ProduceResponse](request)
+
+      assertEquals(1, response.data.responses.size)
+      val topicProduceResponse = response.data.responses.asScala.head
+      assertEquals(1, topicProduceResponse.partitionResponses.size)
+      val partitionProduceResponse = 
topicProduceResponse.partitionResponses.asScala.head
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, 
Errors.forCode(partitionProduceResponse.errorCode))
+      assertEquals(-1, partitionProduceResponse.currentLeader.leaderId())

Review Comment:
   Right now the client checks for -1 to decide whether a new leader was
returned, so we should check that here as well. It might have made more
sense to use null values, but that would be difficult to change now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to