jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1004958467


##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -2236,6 +2236,30 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertFalse(response.clusterId.isEmpty, "Cluster id not returned")
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testRetryProducerInitializationAfterPermissionFix(quorum: String): Unit 
= {
+    createTopicWithBrokerPrincipal(topic)
+    val wildcard = new ResourcePattern(TOPIC, 
ResourcePattern.WILDCARD_RESOURCE, LITERAL)
+    val prefixed = new ResourcePattern(TOPIC, "t", PREFIXED)
+    val literal = new ResourcePattern(TOPIC, topic, LITERAL)
+    val allowWriteAce = new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, ALLOW)
+    val denyWriteAce = new AccessControlEntry(clientPrincipalString, 
WildcardHost, WRITE, DENY)
+    val producer = buildIdempotentProducer()
+
+    addAndVerifyAcls(Set(denyWriteAce), wildcard)
+    assertThrows(classOf[Exception], () => {
+      val future = producer.send(new ProducerRecord[Array[Byte], 
Array[Byte]](topic, "hi".getBytes))
+      future.get()
+    })
+    removeAndVerifyAcls(Set(denyWriteAce), wildcard)
+    addAndVerifyAcls(Set(allowWriteAce), prefixed)
+    addAndVerifyAcls(Set(allowWriteAce), literal)
+    val future = producer.send(new ProducerRecord[Array[Byte], 
Array[Byte]](topic, "hi".getBytes))

Review Comment:
   Ah this test makes the benefit a bit more clear to me -- a subsequent send 
call works just fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to