pprovenzano commented on code in PR #14083: URL: https://github.com/apache/kafka/pull/14083#discussion_r1293448066
########## core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala: ########## @@ -109,13 +127,44 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup { val expireResult2 = adminClient.expireDelegationToken(token2.hmac()) expiryTimestamp = expireResult2.expiryTimestamp().get() + val expireResult3 = adminClient.expireDelegationToken(token3.hmac()) + expiryTimestamp = expireResult3.expiryTimestamp().get() + + TestUtils.waitUntilTrue(() => brokers.forall(server => server.tokenCache.tokens().size() == 0), + "Timed out waiting for token to propagate to all servers") + tokens = adminClient.describeDelegationToken().delegationTokens().get() assertTrue(tokens.size == 0) //create token with invalid principal type - val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer3")).asJava - val createResult3 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer3)) - assertThrows(classOf[ExecutionException], () => createResult3.delegationToken().get()).getCause.isInstanceOf[InvalidPrincipalTypeException] + val renewer4 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer4")).asJava + val createResult4 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer4)) + val createResult4Error = assertThrows(classOf[ExecutionException], () => createResult4.delegationToken().get()) + assertTrue(createResult4Error.getCause.isInstanceOf[InvalidPrincipalTypeException]) + + // Try to renew a deleted token + val renewResultPostDelete = adminClient.renewDelegationToken(token1.hmac()) + val renewResultPostDeleteError = assertThrows(classOf[ExecutionException], () => renewResultPostDelete.expiryTimestamp().get()) + assertTrue(renewResultPostDeleteError.getCause.isInstanceOf[DelegationTokenNotFoundException]) + + // Create a DelegationToken with a short lifetime to validate the expire code + val createResult5 = adminClient.createDelegationToken(new CreateDelegationTokenOptions() + .renewers(renewer1) + .maxlifeTimeMs(60 * 1000)) + val token5 = createResult5.delegationToken().get() + + TestUtils.waitUntilTrue(() => brokers.forall(server => server.tokenCache.tokens().size() == 1), + "Timed out waiting for token to propagate to all servers") + + Thread.sleep(2 * 60 *1000) Review Comment: > is this Thread.sleep required? Yes, we are testing that the sweep of expired tokens works. We are creating a token with a short lifetime and we have set up the sweep of expired tokens to run every minute. We are testing that the sweep removed the token it is no longer in the cache. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org