pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293448066


##########
core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala:
##########
@@ -109,13 +127,44 @@ class DelegationTokenRequestsTest extends BaseRequestTest 
with SaslSetup {
     val expireResult2 = adminClient.expireDelegationToken(token2.hmac())
     expiryTimestamp = expireResult2.expiryTimestamp().get()
 
+    val expireResult3 = adminClient.expireDelegationToken(token3.hmac())
+    expiryTimestamp = expireResult3.expiryTimestamp().get()
+
+    TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens().size() == 0),
+          "Timed out waiting for token to propagate to all servers")
+
     tokens = adminClient.describeDelegationToken().delegationTokens().get()
     assertTrue(tokens.size == 0)
 
     //create token with invalid principal type
-    val renewer3 = 
List(SecurityUtils.parseKafkaPrincipal("Group:Renewer3")).asJava
-    val createResult3 = adminClient.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer3))
-    assertThrows(classOf[ExecutionException], () => 
createResult3.delegationToken().get()).getCause.isInstanceOf[InvalidPrincipalTypeException]
+    val renewer4 = 
List(SecurityUtils.parseKafkaPrincipal("Group:Renewer4")).asJava
+    val createResult4 = adminClient.createDelegationToken(new 
CreateDelegationTokenOptions().renewers(renewer4))
+    val createResult4Error = assertThrows(classOf[ExecutionException], () => 
createResult4.delegationToken().get())
+    
assertTrue(createResult4Error.getCause.isInstanceOf[InvalidPrincipalTypeException])
+
+    // Try to renew a deleted token
+    val renewResultPostDelete = adminClient.renewDelegationToken(token1.hmac())
+    val renewResultPostDeleteError = assertThrows(classOf[ExecutionException], 
() => renewResultPostDelete.expiryTimestamp().get())
+    
assertTrue(renewResultPostDeleteError.getCause.isInstanceOf[DelegationTokenNotFoundException])
+
+    // Create a DelegationToken with a short lifetime to validate the expire 
code
+    val createResult5 = adminClient.createDelegationToken(new 
CreateDelegationTokenOptions()
+      .renewers(renewer1)
+      .maxlifeTimeMs(60 * 1000))
+    val token5 = createResult5.delegationToken().get()
+
+    TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens().size() == 1),
+          "Timed out waiting for token to propagate to all servers")
+    
+    Thread.sleep(2 * 60 *1000)

Review Comment:
   > is this Thread.sleep required?
   
   Yes, we are testing that the sweep of expired tokens works. We are creating 
a token with a short lifetime, and we have set up the sweep of expired tokens to 
run every minute. We are testing that the sweep removed the token and that it 
is no longer in the cache.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to