rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296233969


##########
metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java:
##########


Review Comment:
   Is a unit test necessary, do you think?  I also realize we don't have one 
for `TokenInformation`.



##########
metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.image.writer.RecordListWriter;
+import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@Timeout(value = 40)
+public class DelegationTokenImageTest {
+    public final static DelegationTokenImage IMAGE1;
+
+    public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
+
+    final static DelegationTokenDelta DELTA1;
+
+    final static DelegationTokenImage IMAGE2;
+
+    static DelegationTokenData randomDelegationTokenData(String tokenId, long 
expireTimestamp) {
+        TokenInformation ti = new TokenInformation(
+            tokenId,
+            SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + 
"fred"),
+            SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + 
"fred"),
+            new ArrayList<KafkaPrincipal>(),
+            0,
+            1000,
+            expireTimestamp);
+        return new DelegationTokenData(ti);
+    }
+
+    static {
+        Map<String, DelegationTokenData> image1 = new HashMap<>();
+        image1.put("somerandomuuid1", 
randomDelegationTokenData("somerandomuuid1", 100));
+        image1.put("somerandomuuid2", 
randomDelegationTokenData("somerandomuuid2", 100));
+        image1.put("somerandomuuid3", 
randomDelegationTokenData("somerandomuuid3", 100));
+        IMAGE1 = new DelegationTokenImage(image1);
+
+        DELTA1_RECORDS = new ArrayList<>();
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
DelegationTokenRecord().
+            setOwner(KafkaPrincipal.USER_TYPE + ":" + "fred").
+            setRequester(KafkaPrincipal.USER_TYPE + ":" + "fred").
+            setIssueTimestamp(0).
+            setMaxTimestamp(1000).
+            setExpirationTimestamp(200).
+            setTokenId("somerandomuuid1"), (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
RemoveDelegationTokenRecord().
+            setTokenId("somerandomuuid3"), (short) 0));
+
+        DELTA1 = new DelegationTokenDelta(IMAGE1);
+        RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
+
+        Map<String, DelegationTokenData> image2 = new HashMap<>();
+        image2.put("somerandomuuid1", 
randomDelegationTokenData("somerandomuuid1", 200));
+        image2.put("somerandomuuid2", 
randomDelegationTokenData("somerandomuuid2", 100));
+        IMAGE2 = new DelegationTokenImage(image2);
+    }
+
+    @Test
+    public void testEmptyImageRoundTrip() throws Throwable {
+        testToImageAndBack(DelegationTokenImage.EMPTY);
+    }
+
+    @Test
+    public void testImage1RoundTrip() throws Throwable {
+        testToImageAndBack(IMAGE1);
+    }
+
+    @Test
+    public void testApplyDelta1() throws Throwable {
+        assertEquals(IMAGE2, DELTA1.apply());
+    }
+
+    @Test
+    public void testImage2RoundTrip() throws Throwable {
+        testToImageAndBack(IMAGE2);
+    }
+
+    private void testToImageAndBack(DelegationTokenImage image) throws 
Throwable {
+        RecordListWriter writer = new RecordListWriter();
+        image.write(writer, new ImageWriterOptions.Builder().build());
+        DelegationTokenDelta delta = new 
DelegationTokenDelta(DelegationTokenImage.EMPTY);
+        RecordTestUtils.replayAll(delta, writer.records());
+        DelegationTokenImage nextImage = delta.apply();
+        assertEquals(image, nextImage);
+    }

Review Comment:
   The preferred boilerplate for these methods has changed since you wrote 
this.  Can you look at 
`metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java` and 
follow the same format?



##########
metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.image.writer.RecordListWriter;
+import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@Timeout(value = 40)
+public class DelegationTokenImageTest {
+    public final static DelegationTokenImage IMAGE1;
+
+    public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
+
+    final static DelegationTokenDelta DELTA1;
+
+    final static DelegationTokenImage IMAGE2;
+
+    static DelegationTokenData randomDelegationTokenData(String tokenId, long 
expireTimestamp) {
+        TokenInformation ti = new TokenInformation(
+            tokenId,
+            SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + 
"fred"),
+            SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + 
"fred"),
+            new ArrayList<KafkaPrincipal>(),
+            0,
+            1000,
+            expireTimestamp);
+        return new DelegationTokenData(ti);
+    }
+
+    static {
+        Map<String, DelegationTokenData> image1 = new HashMap<>();
+        image1.put("somerandomuuid1", 
randomDelegationTokenData("somerandomuuid1", 100));
+        image1.put("somerandomuuid2", 
randomDelegationTokenData("somerandomuuid2", 100));
+        image1.put("somerandomuuid3", 
randomDelegationTokenData("somerandomuuid3", 100));
+        IMAGE1 = new DelegationTokenImage(image1);
+
+        DELTA1_RECORDS = new ArrayList<>();
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
DelegationTokenRecord().
+            setOwner(KafkaPrincipal.USER_TYPE + ":" + "fred").
+            setRequester(KafkaPrincipal.USER_TYPE + ":" + "fred").
+            setIssueTimestamp(0).
+            setMaxTimestamp(1000).
+            setExpirationTimestamp(200).
+            setTokenId("somerandomuuid1"), (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
RemoveDelegationTokenRecord().
+            setTokenId("somerandomuuid3"), (short) 0));
+
+        DELTA1 = new DelegationTokenDelta(IMAGE1);
+        RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
+
+        Map<String, DelegationTokenData> image2 = new HashMap<>();
+        image2.put("somerandomuuid1", 
randomDelegationTokenData("somerandomuuid1", 200));
+        image2.put("somerandomuuid2", 
randomDelegationTokenData("somerandomuuid2", 100));
+        IMAGE2 = new DelegationTokenImage(image2);
+    }
+
+    @Test
+    public void testEmptyImageRoundTrip() throws Throwable {
+        testToImageAndBack(DelegationTokenImage.EMPTY);
+    }
+
+    @Test
+    public void testImage1RoundTrip() throws Throwable {
+        testToImageAndBack(IMAGE1);
+    }
+
+    @Test
+    public void testApplyDelta1() throws Throwable {
+        assertEquals(IMAGE2, DELTA1.apply());
+    }
+
+    @Test
+    public void testImage2RoundTrip() throws Throwable {
+        testToImageAndBack(IMAGE2);
+    }
+
+    private void testToImageAndBack(DelegationTokenImage image) throws 
Throwable {
+        RecordListWriter writer = new RecordListWriter();
+        image.write(writer, new ImageWriterOptions.Builder().build());
+        DelegationTokenDelta delta = new 
DelegationTokenDelta(DelegationTokenImage.EMPTY);
+        RecordTestUtils.replayAll(delta, writer.records());
+        DelegationTokenImage nextImage = delta.apply();
+        assertEquals(image, nextImage);
+    }
+
+    @Test
+    public void testEmptyWithInvalidIBP() throws Throwable {

Review Comment:
   `throws Throwable` isn't necessary here; the same goes for the next test 
method.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1396,6 +1441,30 @@ private void cancelNextWriteNoOpRecord() {
         queue.cancelDeferred(WRITE_NO_OP_RECORD);
     }
 
+    private static final String SWEEP_EXPIRED_DELEGATION_TOKENS = 
"sweepExpiredDelegationTokens";
+
+    private void maybeScheduleNextExpiredDelegationTokenSweep() {
+        if (featureControl.metadataVersion().isDelegationTokenSupported() &&
+            delegationTokenControlManager.isEnabled()) {
+

Review Comment:
   It would be good to add a debug log message here.



##########
metadata/src/main/java/org/apache/kafka/image/DelegationTokenDelta.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to delegation tokens in the metadata image.
+ */
+public final class DelegationTokenDelta {
+    private final DelegationTokenImage image;
+
+    // Key is TokenID which is contained in the value TokenInformation
+    private final Map<String, Optional<DelegationTokenData>> changes = new 
HashMap<>();
+
+    public DelegationTokenDelta(DelegationTokenImage image) {
+        this.image = image;
+    }
+
+    public void finishSnapshot() {
+        for (String tokenId : image.tokens().keySet()) {
+            if (!changes.containsKey(tokenId)) {
+                // If the tokenId from the image did not appear in the 
snapshot, mark it as removed
+                changes.put(tokenId, Optional.empty());
+
+            }
+        }
+    }
+
+    public DelegationTokenImage image() {
+        return image;
+    }
+
+    public Map<String, Optional<DelegationTokenData>> changes() {
+        return changes;
+    }
+
+    public void replay(DelegationTokenRecord record) {
+        changes.put(record.tokenId(), 
Optional.of(DelegationTokenData.fromRecord(record)));
+    }
+
+    public void replay(RemoveDelegationTokenRecord record) {
+        changes.put(record.tokenId(), Optional.empty());
+    }
+
+    public void handleMetadataVersionChange(MetadataVersion 
changedMetadataVersion) {
+        // nothing to do
+    }
+
+    public DelegationTokenImage apply() {
+        Map<String, DelegationTokenData> newTokens = new HashMap<>();
+        for (Entry<String, DelegationTokenData> entry : 
image.tokens().entrySet()) {

Review Comment:
   It might be nice to add a couple of comments here.  The top section keeps 
the tokens that already existed in the base image and were not deleted via the 
delta, and the bottom section adds the new tokens that came in with the delta.



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {

Review Comment:
   Should this be called `setTokenDefaultMaxLifetime()`?



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {

Review Comment:
   `private static` instead of just `private`



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {
+        Mac mac = Mac.getInstance("HmacSHA512");
+        SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), 
mac.getAlgorithm());
+
+        mac.init(secretKey);
+        return mac.doFinal(toBytes(tokenId));
+    }
+
+    private TokenInformation getToken(byte[] hmac) {
+        String base64Pwd = Base64.getEncoder().encodeToString(hmac);
+        return tokenCache.tokenForHmac(base64Pwd);
+    }
+
+    private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal 
renewer) {
+        if (tokenInfo.owner().equals(renewer)) {
+            return true;
+        }
+        for (KafkaPrincipal validRenewer : tokenInfo.renewers()) {
+            if (validRenewer.equals(renewer)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean isEnabled() {
+        if (secretKeyString != null) {
+            return true;
+        }
+        return false;
+    }
+
+    /*
+     * Pass in the MetadataVersion so that we can return a response to the 
caller 
+     * if the current metadataVersion is too low.
+     */
+    public ControllerResult<CreateDelegationTokenResponseData> 
createDelegationToken(
+        ControllerRequestContext context,
+        CreateDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+        long maxLifeTime = tokenDefaultMaxLifetime;
+        if (requestData.maxLifetimeMs() > 0) {
+            maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs());
+        }
+
+        long maxTimestamp = now + maxLifeTime;
+        long expiryTimestamp = Math.min(maxTimestamp, now + 
tokenDefaultRenewLifetime);
+
+        String tokenId = Uuid.randomUuid().toString();
+
+        KafkaPrincipal owner = context.principal();
+        if ((requestData.ownerPrincipalName() != null) && 
+            (!requestData.ownerPrincipalName().isEmpty())) {
+
+            owner = new KafkaPrincipal(requestData.ownerPrincipalType(), 
requestData.ownerPrincipalName());
+        }
+        CreateDelegationTokenResponseData responseData = new 
CreateDelegationTokenResponseData()
+                .setPrincipalName(owner.getName())
+                .setPrincipalType(owner.getPrincipalType())
+                .setTokenRequesterPrincipalName(context.principal().getName())
+                
.setTokenRequesterPrincipalType(context.principal().getPrincipalType());
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        if (secretKeyString == null) {
+            // DelegationTokens are not enabled
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code()));
+        }
+
+        if (!metadataVersion.isDelegationTokenSupported()) {
+            // DelegationTokens are not supported in this metadata version
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(UNSUPPORTED_VERSION.code()));
+        }

Review Comment:
   Should these checks go at the top of the method?



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {
+        Mac mac = Mac.getInstance("HmacSHA512");
+        SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), 
mac.getAlgorithm());
+
+        mac.init(secretKey);
+        return mac.doFinal(toBytes(tokenId));
+    }
+
+    private TokenInformation getToken(byte[] hmac) {
+        String base64Pwd = Base64.getEncoder().encodeToString(hmac);
+        return tokenCache.tokenForHmac(base64Pwd);
+    }
+
+    private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal 
renewer) {
+        if (tokenInfo.owner().equals(renewer)) {
+            return true;
+        }
+        for (KafkaPrincipal validRenewer : tokenInfo.renewers()) {
+            if (validRenewer.equals(renewer)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean isEnabled() {
+        if (secretKeyString != null) {
+            return true;
+        }
+        return false;
+    }
+
+    /*
+     * Pass in the MetadataVersion so that we can return a response to the 
caller 
+     * if the current metadataVersion is too low.
+     */
+    public ControllerResult<CreateDelegationTokenResponseData> 
createDelegationToken(
+        ControllerRequestContext context,
+        CreateDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+        long maxLifeTime = tokenDefaultMaxLifetime;
+        if (requestData.maxLifetimeMs() > 0) {
+            maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs());
+        }
+
+        long maxTimestamp = now + maxLifeTime;
+        long expiryTimestamp = Math.min(maxTimestamp, now + 
tokenDefaultRenewLifetime);
+
+        String tokenId = Uuid.randomUuid().toString();
+
+        KafkaPrincipal owner = context.principal();
+        if ((requestData.ownerPrincipalName() != null) && 
+            (!requestData.ownerPrincipalName().isEmpty())) {
+
+            owner = new KafkaPrincipal(requestData.ownerPrincipalType(), 
requestData.ownerPrincipalName());
+        }
+        CreateDelegationTokenResponseData responseData = new 
CreateDelegationTokenResponseData()
+                .setPrincipalName(owner.getName())
+                .setPrincipalType(owner.getPrincipalType())
+                .setTokenRequesterPrincipalName(context.principal().getName())
+                
.setTokenRequesterPrincipalType(context.principal().getPrincipalType());
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        if (secretKeyString == null) {
+            // DelegationTokens are not enabled
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code()));
+        }
+
+        if (!metadataVersion.isDelegationTokenSupported()) {
+            // DelegationTokens are not supported in this metadata version
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(UNSUPPORTED_VERSION.code()));
+        }
+
+        List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>();
+        for (CreatableRenewers renewer : requestData.renewers()) {
+            if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) {
+                renewers.add(new KafkaPrincipal(renewer.principalType(), 
renewer.principalName()));
+            } else {
+                return ControllerResult.atomicOf(records, 
responseData.setErrorCode(INVALID_PRINCIPAL_TYPE.code()));
+            }
+        }
+
+        byte[] hmac;
+        try {
+            hmac = createHmac(tokenId);
+        } catch (Throwable e) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(ApiError.fromThrowable(e).error().code()));
+        }
+
+        TokenInformation newTokenInformation = new TokenInformation(tokenId, 
owner,
+            context.principal(), renewers, now, maxTimestamp, expiryTimestamp);
+
+        DelegationTokenData newDelegationTokenData = new 
DelegationTokenData(newTokenInformation);
+
+        responseData
+                .setErrorCode(NONE.code())
+                .setIssueTimestampMs(now)
+                .setExpiryTimestampMs(expiryTimestamp)
+                .setMaxTimestampMs(maxTimestamp)
+                .setTokenId(tokenId)
+                .setHmac(hmac);
+
+        records.add(new 
ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0));
+        return ControllerResult.atomicOf(records, responseData);
+    }
+
+    public ControllerResult<RenewDelegationTokenResponseData> 
renewDelegationToken(
+        ControllerRequestContext context,
+        RenewDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        RenewDelegationTokenResponseData responseData = new 
RenewDelegationTokenResponseData();
+
+        TokenInformation myTokenInformation = getToken(requestData.hmac());
+
+        if (myTokenInformation == null) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code()));
+        }
+
+        if (myTokenInformation.maxTimestamp() < now || 
myTokenInformation.expiryTimestamp() < now) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code()));
+        }
+
+        if (!allowedToRenew(myTokenInformation, context.principal())) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code()));
+        }
+
+        long renewLifeTime = tokenDefaultRenewLifetime;
+        if (requestData.renewPeriodMs() > 0) {
+            renewLifeTime = Math.min(renewLifeTime, 
requestData.renewPeriodMs());
+        }
+        long renewTimeStamp = now + renewLifeTime;
+        long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), 
renewTimeStamp);
+
+        myTokenInformation.setExpiryTimestamp(expiryTimestamp);
+
+        DelegationTokenData newDelegationTokenData = new 
DelegationTokenData(myTokenInformation);
+
+        responseData
+            .setErrorCode(NONE.code())
+            .setExpiryTimestampMs(expiryTimestamp);
+
+        records.add(new 
ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0));
+        return ControllerResult.atomicOf(records, responseData);
+    }
+
+    public ControllerResult<ExpireDelegationTokenResponseData> 
expireDelegationToken(
+        ControllerRequestContext context,
+        ExpireDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        ExpireDelegationTokenResponseData responseData = new 
ExpireDelegationTokenResponseData();
+
+        if (secretKeyString == null) {
+            // DelegationTokens are not enabled
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code()));
+        }

Review Comment:
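   Move this check to the top of the method?  Also, does this method need to 
check that the metadata version supports delegation tokens, as 
`createDelegationToken()` does?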



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {
+        Mac mac = Mac.getInstance("HmacSHA512");
+        SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), 
mac.getAlgorithm());
+
+        mac.init(secretKey);
+        return mac.doFinal(toBytes(tokenId));
+    }
+
+    private TokenInformation getToken(byte[] hmac) {
+        String base64Pwd = Base64.getEncoder().encodeToString(hmac);
+        return tokenCache.tokenForHmac(base64Pwd);
+    }
+
+    private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal 
renewer) {
+        if (tokenInfo.owner().equals(renewer)) {
+            return true;
+        }
+        for (KafkaPrincipal validRenewer : tokenInfo.renewers()) {
+            if (validRenewer.equals(renewer)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean isEnabled() {
+        if (secretKeyString != null) {
+            return true;
+        }
+        return false;
+    }
+
+    /*
+     * Pass in the MetadataVersion so that we can return a response to the 
caller 
+     * if the current metadataVersion is too low.
+     */
+    public ControllerResult<CreateDelegationTokenResponseData> 
createDelegationToken(
+        ControllerRequestContext context,
+        CreateDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+        long maxLifeTime = tokenDefaultMaxLifetime;
+        if (requestData.maxLifetimeMs() > 0) {
+            maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs());
+        }
+
+        long maxTimestamp = now + maxLifeTime;
+        long expiryTimestamp = Math.min(maxTimestamp, now + 
tokenDefaultRenewLifetime);
+
+        String tokenId = Uuid.randomUuid().toString();
+
+        KafkaPrincipal owner = context.principal();
+        if ((requestData.ownerPrincipalName() != null) && 
+            (!requestData.ownerPrincipalName().isEmpty())) {
+
+            owner = new KafkaPrincipal(requestData.ownerPrincipalType(), 
requestData.ownerPrincipalName());
+        }
+        CreateDelegationTokenResponseData responseData = new 
CreateDelegationTokenResponseData()
+                .setPrincipalName(owner.getName())
+                .setPrincipalType(owner.getPrincipalType())
+                .setTokenRequesterPrincipalName(context.principal().getName())
+                
.setTokenRequesterPrincipalType(context.principal().getPrincipalType());
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        if (secretKeyString == null) {
+            // DelegationTokens are not enabled
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code()));
+        }
+
+        if (!metadataVersion.isDelegationTokenSupported()) {
+            // DelegationTokens are not supported in this metadata version
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(UNSUPPORTED_VERSION.code()));
+        }
+
+        List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>();
+        for (CreatableRenewers renewer : requestData.renewers()) {
+            if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) {
+                renewers.add(new KafkaPrincipal(renewer.principalType(), 
renewer.principalName()));
+            } else {
+                return ControllerResult.atomicOf(records, 
responseData.setErrorCode(INVALID_PRINCIPAL_TYPE.code()));
+            }
+        }
+
+        byte[] hmac;
+        try {
+            hmac = createHmac(tokenId);
+        } catch (Throwable e) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(ApiError.fromThrowable(e).error().code()));
+        }
+
+        TokenInformation newTokenInformation = new TokenInformation(tokenId, 
owner,
+            context.principal(), renewers, now, maxTimestamp, expiryTimestamp);
+
+        DelegationTokenData newDelegationTokenData = new 
DelegationTokenData(newTokenInformation);
+
+        responseData
+                .setErrorCode(NONE.code())
+                .setIssueTimestampMs(now)
+                .setExpiryTimestampMs(expiryTimestamp)
+                .setMaxTimestampMs(maxTimestamp)
+                .setTokenId(tokenId)
+                .setHmac(hmac);
+
+        records.add(new 
ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0));
+        return ControllerResult.atomicOf(records, responseData);
+    }
+
+    public ControllerResult<RenewDelegationTokenResponseData> 
renewDelegationToken(
+        ControllerRequestContext context,
+        RenewDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        RenewDelegationTokenResponseData responseData = new 
RenewDelegationTokenResponseData();
+
+        TokenInformation myTokenInformation = getToken(requestData.hmac());
+
+        if (myTokenInformation == null) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code()));
+        }
+
+        if (myTokenInformation.maxTimestamp() < now || 
myTokenInformation.expiryTimestamp() < now) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code()));
+        }
+
+        if (!allowedToRenew(myTokenInformation, context.principal())) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code()));
+        }
+
+        long renewLifeTime = tokenDefaultRenewLifetime;
+        if (requestData.renewPeriodMs() > 0) {
+            renewLifeTime = Math.min(renewLifeTime, 
requestData.renewPeriodMs());
+        }
+        long renewTimeStamp = now + renewLifeTime;
+        long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), 
renewTimeStamp);
+
+        myTokenInformation.setExpiryTimestamp(expiryTimestamp);

Review Comment:
   This is updating the record directly within the cache, but it is okay since 
this is the only thread accessing that cache -- is that correct?  Might be 
worth a comment to this effect in `getToken()` and 
`sweepExpiredDelegationTokens()` where we use the token cache.
   
   How does the delegation token cache get updated on other nodes?  Shouldn't 
the token cache be updated when the written record gets replayed, rather than 
here?



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {
+        Mac mac = Mac.getInstance("HmacSHA512");
+        SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), 
mac.getAlgorithm());
+
+        mac.init(secretKey);
+        return mac.doFinal(toBytes(tokenId));
+    }
+
+    private TokenInformation getToken(byte[] hmac) {
+        String base64Pwd = Base64.getEncoder().encodeToString(hmac);
+        return tokenCache.tokenForHmac(base64Pwd);
+    }
+
+    private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal 
renewer) {
+        if (tokenInfo.owner().equals(renewer)) {
+            return true;
+        }
+        for (KafkaPrincipal validRenewer : tokenInfo.renewers()) {
+            if (validRenewer.equals(renewer)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean isEnabled() {
+        if (secretKeyString != null) {
+            return true;
+        }
+        return false;
+    }
+
+    /*
+     * Pass in the MetadataVersion so that we can return a response to the 
caller 
+     * if the current metadataVersion is too low.
+     */
+    public ControllerResult<CreateDelegationTokenResponseData> 
createDelegationToken(
+        ControllerRequestContext context,
+        CreateDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+        long maxLifeTime = tokenDefaultMaxLifetime;
+        if (requestData.maxLifetimeMs() > 0) {
+            maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs());
+        }
+
+        long maxTimestamp = now + maxLifeTime;
+        long expiryTimestamp = Math.min(maxTimestamp, now + 
tokenDefaultRenewLifetime);
+
+        String tokenId = Uuid.randomUuid().toString();
+
+        KafkaPrincipal owner = context.principal();
+        if ((requestData.ownerPrincipalName() != null) && 
+            (!requestData.ownerPrincipalName().isEmpty())) {
+
+            owner = new KafkaPrincipal(requestData.ownerPrincipalType(), 
requestData.ownerPrincipalName());
+        }
+        CreateDelegationTokenResponseData responseData = new 
CreateDelegationTokenResponseData()
+                .setPrincipalName(owner.getName())
+                .setPrincipalType(owner.getPrincipalType())
+                .setTokenRequesterPrincipalName(context.principal().getName())
+                
.setTokenRequesterPrincipalType(context.principal().getPrincipalType());
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        if (secretKeyString == null) {
+            // DelegationTokens are not enabled
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code()));
+        }
+
+        if (!metadataVersion.isDelegationTokenSupported()) {
+            // DelegationTokens are not supported in this metadata version
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(UNSUPPORTED_VERSION.code()));
+        }
+
+        List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>();
+        for (CreatableRenewers renewer : requestData.renewers()) {
+            if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) {
+                renewers.add(new KafkaPrincipal(renewer.principalType(), 
renewer.principalName()));
+            } else {
+                return ControllerResult.atomicOf(records, 
responseData.setErrorCode(INVALID_PRINCIPAL_TYPE.code()));
+            }
+        }
+
+        byte[] hmac;
+        try {
+            hmac = createHmac(tokenId);
+        } catch (Throwable e) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(ApiError.fromThrowable(e).error().code()));
+        }
+
+        TokenInformation newTokenInformation = new TokenInformation(tokenId, 
owner,
+            context.principal(), renewers, now, maxTimestamp, expiryTimestamp);
+
+        DelegationTokenData newDelegationTokenData = new 
DelegationTokenData(newTokenInformation);
+
+        responseData
+                .setErrorCode(NONE.code())
+                .setIssueTimestampMs(now)
+                .setExpiryTimestampMs(expiryTimestamp)
+                .setMaxTimestampMs(maxTimestamp)
+                .setTokenId(tokenId)
+                .setHmac(hmac);
+
+        records.add(new 
ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0));
+        return ControllerResult.atomicOf(records, responseData);
+    }
+
+    public ControllerResult<RenewDelegationTokenResponseData> 
renewDelegationToken(
+        ControllerRequestContext context,
+        RenewDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        RenewDelegationTokenResponseData responseData = new 
RenewDelegationTokenResponseData();
+
+        TokenInformation myTokenInformation = getToken(requestData.hmac());
+
+        if (myTokenInformation == null) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code()));
+        }
+
+        if (myTokenInformation.maxTimestamp() < now || 
myTokenInformation.expiryTimestamp() < now) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code()));
+        }
+
+        if (!allowedToRenew(myTokenInformation, context.principal())) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code()));
+        }
+
+        long renewLifeTime = tokenDefaultRenewLifetime;
+        if (requestData.renewPeriodMs() > 0) {
+            renewLifeTime = Math.min(renewLifeTime, 
requestData.renewPeriodMs());
+        }
+        long renewTimeStamp = now + renewLifeTime;
+        long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), 
renewTimeStamp);
+
+        myTokenInformation.setExpiryTimestamp(expiryTimestamp);
+
+        DelegationTokenData newDelegationTokenData = new 
DelegationTokenData(myTokenInformation);
+
+        responseData
+            .setErrorCode(NONE.code())
+            .setExpiryTimestampMs(expiryTimestamp);
+
+        records.add(new 
ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0));
+        return ControllerResult.atomicOf(records, responseData);
+    }
+
+    public ControllerResult<ExpireDelegationTokenResponseData> 
expireDelegationToken(
+        ControllerRequestContext context,
+        ExpireDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        ExpireDelegationTokenResponseData responseData = new 
ExpireDelegationTokenResponseData();
+
+        if (secretKeyString == null) {
+            // DelegationTokens are not enabled
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code()));
+        }
+
+        TokenInformation myTokenInformation = getToken(requestData.hmac());
+
+        if (myTokenInformation == null) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code()));
+        }
+
+        if (myTokenInformation.maxTimestamp() < now || 
myTokenInformation.expiryTimestamp() < now) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code()));
+        }
+
+        if (!allowedToRenew(myTokenInformation, context.principal())) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code()));
+        }
+
+        if (requestData.expiryTimePeriodMs() < 0) { // expire immediately
+            responseData
+                .setErrorCode(NONE.code())
+                .setExpiryTimestampMs(requestData.expiryTimePeriodMs());
+            records.add(new ApiMessageAndVersion(new 
RemoveDelegationTokenRecord().
+                setTokenId(myTokenInformation.tokenId()), (short) 0));
+        } else {
+            long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(),
+                now + requestData.expiryTimePeriodMs());
+
+            responseData
+                .setErrorCode(NONE.code())
+                .setExpiryTimestampMs(expiryTimestamp);
+
+            myTokenInformation.setExpiryTimestamp(expiryTimestamp);

Review Comment:
   Same question as above -- should the cached token's expiry timestamp really 
be updated here (via `setExpiryTimestamp()`), or should it be done when the 
record gets replayed?



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {
+        Mac mac = Mac.getInstance("HmacSHA512");
+        SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), 
mac.getAlgorithm());
+
+        mac.init(secretKey);
+        return mac.doFinal(toBytes(tokenId));
+    }
+
+    private TokenInformation getToken(byte[] hmac) {
+        String base64Pwd = Base64.getEncoder().encodeToString(hmac);
+        return tokenCache.tokenForHmac(base64Pwd);
+    }
+
+    private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal 
renewer) {
+        if (tokenInfo.owner().equals(renewer)) {
+            return true;
+        }
+        for (KafkaPrincipal validRenewer : tokenInfo.renewers()) {
+            if (validRenewer.equals(renewer)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean isEnabled() {
+        if (secretKeyString != null) {
+            return true;
+        }
+        return false;
+    }
+
+    /*
+     * Pass in the MetadataVersion so that we can return a response to the 
caller 
+     * if the current metadataVersion is too low.
+     */
+    public ControllerResult<CreateDelegationTokenResponseData> 
createDelegationToken(
+        ControllerRequestContext context,
+        CreateDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+        long maxLifeTime = tokenDefaultMaxLifetime;
+        if (requestData.maxLifetimeMs() > 0) {
+            maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs());
+        }
+
+        long maxTimestamp = now + maxLifeTime;
+        long expiryTimestamp = Math.min(maxTimestamp, now + 
tokenDefaultRenewLifetime);
+
+        String tokenId = Uuid.randomUuid().toString();
+
+        KafkaPrincipal owner = context.principal();
+        if ((requestData.ownerPrincipalName() != null) && 
+            (!requestData.ownerPrincipalName().isEmpty())) {
+
+            owner = new KafkaPrincipal(requestData.ownerPrincipalType(), 
requestData.ownerPrincipalName());
+        }
+        CreateDelegationTokenResponseData responseData = new 
CreateDelegationTokenResponseData()
+                .setPrincipalName(owner.getName())
+                .setPrincipalType(owner.getPrincipalType())
+                .setTokenRequesterPrincipalName(context.principal().getName())
+                
.setTokenRequesterPrincipalType(context.principal().getPrincipalType());
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        if (secretKeyString == null) {
+            // DelegationTokens are not enabled
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code()));
+        }
+
+        if (!metadataVersion.isDelegationTokenSupported()) {
+            // DelegationTokens are not supported in this metadata version
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(UNSUPPORTED_VERSION.code()));
+        }
+
+        List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>();
+        for (CreatableRenewers renewer : requestData.renewers()) {
+            if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) {
+                renewers.add(new KafkaPrincipal(renewer.principalType(), 
renewer.principalName()));
+            } else {
+                return ControllerResult.atomicOf(records, 
responseData.setErrorCode(INVALID_PRINCIPAL_TYPE.code()));
+            }
+        }
+
+        byte[] hmac;
+        try {
+            hmac = createHmac(tokenId);
+        } catch (Throwable e) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(ApiError.fromThrowable(e).error().code()));
+        }
+
+        TokenInformation newTokenInformation = new TokenInformation(tokenId, 
owner,
+            context.principal(), renewers, now, maxTimestamp, expiryTimestamp);
+
+        DelegationTokenData newDelegationTokenData = new 
DelegationTokenData(newTokenInformation);
+
+        responseData
+                .setErrorCode(NONE.code())
+                .setIssueTimestampMs(now)
+                .setExpiryTimestampMs(expiryTimestamp)
+                .setMaxTimestampMs(maxTimestamp)
+                .setTokenId(tokenId)
+                .setHmac(hmac);
+
+        records.add(new 
ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0));
+        return ControllerResult.atomicOf(records, responseData);
+    }
+
+    public ControllerResult<RenewDelegationTokenResponseData> 
renewDelegationToken(
+        ControllerRequestContext context,
+        RenewDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        RenewDelegationTokenResponseData responseData = new 
RenewDelegationTokenResponseData();
+
+        TokenInformation myTokenInformation = getToken(requestData.hmac());
+
+        if (myTokenInformation == null) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code()));
+        }
+
+        if (myTokenInformation.maxTimestamp() < now || 
myTokenInformation.expiryTimestamp() < now) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code()));
+        }
+
+        if (!allowedToRenew(myTokenInformation, context.principal())) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code()));
+        }
+
+        long renewLifeTime = tokenDefaultRenewLifetime;
+        if (requestData.renewPeriodMs() > 0) {
+            renewLifeTime = Math.min(renewLifeTime, 
requestData.renewPeriodMs());
+        }
+        long renewTimeStamp = now + renewLifeTime;
+        long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), 
renewTimeStamp);
+
+        myTokenInformation.setExpiryTimestamp(expiryTimestamp);
+
+        DelegationTokenData newDelegationTokenData = new 
DelegationTokenData(myTokenInformation);
+
+        responseData
+            .setErrorCode(NONE.code())
+            .setExpiryTimestampMs(expiryTimestamp);
+
+        records.add(new 
ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0));
+        return ControllerResult.atomicOf(records, responseData);
+    }
+
+    public ControllerResult<ExpireDelegationTokenResponseData> 
expireDelegationToken(
+        ControllerRequestContext context,
+        ExpireDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        ExpireDelegationTokenResponseData responseData = new 
ExpireDelegationTokenResponseData();
+
+        if (secretKeyString == null) {
+            // DelegationTokens are not enabled
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code()));
+        }
+
+        TokenInformation myTokenInformation = getToken(requestData.hmac());
+
+        if (myTokenInformation == null) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code()));
+        }
+
+        if (myTokenInformation.maxTimestamp() < now || 
myTokenInformation.expiryTimestamp() < now) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code()));
+        }
+
+        if (!allowedToRenew(myTokenInformation, context.principal())) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code()));
+        }

Review Comment:
   It seems we check this same sequence of conditions (token not found, token 
expired, owner mismatch) in multiple places -- would it be worth refactoring 
them out into a single method so we can be confident that every caller 
performs every check?



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{

Review Comment:
   Should this be called `setTokenDefaultRenewLifetime()`, to match the 
`tokenDefaultRenewLifetime` field it sets?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -202,6 +211,11 @@ static public class Builder {
         private BootstrapMetadata bootstrapMetadata = null;
         private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
         private boolean zkMigrationEnabled = false;
+        private DelegationTokenCache tokenCache;
+        private String tokenKeyString;

Review Comment:
   Any particular reason why this is called `tokenKeyString` instead of 
`tokenSecretKeyString` or `tokenSecretString`?



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {

Review Comment:
   Any reason why this is called `setTokenKeyString()` instead of 
`setSecretKeyString()`?



##########
metadata/src/main/java/org/apache/kafka/image/node/DelegationTokenDataNode.java:
##########


Review Comment:
   It seems we should have a unit test for this class (to confirm redaction).



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {

Review Comment:
   Could `toBytes()` be `private` instead of `public`?



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {
+        Mac mac = Mac.getInstance("HmacSHA512");
+        SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), 
mac.getAlgorithm());
+
+        mac.init(secretKey);
+        return mac.doFinal(toBytes(tokenId));
+    }
+
+    private TokenInformation getToken(byte[] hmac) {
+        String base64Pwd = Base64.getEncoder().encodeToString(hmac);
+        return tokenCache.tokenForHmac(base64Pwd);
+    }
+
+    private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal 
renewer) {
+        if (tokenInfo.owner().equals(renewer)) {
+            return true;
+        }
+        for (KafkaPrincipal validRenewer : tokenInfo.renewers()) {
+            if (validRenewer.equals(renewer)) {
+                return true;
+            }
+        }
+        return false;

Review Comment:
   Simplify?  `return tokenInfo.owner().equals(renewer) || 
tokenInfo.renewers().contains(renewer);`
   



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime

Review Comment:
   It might be good to append `Ms` to these names (e.g. 
`tokenDefaultMaxLifetimeMs`, `tokenDefaultRenewLifetimeMs`) to clearly 
indicate the units.  Same elsewhere if necessary.



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {
+        Mac mac = Mac.getInstance("HmacSHA512");
+        SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), 
mac.getAlgorithm());
+
+        mac.init(secretKey);
+        return mac.doFinal(toBytes(tokenId));
+    }
+
+    private TokenInformation getToken(byte[] hmac) {
+        String base64Pwd = Base64.getEncoder().encodeToString(hmac);
+        return tokenCache.tokenForHmac(base64Pwd);
+    }
+
+    private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal 
renewer) {
+        if (tokenInfo.owner().equals(renewer)) {
+            return true;
+        }
+        for (KafkaPrincipal validRenewer : tokenInfo.renewers()) {
+            if (validRenewer.equals(renewer)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean isEnabled() {
+        if (secretKeyString != null) {
+            return true;
+        }
+        return false;
+    }
+
+    /*
+     * Pass in the MetadataVersion so that we can return a response to the 
caller 
+     * if the current metadataVersion is too low.
+     */
+    public ControllerResult<CreateDelegationTokenResponseData> 
createDelegationToken(
+        ControllerRequestContext context,
+        CreateDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+        long maxLifeTime = tokenDefaultMaxLifetime;
+        if (requestData.maxLifetimeMs() > 0) {
+            maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs());
+        }
+
+        long maxTimestamp = now + maxLifeTime;
+        long expiryTimestamp = Math.min(maxTimestamp, now + 
tokenDefaultRenewLifetime);
+
+        String tokenId = Uuid.randomUuid().toString();
+
+        KafkaPrincipal owner = context.principal();
+        if ((requestData.ownerPrincipalName() != null) && 
+            (!requestData.ownerPrincipalName().isEmpty())) {
+
+            owner = new KafkaPrincipal(requestData.ownerPrincipalType(), 
requestData.ownerPrincipalName());
+        }
+        CreateDelegationTokenResponseData responseData = new 
CreateDelegationTokenResponseData()
+                .setPrincipalName(owner.getName())
+                .setPrincipalType(owner.getPrincipalType())
+                .setTokenRequesterPrincipalName(context.principal().getName())
+                
.setTokenRequesterPrincipalType(context.principal().getPrincipalType());
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        if (secretKeyString == null) {

Review Comment:
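   This should reuse `!isEnabled()` rather than checking 
`secretKeyString == null` directly, so the "enabled" condition is defined in 
one place. The same applies elsewhere.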



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {
+        Mac mac = Mac.getInstance("HmacSHA512");
+        SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), 
mac.getAlgorithm());
+
+        mac.init(secretKey);
+        return mac.doFinal(toBytes(tokenId));
+    }
+
+    private TokenInformation getToken(byte[] hmac) {
+        String base64Pwd = Base64.getEncoder().encodeToString(hmac);
+        return tokenCache.tokenForHmac(base64Pwd);
+    }
+
+    private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal 
renewer) {
+        if (tokenInfo.owner().equals(renewer)) {
+            return true;
+        }
+        for (KafkaPrincipal validRenewer : tokenInfo.renewers()) {
+            if (validRenewer.equals(renewer)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean isEnabled() {
+        if (secretKeyString != null) {
+            return true;
+        }
+        return false;
+    }
+
+    /*
+     * Pass in the MetadataVersion so that we can return a response to the 
caller 
+     * if the current metadataVersion is too low.
+     */
+    public ControllerResult<CreateDelegationTokenResponseData> 
createDelegationToken(
+        ControllerRequestContext context,
+        CreateDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+        long maxLifeTime = tokenDefaultMaxLifetime;
+        if (requestData.maxLifetimeMs() > 0) {
+            maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs());
+        }
+
+        long maxTimestamp = now + maxLifeTime;
+        long expiryTimestamp = Math.min(maxTimestamp, now + 
tokenDefaultRenewLifetime);
+
+        String tokenId = Uuid.randomUuid().toString();
+
+        KafkaPrincipal owner = context.principal();
+        if ((requestData.ownerPrincipalName() != null) && 
+            (!requestData.ownerPrincipalName().isEmpty())) {
+
+            owner = new KafkaPrincipal(requestData.ownerPrincipalType(), 
requestData.ownerPrincipalName());
+        }
+        CreateDelegationTokenResponseData responseData = new 
CreateDelegationTokenResponseData()
+                .setPrincipalName(owner.getName())
+                .setPrincipalType(owner.getPrincipalType())
+                .setTokenRequesterPrincipalName(context.principal().getName())
+                
.setTokenRequesterPrincipalType(context.principal().getPrincipalType());
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        if (secretKeyString == null) {
+            // DelegationTokens are not enabled
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code()));
+        }
+
+        if (!metadataVersion.isDelegationTokenSupported()) {
+            // DelegationTokens are not supported in this metadata version
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(UNSUPPORTED_VERSION.code()));
+        }
+
+        List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>();
+        for (CreatableRenewers renewer : requestData.renewers()) {
+            if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) {
+                renewers.add(new KafkaPrincipal(renewer.principalType(), 
renewer.principalName()));
+            } else {
+                return ControllerResult.atomicOf(records, 
responseData.setErrorCode(INVALID_PRINCIPAL_TYPE.code()));
+            }
+        }
+
+        byte[] hmac;
+        try {
+            hmac = createHmac(tokenId);
+        } catch (Throwable e) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(ApiError.fromThrowable(e).error().code()));
+        }
+
+        TokenInformation newTokenInformation = new TokenInformation(tokenId, 
owner,
+            context.principal(), renewers, now, maxTimestamp, expiryTimestamp);
+
+        DelegationTokenData newDelegationTokenData = new 
DelegationTokenData(newTokenInformation);
+
+        responseData
+                .setErrorCode(NONE.code())
+                .setIssueTimestampMs(now)
+                .setExpiryTimestampMs(expiryTimestamp)
+                .setMaxTimestampMs(maxTimestamp)
+                .setTokenId(tokenId)
+                .setHmac(hmac);
+
+        records.add(new 
ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0));
+        return ControllerResult.atomicOf(records, responseData);
+    }
+
+    public ControllerResult<RenewDelegationTokenResponseData> 
renewDelegationToken(
+        ControllerRequestContext context,
+        RenewDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        RenewDelegationTokenResponseData responseData = new 
RenewDelegationTokenResponseData();
+
+        TokenInformation myTokenInformation = getToken(requestData.hmac());
+
+        if (myTokenInformation == null) {
+            return ControllerResult.atomicOf(records, 
responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code()));
+        }

Review Comment:
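   Is this (returning `DELEGATION_TOKEN_NOT_FOUND`) what happens if delegation 
tokens are disabled?  Should we check for that explicitly instead, the way 
`createDelegationToken` returns `DELEGATION_TOKEN_AUTH_DISABLED` when 
`secretKeyString` is null?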



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {
+        Mac mac = Mac.getInstance("HmacSHA512");
+        SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), 
mac.getAlgorithm());
+
+        mac.init(secretKey);
+        return mac.doFinal(toBytes(tokenId));
+    }
+
+    private TokenInformation getToken(byte[] hmac) {
+        String base64Pwd = Base64.getEncoder().encodeToString(hmac);
+        return tokenCache.tokenForHmac(base64Pwd);
+    }
+
+    private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal 
renewer) {
+        if (tokenInfo.owner().equals(renewer)) {
+            return true;
+        }
+        for (KafkaPrincipal validRenewer : tokenInfo.renewers()) {
+            if (validRenewer.equals(renewer)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean isEnabled() {
+        if (secretKeyString != null) {
+            return true;
+        }
+        return false;
+    }
+
+    /*
+     * Pass in the MetadataVersion so that we can return a response to the 
caller 
+     * if the current metadataVersion is too low.
+     */
+    public ControllerResult<CreateDelegationTokenResponseData> 
createDelegationToken(
+        ControllerRequestContext context,
+        CreateDelegationTokenRequestData requestData,
+        MetadataVersion metadataVersion
+    ) {
+        long now = time.milliseconds();
+        long maxLifeTime = tokenDefaultMaxLifetime;
+        if (requestData.maxLifetimeMs() > 0) {
+            maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs());
+        }
+
+        long maxTimestamp = now + maxLifeTime;
+        long expiryTimestamp = Math.min(maxTimestamp, now + 
tokenDefaultRenewLifetime);
+
+        String tokenId = Uuid.randomUuid().toString();
+
+        KafkaPrincipal owner = context.principal();
+        if ((requestData.ownerPrincipalName() != null) && 
+            (!requestData.ownerPrincipalName().isEmpty())) {

Review Comment:
   Any particular reason for all the extra parens?



##########
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private DelegationTokenControlManager(
+        LogContext logContext,
+        DelegationTokenCache tokenCache,
+        String secretKeyString,
+        long tokenDefaultMaxLifetime,
+        long tokenDefaultRenewLifetime
+    ) {
+        this.log = logContext.logger(DelegationTokenControlManager.class);
+        this.tokenCache = tokenCache;
+        this.secretKeyString = secretKeyString;
+        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+    }
+
+    public static byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private byte[] createHmac(String tokenId) throws Exception {
+        Mac mac = Mac.getInstance("HmacSHA512");
+        SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), 
mac.getAlgorithm());
+
+        mac.init(secretKey);
+        return mac.doFinal(toBytes(tokenId));
+    }
+
+    private TokenInformation getToken(byte[] hmac) {
+        String base64Pwd = Base64.getEncoder().encodeToString(hmac);
+        return tokenCache.tokenForHmac(base64Pwd);
+    }
+
+    private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal 
renewer) {
+        if (tokenInfo.owner().equals(renewer)) {
+            return true;
+        }
+        for (KafkaPrincipal validRenewer : tokenInfo.renewers()) {
+            if (validRenewer.equals(renewer)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean isEnabled() {
+        if (secretKeyString != null) {
+            return true;
+        }
+        return false;

Review Comment:
   Simplify? `return secretKeyString != null;`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to