rondagostino commented on code in PR #14083: URL: https://github.com/apache/kafka/pull/14083#discussion_r1296233969
########## metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java: ########## Review Comment: Is a Unit test necessary, do you think? I also realize we don't have one for `TokenInformation`. ########## metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.utils.SecurityUtils; +import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.image.writer.RecordListWriter; +import org.apache.kafka.metadata.RecordTestUtils; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + + +@Timeout(value = 40) +public class DelegationTokenImageTest { + public final static DelegationTokenImage IMAGE1; + + public final static List<ApiMessageAndVersion> DELTA1_RECORDS; + + final static DelegationTokenDelta DELTA1; + + final static DelegationTokenImage IMAGE2; + + static DelegationTokenData randomDelegationTokenData(String tokenId, long expireTimestamp) { + TokenInformation ti = new TokenInformation( + tokenId, + SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"), + SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"), + new ArrayList<KafkaPrincipal>(), + 0, + 1000, + expireTimestamp); + return new DelegationTokenData(ti); + } + + static { + Map<String, DelegationTokenData> image1 = new HashMap<>(); + image1.put("somerandomuuid1", randomDelegationTokenData("somerandomuuid1", 100)); + image1.put("somerandomuuid2", randomDelegationTokenData("somerandomuuid2", 100)); + image1.put("somerandomuuid3", randomDelegationTokenData("somerandomuuid3", 100)); + IMAGE1 = new DelegationTokenImage(image1); + + DELTA1_RECORDS = new ArrayList<>(); + DELTA1_RECORDS.add(new ApiMessageAndVersion(new DelegationTokenRecord(). + setOwner(KafkaPrincipal.USER_TYPE + ":" + "fred"). + setRequester(KafkaPrincipal.USER_TYPE + ":" + "fred"). + setIssueTimestamp(0). + setMaxTimestamp(1000). + setExpirationTimestamp(200). + setTokenId("somerandomuuid1"), (short) 0)); + DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveDelegationTokenRecord(). + setTokenId("somerandomuuid3"), (short) 0)); + + DELTA1 = new DelegationTokenDelta(IMAGE1); + RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS); + + Map<String, DelegationTokenData> image2 = new HashMap<>(); + image2.put("somerandomuuid1", randomDelegationTokenData("somerandomuuid1", 200)); + image2.put("somerandomuuid2", randomDelegationTokenData("somerandomuuid2", 100)); + IMAGE2 = new DelegationTokenImage(image2); + } + + @Test + public void testEmptyImageRoundTrip() throws Throwable { + testToImageAndBack(DelegationTokenImage.EMPTY); + } + + @Test + public void testImage1RoundTrip() throws Throwable { + testToImageAndBack(IMAGE1); + } + + @Test + public void testApplyDelta1() throws Throwable { + assertEquals(IMAGE2, DELTA1.apply()); + } + + @Test + public void testImage2RoundTrip() throws Throwable { + testToImageAndBack(IMAGE2); + } + + private void testToImageAndBack(DelegationTokenImage image) throws Throwable { + RecordListWriter writer = new RecordListWriter(); + image.write(writer, new ImageWriterOptions.Builder().build()); + DelegationTokenDelta delta = new DelegationTokenDelta(DelegationTokenImage.EMPTY); + RecordTestUtils.replayAll(delta, writer.records()); + DelegationTokenImage nextImage = delta.apply(); + assertEquals(image, nextImage); + } Review Comment: The preferred boilerplate for these methods has changed since you wrote this. Can you look at `metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java` and follow the same format? ########## metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.utils.SecurityUtils; +import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.image.writer.RecordListWriter; +import org.apache.kafka.metadata.RecordTestUtils; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + + +@Timeout(value = 40) +public class DelegationTokenImageTest { + public final static DelegationTokenImage IMAGE1; + + public final static List<ApiMessageAndVersion> DELTA1_RECORDS; + + final static DelegationTokenDelta DELTA1; + + final static DelegationTokenImage IMAGE2; + + static DelegationTokenData randomDelegationTokenData(String tokenId, long expireTimestamp) { + TokenInformation ti = new TokenInformation( + tokenId, + SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"), + SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"), + new ArrayList<KafkaPrincipal>(), + 0, + 1000, + expireTimestamp); + return new DelegationTokenData(ti); + } + + static { + Map<String, DelegationTokenData> image1 = new HashMap<>(); + image1.put("somerandomuuid1", randomDelegationTokenData("somerandomuuid1", 100)); + image1.put("somerandomuuid2", randomDelegationTokenData("somerandomuuid2", 100)); + image1.put("somerandomuuid3", randomDelegationTokenData("somerandomuuid3", 100)); + IMAGE1 = new DelegationTokenImage(image1); + + DELTA1_RECORDS = new ArrayList<>(); + DELTA1_RECORDS.add(new ApiMessageAndVersion(new DelegationTokenRecord(). + setOwner(KafkaPrincipal.USER_TYPE + ":" + "fred"). + setRequester(KafkaPrincipal.USER_TYPE + ":" + "fred"). + setIssueTimestamp(0). + setMaxTimestamp(1000). + setExpirationTimestamp(200). + setTokenId("somerandomuuid1"), (short) 0)); + DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveDelegationTokenRecord(). + setTokenId("somerandomuuid3"), (short) 0)); + + DELTA1 = new DelegationTokenDelta(IMAGE1); + RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS); + + Map<String, DelegationTokenData> image2 = new HashMap<>(); + image2.put("somerandomuuid1", randomDelegationTokenData("somerandomuuid1", 200)); + image2.put("somerandomuuid2", randomDelegationTokenData("somerandomuuid2", 100)); + IMAGE2 = new DelegationTokenImage(image2); + } + + @Test + public void testEmptyImageRoundTrip() throws Throwable { + testToImageAndBack(DelegationTokenImage.EMPTY); + } + + @Test + public void testImage1RoundTrip() throws Throwable { + testToImageAndBack(IMAGE1); + } + + @Test + public void testApplyDelta1() throws Throwable { + assertEquals(IMAGE2, DELTA1.apply()); + } + + @Test + public void testImage2RoundTrip() throws Throwable { + testToImageAndBack(IMAGE2); + } + + private void testToImageAndBack(DelegationTokenImage image) throws Throwable { + RecordListWriter writer = new RecordListWriter(); + image.write(writer, new ImageWriterOptions.Builder().build()); + DelegationTokenDelta delta = new DelegationTokenDelta(DelegationTokenImage.EMPTY); + RecordTestUtils.replayAll(delta, writer.records()); + DelegationTokenImage nextImage = delta.apply(); + assertEquals(image, nextImage); + } + + @Test + public void testEmptyWithInvalidIBP() throws Throwable { Review Comment: `throws Throwable` isn't necessary, same with next test method ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -1396,6 +1441,30 @@ private void cancelNextWriteNoOpRecord() { queue.cancelDeferred(WRITE_NO_OP_RECORD); } + private static final String SWEEP_EXPIRED_DELEGATION_TOKENS = "sweepExpiredDelegationTokens"; + + private void maybeScheduleNextExpiredDelegationTokenSweep() { + if (featureControl.metadataVersion().isDelegationTokenSupported() && + delegationTokenControlManager.isEnabled()) { + Review Comment: Would be good to add a debug log message here ########## metadata/src/main/java/org/apache/kafka/image/DelegationTokenDelta.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.HashMap; +import java.util.Map.Entry; +import java.util.Map; +import java.util.Optional; + + +/** + * Represents changes to delegation tokens in the metadata image. + */ +public final class DelegationTokenDelta { + private final DelegationTokenImage image; + + // Key is TokenID which is contained in the value TokenInformation + private final Map<String, Optional<DelegationTokenData>> changes = new HashMap<>(); + + public DelegationTokenDelta(DelegationTokenImage image) { + this.image = image; + } + + public void finishSnapshot() { + for (String tokenId : image.tokens().keySet()) { + if (!changes.containsKey(tokenId)) { + // If the tokenId from the image did not appear in the snapshot, mark it as removed + changes.put(tokenId, Optional.empty()); + + } + } + } + + public DelegationTokenImage image() { + return image; + } + + public Map<String, Optional<DelegationTokenData>> changes() { + return changes; + } + + public void replay(DelegationTokenRecord record) { + changes.put(record.tokenId(), Optional.of(DelegationTokenData.fromRecord(record))); + } + + public void replay(RemoveDelegationTokenRecord record) { + changes.put(record.tokenId(), Optional.empty()); + } + + public void handleMetadataVersionChange(MetadataVersion changedMetadataVersion) { + // nothing to do + } + + public DelegationTokenImage apply() { + Map<String, DelegationTokenData> newTokens = new HashMap<>(); + for (Entry<String, DelegationTokenData> entry : image.tokens().entrySet()) { Review Comment: might be nice to add a couple of comments. This top section grabs tokens that already existed in the base image and that did not get deleted via the delta, and the bottom section adds new tokens that were added in the delta. ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { Review Comment: Should this be called `setTokenDefaultMaxLifetime()`? ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] createHmac(String tokenId) throws Exception { Review Comment: `private static` instead of just `private` ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] createHmac(String tokenId) throws Exception { + Mac mac = Mac.getInstance("HmacSHA512"); + SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), mac.getAlgorithm()); + + mac.init(secretKey); + return mac.doFinal(toBytes(tokenId)); + } + + private TokenInformation getToken(byte[] hmac) { + String base64Pwd = Base64.getEncoder().encodeToString(hmac); + return tokenCache.tokenForHmac(base64Pwd); + } + + private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal renewer) { + if (tokenInfo.owner().equals(renewer)) { + return true; + } + for (KafkaPrincipal validRenewer : tokenInfo.renewers()) { + if (validRenewer.equals(renewer)) { + return true; + } + } + return false; + } + + public boolean isEnabled() { + if (secretKeyString != null) { + return true; + } + return false; + } + + /* + * Pass in the MetadataVersion so that we can return a response to the caller + * if the current metadataVersion is too low. + */ + public ControllerResult<CreateDelegationTokenResponseData> createDelegationToken( + ControllerRequestContext context, + CreateDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + long maxLifeTime = tokenDefaultMaxLifetime; + if (requestData.maxLifetimeMs() > 0) { + maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs()); + } + + long maxTimestamp = now + maxLifeTime; + long expiryTimestamp = Math.min(maxTimestamp, now + tokenDefaultRenewLifetime); + + String tokenId = Uuid.randomUuid().toString(); + + KafkaPrincipal owner = context.principal(); + if ((requestData.ownerPrincipalName() != null) && + (!requestData.ownerPrincipalName().isEmpty())) { + + owner = new KafkaPrincipal(requestData.ownerPrincipalType(), requestData.ownerPrincipalName()); + } + CreateDelegationTokenResponseData responseData = new CreateDelegationTokenResponseData() + .setPrincipalName(owner.getName()) + .setPrincipalType(owner.getPrincipalType()) + .setTokenRequesterPrincipalName(context.principal().getName()) + .setTokenRequesterPrincipalType(context.principal().getPrincipalType()); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + + if (secretKeyString == null) { + // DelegationTokens are not enabled + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code())); + } + + if (!metadataVersion.isDelegationTokenSupported()) { + // DelegationTokens are not supported in this metadata version + return ControllerResult.atomicOf(records, responseData.setErrorCode(UNSUPPORTED_VERSION.code())); + } Review Comment: Should go at top of method? ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] createHmac(String tokenId) throws Exception { + Mac mac = Mac.getInstance("HmacSHA512"); + SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), mac.getAlgorithm()); + + mac.init(secretKey); + return mac.doFinal(toBytes(tokenId)); + } + + private TokenInformation getToken(byte[] hmac) { + String base64Pwd = Base64.getEncoder().encodeToString(hmac); + return tokenCache.tokenForHmac(base64Pwd); + } + + private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal renewer) { + if (tokenInfo.owner().equals(renewer)) { + return true; + } + for (KafkaPrincipal validRenewer : tokenInfo.renewers()) { + if (validRenewer.equals(renewer)) { + return true; + } + } + return false; + } + + public boolean isEnabled() { + if (secretKeyString != null) { + return true; + } + return false; + } + + /* + * Pass in the MetadataVersion so that we can return a response to the caller + * if the current metadataVersion is too low. + */ + public ControllerResult<CreateDelegationTokenResponseData> createDelegationToken( + ControllerRequestContext context, + CreateDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + long maxLifeTime = tokenDefaultMaxLifetime; + if (requestData.maxLifetimeMs() > 0) { + maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs()); + } + + long maxTimestamp = now + maxLifeTime; + long expiryTimestamp = Math.min(maxTimestamp, now + tokenDefaultRenewLifetime); + + String tokenId = Uuid.randomUuid().toString(); + + KafkaPrincipal owner = context.principal(); + if ((requestData.ownerPrincipalName() != null) && + (!requestData.ownerPrincipalName().isEmpty())) { + + owner = new KafkaPrincipal(requestData.ownerPrincipalType(), requestData.ownerPrincipalName()); + } + CreateDelegationTokenResponseData responseData = new CreateDelegationTokenResponseData() + .setPrincipalName(owner.getName()) + .setPrincipalType(owner.getPrincipalType()) + .setTokenRequesterPrincipalName(context.principal().getName()) + .setTokenRequesterPrincipalType(context.principal().getPrincipalType()); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + + if (secretKeyString == null) { + // DelegationTokens are not enabled + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code())); + } + + if (!metadataVersion.isDelegationTokenSupported()) { + // DelegationTokens are not supported in this metadata version + return ControllerResult.atomicOf(records, responseData.setErrorCode(UNSUPPORTED_VERSION.code())); + } + + List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>(); + for (CreatableRenewers renewer : requestData.renewers()) { + if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) { + renewers.add(new KafkaPrincipal(renewer.principalType(), renewer.principalName())); + } else { + return ControllerResult.atomicOf(records, responseData.setErrorCode(INVALID_PRINCIPAL_TYPE.code())); + } + } + + byte[] hmac; + try { + hmac = createHmac(tokenId); + } catch (Throwable e) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(ApiError.fromThrowable(e).error().code())); + } + + TokenInformation newTokenInformation = new TokenInformation(tokenId, owner, + context.principal(), renewers, now, maxTimestamp, expiryTimestamp); + + DelegationTokenData newDelegationTokenData = new DelegationTokenData(newTokenInformation); + + responseData + .setErrorCode(NONE.code()) + .setIssueTimestampMs(now) + .setExpiryTimestampMs(expiryTimestamp) + .setMaxTimestampMs(maxTimestamp) + .setTokenId(tokenId) + .setHmac(hmac); + + records.add(new ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0)); + return ControllerResult.atomicOf(records, responseData); + } + + public ControllerResult<RenewDelegationTokenResponseData> renewDelegationToken( + ControllerRequestContext context, + RenewDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + RenewDelegationTokenResponseData responseData = new RenewDelegationTokenResponseData(); + + TokenInformation myTokenInformation = getToken(requestData.hmac()); + + if (myTokenInformation == null) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code())); + } + + if (myTokenInformation.maxTimestamp() < now || myTokenInformation.expiryTimestamp() < now) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code())); + } + + if (!allowedToRenew(myTokenInformation, context.principal())) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code())); + } + + long renewLifeTime = tokenDefaultRenewLifetime; + if (requestData.renewPeriodMs() > 0) { + renewLifeTime = Math.min(renewLifeTime, requestData.renewPeriodMs()); + } + long renewTimeStamp = now + renewLifeTime; + long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), renewTimeStamp); + + myTokenInformation.setExpiryTimestamp(expiryTimestamp); + + DelegationTokenData newDelegationTokenData = new DelegationTokenData(myTokenInformation); + + responseData + .setErrorCode(NONE.code()) + .setExpiryTimestampMs(expiryTimestamp); + + records.add(new ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0)); + return ControllerResult.atomicOf(records, responseData); + } + + public ControllerResult<ExpireDelegationTokenResponseData> expireDelegationToken( + ControllerRequestContext context, + ExpireDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + ExpireDelegationTokenResponseData responseData = new ExpireDelegationTokenResponseData(); + + if (secretKeyString == null) { + // DelegationTokens are not enabled + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code())); + } Review Comment: Move to top? Also need to check for support in metadata version? ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] createHmac(String tokenId) throws Exception { + Mac mac = Mac.getInstance("HmacSHA512"); + SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), mac.getAlgorithm()); + + mac.init(secretKey); + return mac.doFinal(toBytes(tokenId)); + } + + private TokenInformation getToken(byte[] hmac) { + String base64Pwd = Base64.getEncoder().encodeToString(hmac); + return tokenCache.tokenForHmac(base64Pwd); + } + + private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal renewer) { + if (tokenInfo.owner().equals(renewer)) { + return true; + } + for (KafkaPrincipal validRenewer : tokenInfo.renewers()) { + if (validRenewer.equals(renewer)) { + return true; + } + } + return false; + } + + public boolean isEnabled() { + if (secretKeyString != null) { + return true; + } + return false; + } + + /* + * Pass in the MetadataVersion so that we can return a response to the caller + * if the current metadataVersion is too low. + */ + public ControllerResult<CreateDelegationTokenResponseData> createDelegationToken( + ControllerRequestContext context, + CreateDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + long maxLifeTime = tokenDefaultMaxLifetime; + if (requestData.maxLifetimeMs() > 0) { + maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs()); + } + + long maxTimestamp = now + maxLifeTime; + long expiryTimestamp = Math.min(maxTimestamp, now + tokenDefaultRenewLifetime); + + String tokenId = Uuid.randomUuid().toString(); + + KafkaPrincipal owner = context.principal(); + if ((requestData.ownerPrincipalName() != null) && + (!requestData.ownerPrincipalName().isEmpty())) { + + owner = new KafkaPrincipal(requestData.ownerPrincipalType(), requestData.ownerPrincipalName()); + } + CreateDelegationTokenResponseData responseData = new CreateDelegationTokenResponseData() + .setPrincipalName(owner.getName()) + .setPrincipalType(owner.getPrincipalType()) + .setTokenRequesterPrincipalName(context.principal().getName()) + .setTokenRequesterPrincipalType(context.principal().getPrincipalType()); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + + if (secretKeyString == null) { + // DelegationTokens are not enabled + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code())); + } + + if (!metadataVersion.isDelegationTokenSupported()) { + // DelegationTokens are not supported in this metadata version + return ControllerResult.atomicOf(records, responseData.setErrorCode(UNSUPPORTED_VERSION.code())); + } + + List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>(); + for (CreatableRenewers renewer : requestData.renewers()) { + if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) { + renewers.add(new KafkaPrincipal(renewer.principalType(), renewer.principalName())); + } else { + return ControllerResult.atomicOf(records, responseData.setErrorCode(INVALID_PRINCIPAL_TYPE.code())); + } + } + + byte[] hmac; + try { + hmac = createHmac(tokenId); + } catch (Throwable e) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(ApiError.fromThrowable(e).error().code())); + } + + TokenInformation newTokenInformation = new TokenInformation(tokenId, owner, + context.principal(), renewers, now, maxTimestamp, expiryTimestamp); + + DelegationTokenData newDelegationTokenData = new DelegationTokenData(newTokenInformation); + + responseData + .setErrorCode(NONE.code()) + .setIssueTimestampMs(now) + .setExpiryTimestampMs(expiryTimestamp) + .setMaxTimestampMs(maxTimestamp) + .setTokenId(tokenId) + .setHmac(hmac); + + records.add(new ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0)); + return ControllerResult.atomicOf(records, responseData); + } + + public ControllerResult<RenewDelegationTokenResponseData> renewDelegationToken( + ControllerRequestContext context, + RenewDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + RenewDelegationTokenResponseData responseData = new RenewDelegationTokenResponseData(); + + TokenInformation myTokenInformation = getToken(requestData.hmac()); + + if (myTokenInformation == null) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code())); + } + + if (myTokenInformation.maxTimestamp() < now || myTokenInformation.expiryTimestamp() < now) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code())); + } + + if (!allowedToRenew(myTokenInformation, context.principal())) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code())); + } + + long renewLifeTime = tokenDefaultRenewLifetime; + if (requestData.renewPeriodMs() > 0) { + renewLifeTime = Math.min(renewLifeTime, requestData.renewPeriodMs()); + } + long renewTimeStamp = now + renewLifeTime; + long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), renewTimeStamp); + + myTokenInformation.setExpiryTimestamp(expiryTimestamp); Review Comment: This is updating the record directly within the cache, but it is okay since this is the only thread accessing that cache -- is that correct? Might be worth a comment to this effect in `getToken()` and `sweepExpiredDelegationTokens()` where we use the token cache. How does the delegation token cache get updated in other nodes? Shouldn't this updating of the token cache happen when the written record gets replayed, as opposed to here? ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] createHmac(String tokenId) throws Exception { + Mac mac = Mac.getInstance("HmacSHA512"); + SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), mac.getAlgorithm()); + + mac.init(secretKey); + return mac.doFinal(toBytes(tokenId)); + } + + private TokenInformation getToken(byte[] hmac) { + String base64Pwd = Base64.getEncoder().encodeToString(hmac); + return tokenCache.tokenForHmac(base64Pwd); + } + + private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal renewer) { + if (tokenInfo.owner().equals(renewer)) { + return true; + } + for (KafkaPrincipal validRenewer : tokenInfo.renewers()) { + if (validRenewer.equals(renewer)) { + return true; + } + } + return false; + } + + public boolean isEnabled() { + if (secretKeyString != null) { + return true; + } + return false; + } + + /* + * Pass in the MetadataVersion so that we can return a response to the caller + * if the current metadataVersion is too low. + */ + public ControllerResult<CreateDelegationTokenResponseData> createDelegationToken( + ControllerRequestContext context, + CreateDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + long maxLifeTime = tokenDefaultMaxLifetime; + if (requestData.maxLifetimeMs() > 0) { + maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs()); + } + + long maxTimestamp = now + maxLifeTime; + long expiryTimestamp = Math.min(maxTimestamp, now + tokenDefaultRenewLifetime); + + String tokenId = Uuid.randomUuid().toString(); + + KafkaPrincipal owner = context.principal(); + if ((requestData.ownerPrincipalName() != null) && + (!requestData.ownerPrincipalName().isEmpty())) { + + owner = new KafkaPrincipal(requestData.ownerPrincipalType(), requestData.ownerPrincipalName()); + } + CreateDelegationTokenResponseData responseData = new CreateDelegationTokenResponseData() + .setPrincipalName(owner.getName()) + .setPrincipalType(owner.getPrincipalType()) + .setTokenRequesterPrincipalName(context.principal().getName()) + .setTokenRequesterPrincipalType(context.principal().getPrincipalType()); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + + if (secretKeyString == null) { + // DelegationTokens are not enabled + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code())); + } + + if (!metadataVersion.isDelegationTokenSupported()) { + // DelegationTokens are not supported in this metadata version + return ControllerResult.atomicOf(records, responseData.setErrorCode(UNSUPPORTED_VERSION.code())); + } + + List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>(); + for (CreatableRenewers renewer : requestData.renewers()) { + if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) { + renewers.add(new KafkaPrincipal(renewer.principalType(), renewer.principalName())); + } else { + return ControllerResult.atomicOf(records, responseData.setErrorCode(INVALID_PRINCIPAL_TYPE.code())); + } + } + + byte[] hmac; + try { + hmac = createHmac(tokenId); + } catch (Throwable e) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(ApiError.fromThrowable(e).error().code())); + } + + TokenInformation newTokenInformation = new TokenInformation(tokenId, owner, + context.principal(), renewers, now, maxTimestamp, expiryTimestamp); + + DelegationTokenData newDelegationTokenData = new DelegationTokenData(newTokenInformation); + + responseData + .setErrorCode(NONE.code()) + .setIssueTimestampMs(now) + .setExpiryTimestampMs(expiryTimestamp) + .setMaxTimestampMs(maxTimestamp) + .setTokenId(tokenId) + .setHmac(hmac); + + records.add(new ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0)); + return ControllerResult.atomicOf(records, responseData); + } + + public ControllerResult<RenewDelegationTokenResponseData> renewDelegationToken( + ControllerRequestContext context, + RenewDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + RenewDelegationTokenResponseData responseData = new RenewDelegationTokenResponseData(); + + TokenInformation myTokenInformation = getToken(requestData.hmac()); + + if (myTokenInformation == null) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code())); + } + + if (myTokenInformation.maxTimestamp() < now || myTokenInformation.expiryTimestamp() < now) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code())); + } + + if (!allowedToRenew(myTokenInformation, context.principal())) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code())); + } + + long renewLifeTime = tokenDefaultRenewLifetime; + if (requestData.renewPeriodMs() > 0) { + renewLifeTime = Math.min(renewLifeTime, requestData.renewPeriodMs()); + } + long renewTimeStamp = now + renewLifeTime; + long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), renewTimeStamp); + + myTokenInformation.setExpiryTimestamp(expiryTimestamp); + + DelegationTokenData newDelegationTokenData = new DelegationTokenData(myTokenInformation); + + responseData + .setErrorCode(NONE.code()) + .setExpiryTimestampMs(expiryTimestamp); + + records.add(new ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0)); + return ControllerResult.atomicOf(records, responseData); + } + + public ControllerResult<ExpireDelegationTokenResponseData> expireDelegationToken( + ControllerRequestContext context, + ExpireDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + ExpireDelegationTokenResponseData responseData = new ExpireDelegationTokenResponseData(); + + if (secretKeyString == null) { + // DelegationTokens are not enabled + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code())); + } + + TokenInformation myTokenInformation = getToken(requestData.hmac()); + + if (myTokenInformation == null) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code())); + } + + if (myTokenInformation.maxTimestamp() < now || myTokenInformation.expiryTimestamp() < now) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code())); + } + + if (!allowedToRenew(myTokenInformation, context.principal())) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code())); + } + + if (requestData.expiryTimePeriodMs() < 0) { // expire immediately + responseData + .setErrorCode(NONE.code()) + .setExpiryTimestampMs(requestData.expiryTimePeriodMs()); + records.add(new ApiMessageAndVersion(new RemoveDelegationTokenRecord(). + setTokenId(myTokenInformation.tokenId()), (short) 0)); + } else { + long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), + now + requestData.expiryTimePeriodMs()); + + responseData + .setErrorCode(NONE.code()) + .setExpiryTimestampMs(expiryTimestamp); + + myTokenInformation.setExpiryTimestamp(expiryTimestamp); Review Comment: Same question as above -- should this really be done here, or should it be done when the record gets replayed? ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] createHmac(String tokenId) throws Exception { + Mac mac = Mac.getInstance("HmacSHA512"); + SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), mac.getAlgorithm()); + + mac.init(secretKey); + return mac.doFinal(toBytes(tokenId)); + } + + private TokenInformation getToken(byte[] hmac) { + String base64Pwd = Base64.getEncoder().encodeToString(hmac); + return tokenCache.tokenForHmac(base64Pwd); + } + + private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal renewer) { + if (tokenInfo.owner().equals(renewer)) { + return true; + } + for (KafkaPrincipal validRenewer : tokenInfo.renewers()) { + if (validRenewer.equals(renewer)) { + return true; + } + } + return false; + } + + public boolean isEnabled() { + if (secretKeyString != null) { + return true; + } + return false; + } + + /* + * Pass in the MetadataVersion so that we can return a response to the caller + * if the current metadataVersion is too low. + */ + public ControllerResult<CreateDelegationTokenResponseData> createDelegationToken( + ControllerRequestContext context, + CreateDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + long maxLifeTime = tokenDefaultMaxLifetime; + if (requestData.maxLifetimeMs() > 0) { + maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs()); + } + + long maxTimestamp = now + maxLifeTime; + long expiryTimestamp = Math.min(maxTimestamp, now + tokenDefaultRenewLifetime); + + String tokenId = Uuid.randomUuid().toString(); + + KafkaPrincipal owner = context.principal(); + if ((requestData.ownerPrincipalName() != null) && + (!requestData.ownerPrincipalName().isEmpty())) { + + owner = new KafkaPrincipal(requestData.ownerPrincipalType(), requestData.ownerPrincipalName()); + } + CreateDelegationTokenResponseData responseData = new CreateDelegationTokenResponseData() + .setPrincipalName(owner.getName()) + .setPrincipalType(owner.getPrincipalType()) + .setTokenRequesterPrincipalName(context.principal().getName()) + .setTokenRequesterPrincipalType(context.principal().getPrincipalType()); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + + if (secretKeyString == null) { + // DelegationTokens are not enabled + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code())); + } + + if (!metadataVersion.isDelegationTokenSupported()) { + // DelegationTokens are not supported in this metadata version + return ControllerResult.atomicOf(records, responseData.setErrorCode(UNSUPPORTED_VERSION.code())); + } + + List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>(); + for (CreatableRenewers renewer : requestData.renewers()) { + if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) { + renewers.add(new KafkaPrincipal(renewer.principalType(), renewer.principalName())); + } else { + return ControllerResult.atomicOf(records, responseData.setErrorCode(INVALID_PRINCIPAL_TYPE.code())); + } + } + + byte[] hmac; + try { + hmac = createHmac(tokenId); + } catch (Throwable e) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(ApiError.fromThrowable(e).error().code())); + } + + TokenInformation newTokenInformation = new TokenInformation(tokenId, owner, + context.principal(), renewers, now, maxTimestamp, expiryTimestamp); + + DelegationTokenData newDelegationTokenData = new DelegationTokenData(newTokenInformation); + + responseData + .setErrorCode(NONE.code()) + .setIssueTimestampMs(now) + .setExpiryTimestampMs(expiryTimestamp) + .setMaxTimestampMs(maxTimestamp) + .setTokenId(tokenId) + .setHmac(hmac); + + records.add(new ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0)); + return ControllerResult.atomicOf(records, responseData); + } + + public ControllerResult<RenewDelegationTokenResponseData> renewDelegationToken( + ControllerRequestContext context, + RenewDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + RenewDelegationTokenResponseData responseData = new RenewDelegationTokenResponseData(); + + TokenInformation myTokenInformation = getToken(requestData.hmac()); + + if (myTokenInformation == null) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code())); + } + + if (myTokenInformation.maxTimestamp() < now || myTokenInformation.expiryTimestamp() < now) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code())); + } + + if (!allowedToRenew(myTokenInformation, context.principal())) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code())); + } + + long renewLifeTime = tokenDefaultRenewLifetime; + if (requestData.renewPeriodMs() > 0) { + renewLifeTime = Math.min(renewLifeTime, requestData.renewPeriodMs()); + } + long renewTimeStamp = now + renewLifeTime; + long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), renewTimeStamp); + + myTokenInformation.setExpiryTimestamp(expiryTimestamp); + + DelegationTokenData newDelegationTokenData = new DelegationTokenData(myTokenInformation); + + responseData + .setErrorCode(NONE.code()) + .setExpiryTimestampMs(expiryTimestamp); + + records.add(new ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0)); + return ControllerResult.atomicOf(records, responseData); + } + + public ControllerResult<ExpireDelegationTokenResponseData> expireDelegationToken( + ControllerRequestContext context, + ExpireDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + ExpireDelegationTokenResponseData responseData = new ExpireDelegationTokenResponseData(); + + if (secretKeyString == null) { + // DelegationTokens are not enabled + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code())); + } + + TokenInformation myTokenInformation = getToken(requestData.hmac()); + + if (myTokenInformation == null) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code())); + } + + if (myTokenInformation.maxTimestamp() < now || myTokenInformation.expiryTimestamp() < now) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code())); + } + + if (!allowedToRenew(myTokenInformation, context.principal())) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code())); + } Review Comment: Seems we check this string of conditions in multiple places -- worth refactoring it out into a single method so we can be confident that everybody checks everything? ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { Review Comment: Should this be called `setTokenDefaultRenewLifetime()`? ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -202,6 +211,11 @@ static public class Builder { private BootstrapMetadata bootstrapMetadata = null; private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH; private boolean zkMigrationEnabled = false; + private DelegationTokenCache tokenCache; + private String tokenKeyString; Review Comment: Any particular reason why this is called `tokenKeyString` instead of `tokenSecretKeyString` or `tokenSecretString`? ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { Review Comment: Any reason why this is called `setTokenKeyString()` instead of `setSecretKeyString()`? ########## metadata/src/main/java/org/apache/kafka/image/node/DelegationTokenDataNode.java: ########## Review Comment: Seems we should have a unit test (to confirm redaction) ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { Review Comment: `private` instead of `public`? ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] createHmac(String tokenId) throws Exception { + Mac mac = Mac.getInstance("HmacSHA512"); + SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), mac.getAlgorithm()); + + mac.init(secretKey); + return mac.doFinal(toBytes(tokenId)); + } + + private TokenInformation getToken(byte[] hmac) { + String base64Pwd = Base64.getEncoder().encodeToString(hmac); + return tokenCache.tokenForHmac(base64Pwd); + } + + private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal renewer) { + if (tokenInfo.owner().equals(renewer)) { + return true; + } + for (KafkaPrincipal validRenewer : tokenInfo.renewers()) { + if (validRenewer.equals(renewer)) { + return true; + } + } + return false; Review Comment: Simplify? `return tokenInfo.owner().equals(renewer) || tokenInfo.renewers().contains(renewer);` ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime Review Comment: Might be good to append `Ms` to clearly indicate the units. Same elsewhere if necessary. ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] createHmac(String tokenId) throws Exception { + Mac mac = Mac.getInstance("HmacSHA512"); + SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), mac.getAlgorithm()); + + mac.init(secretKey); + return mac.doFinal(toBytes(tokenId)); + } + + private TokenInformation getToken(byte[] hmac) { + String base64Pwd = Base64.getEncoder().encodeToString(hmac); + return tokenCache.tokenForHmac(base64Pwd); + } + + private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal renewer) { + if (tokenInfo.owner().equals(renewer)) { + return true; + } + for (KafkaPrincipal validRenewer : tokenInfo.renewers()) { + if (validRenewer.equals(renewer)) { + return true; + } + } + return false; + } + + public boolean isEnabled() { + if (secretKeyString != null) { + return true; + } + return false; + } + + /* + * Pass in the MetadataVersion so that we can return a response to the caller + * if the current metadataVersion is too low. + */ + public ControllerResult<CreateDelegationTokenResponseData> createDelegationToken( + ControllerRequestContext context, + CreateDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + long maxLifeTime = tokenDefaultMaxLifetime; + if (requestData.maxLifetimeMs() > 0) { + maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs()); + } + + long maxTimestamp = now + maxLifeTime; + long expiryTimestamp = Math.min(maxTimestamp, now + tokenDefaultRenewLifetime); + + String tokenId = Uuid.randomUuid().toString(); + + KafkaPrincipal owner = context.principal(); + if ((requestData.ownerPrincipalName() != null) && + (!requestData.ownerPrincipalName().isEmpty())) { + + owner = new KafkaPrincipal(requestData.ownerPrincipalType(), requestData.ownerPrincipalName()); + } + CreateDelegationTokenResponseData responseData = new CreateDelegationTokenResponseData() + .setPrincipalName(owner.getName()) + .setPrincipalType(owner.getPrincipalType()) + .setTokenRequesterPrincipalName(context.principal().getName()) + .setTokenRequesterPrincipalType(context.principal().getPrincipalType()); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + + if (secretKeyString == null) { Review Comment: Should reuse `!isEnabled()`, and same elsewhere ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] createHmac(String tokenId) throws Exception { + Mac mac = Mac.getInstance("HmacSHA512"); + SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), mac.getAlgorithm()); + + mac.init(secretKey); + return mac.doFinal(toBytes(tokenId)); + } + + private TokenInformation getToken(byte[] hmac) { + String base64Pwd = Base64.getEncoder().encodeToString(hmac); + return tokenCache.tokenForHmac(base64Pwd); + } + + private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal renewer) { + if (tokenInfo.owner().equals(renewer)) { + return true; + } + for (KafkaPrincipal validRenewer : tokenInfo.renewers()) { + if (validRenewer.equals(renewer)) { + return true; + } + } + return false; + } + + public boolean isEnabled() { + if (secretKeyString != null) { + return true; + } + return false; + } + + /* + * Pass in the MetadataVersion so that we can return a response to the caller + * if the current metadataVersion is too low. + */ + public ControllerResult<CreateDelegationTokenResponseData> createDelegationToken( + ControllerRequestContext context, + CreateDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + long maxLifeTime = tokenDefaultMaxLifetime; + if (requestData.maxLifetimeMs() > 0) { + maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs()); + } + + long maxTimestamp = now + maxLifeTime; + long expiryTimestamp = Math.min(maxTimestamp, now + tokenDefaultRenewLifetime); + + String tokenId = Uuid.randomUuid().toString(); + + KafkaPrincipal owner = context.principal(); + if ((requestData.ownerPrincipalName() != null) && + (!requestData.ownerPrincipalName().isEmpty())) { + + owner = new KafkaPrincipal(requestData.ownerPrincipalType(), requestData.ownerPrincipalName()); + } + CreateDelegationTokenResponseData responseData = new CreateDelegationTokenResponseData() + .setPrincipalName(owner.getName()) + .setPrincipalType(owner.getPrincipalType()) + .setTokenRequesterPrincipalName(context.principal().getName()) + .setTokenRequesterPrincipalType(context.principal().getPrincipalType()); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + + if (secretKeyString == null) { + // DelegationTokens are not enabled + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_AUTH_DISABLED.code())); + } + + if (!metadataVersion.isDelegationTokenSupported()) { + // DelegationTokens are not supported in this metadata version + return ControllerResult.atomicOf(records, responseData.setErrorCode(UNSUPPORTED_VERSION.code())); + } + + List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>(); + for (CreatableRenewers renewer : requestData.renewers()) { + if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) { + renewers.add(new KafkaPrincipal(renewer.principalType(), renewer.principalName())); + } else { + return ControllerResult.atomicOf(records, responseData.setErrorCode(INVALID_PRINCIPAL_TYPE.code())); + } + } + + byte[] hmac; + try { + hmac = createHmac(tokenId); + } catch (Throwable e) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(ApiError.fromThrowable(e).error().code())); + } + + TokenInformation newTokenInformation = new TokenInformation(tokenId, owner, + context.principal(), renewers, now, maxTimestamp, expiryTimestamp); + + DelegationTokenData newDelegationTokenData = new DelegationTokenData(newTokenInformation); + + responseData + .setErrorCode(NONE.code()) + .setIssueTimestampMs(now) + .setExpiryTimestampMs(expiryTimestamp) + .setMaxTimestampMs(maxTimestamp) + .setTokenId(tokenId) + .setHmac(hmac); + + records.add(new ApiMessageAndVersion(newDelegationTokenData.toRecord(), (short) 0)); + return ControllerResult.atomicOf(records, responseData); + } + + public ControllerResult<RenewDelegationTokenResponseData> renewDelegationToken( + ControllerRequestContext context, + RenewDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + + List<ApiMessageAndVersion> records = new ArrayList<>(); + RenewDelegationTokenResponseData responseData = new RenewDelegationTokenResponseData(); + + TokenInformation myTokenInformation = getToken(requestData.hmac()); + + if (myTokenInformation == null) { + return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code())); + } Review Comment: Is this what happens if delegation tokens are disabled? Should we check explicitly instead? ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] createHmac(String tokenId) throws Exception { + Mac mac = Mac.getInstance("HmacSHA512"); + SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), mac.getAlgorithm()); + + mac.init(secretKey); + return mac.doFinal(toBytes(tokenId)); + } + + private TokenInformation getToken(byte[] hmac) { + String base64Pwd = Base64.getEncoder().encodeToString(hmac); + return tokenCache.tokenForHmac(base64Pwd); + } + + private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal renewer) { + if (tokenInfo.owner().equals(renewer)) { + return true; + } + for (KafkaPrincipal validRenewer : tokenInfo.renewers()) { + if (validRenewer.equals(renewer)) { + return true; + } + } + return false; + } + + public boolean isEnabled() { + if (secretKeyString != null) { + return true; + } + return false; + } + + /* + * Pass in the MetadataVersion so that we can return a response to the caller + * if the current metadataVersion is too low. + */ + public ControllerResult<CreateDelegationTokenResponseData> createDelegationToken( + ControllerRequestContext context, + CreateDelegationTokenRequestData requestData, + MetadataVersion metadataVersion + ) { + long now = time.milliseconds(); + long maxLifeTime = tokenDefaultMaxLifetime; + if (requestData.maxLifetimeMs() > 0) { + maxLifeTime = Math.min(maxLifeTime, requestData.maxLifetimeMs()); + } + + long maxTimestamp = now + maxLifeTime; + long expiryTimestamp = Math.min(maxTimestamp, now + tokenDefaultRenewLifetime); + + String tokenId = Uuid.randomUuid().toString(); + + KafkaPrincipal owner = context.principal(); + if ((requestData.ownerPrincipalName() != null) && + (!requestData.ownerPrincipalName().isEmpty())) { Review Comment: Any particular reason for all the extra parens? ########## metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.message.CreateDelegationTokenRequestData; +import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers; +import org.apache.kafka.common.message.CreateDelegationTokenResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.message.RenewDelegationTokenRequestData; +import org.apache.kafka.common.message.RenewDelegationTokenResponseData; +import org.apache.kafka.common.metadata.DelegationTokenRecord; +import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.DelegationTokenData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.common.utils.Time; + +import java.nio.charset.StandardCharsets; +import javax.crypto.spec.SecretKeySpec; +import javax.crypto.Mac; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND; +import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH; +import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE; +import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION; + +/** + * Manages DelegationTokens. + */ +public class DelegationTokenControlManager { + private Time time = Time.SYSTEM; + + static class Builder { + private LogContext logContext = null; + private DelegationTokenCache tokenCache = null; + private String secretKeyString = null; + private long tokenDefaultMaxLifetime = 0; + private long tokenDefaultRenewLifetime = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setTokenCache(DelegationTokenCache tokenCache) { + this.tokenCache = tokenCache; + return this; + } + + Builder setTokenKeyString(String secretKeyString) { + this.secretKeyString = secretKeyString; + return this; + } + + Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) { + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + return this; + } + + Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) { + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + return this; + } + + DelegationTokenControlManager build() { + if (logContext == null) logContext = new LogContext(); + return new DelegationTokenControlManager( + logContext, + tokenCache, + secretKeyString, + tokenDefaultMaxLifetime, + tokenDefaultRenewLifetime); + } + } + + private final Logger log; + private final DelegationTokenCache tokenCache; + private final String secretKeyString; + private final long tokenDefaultMaxLifetime; + private final long tokenDefaultRenewLifetime; + + private DelegationTokenControlManager( + LogContext logContext, + DelegationTokenCache tokenCache, + String secretKeyString, + long tokenDefaultMaxLifetime, + long tokenDefaultRenewLifetime + ) { + this.log = logContext.logger(DelegationTokenControlManager.class); + this.tokenCache = tokenCache; + this.secretKeyString = secretKeyString; + this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime; + this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime; + } + + public static byte[] toBytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private byte[] createHmac(String tokenId) throws Exception { + Mac mac = Mac.getInstance("HmacSHA512"); + SecretKeySpec secretKey = new SecretKeySpec(toBytes(secretKeyString), mac.getAlgorithm()); + + mac.init(secretKey); + return mac.doFinal(toBytes(tokenId)); + } + + private TokenInformation getToken(byte[] hmac) { + String base64Pwd = Base64.getEncoder().encodeToString(hmac); + return tokenCache.tokenForHmac(base64Pwd); + } + + private boolean allowedToRenew(TokenInformation tokenInfo, KafkaPrincipal renewer) { + if (tokenInfo.owner().equals(renewer)) { + return true; + } + for (KafkaPrincipal validRenewer : tokenInfo.renewers()) { + if (validRenewer.equals(renewer)) { + return true; + } + } + return false; + } + + public boolean isEnabled() { + if (secretKeyString != null) { + return true; + } + return false; Review Comment: Simplify? `return secretKeyString != null` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org