[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480407445 ## File path: core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ## @@ -545,6 +558,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } } + protected def createScramAdminClient(scramMechanism: String, user: String, password: String): Admin = { Review comment: Ok, this now invokes a new method on SaslSetup(). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480370634 ## File path: core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ## @@ -545,6 +558,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } } + protected def createScramAdminClient(scramMechanism: String, user: String, password: String): Admin = { Review comment: See above This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480370463 ## File path: core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala ## @@ -248,4 +250,25 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") createProducer() } + + private def createScramAdminClient(user: String, password: String): Admin = { Review comment: Ok, I added the following code to SaslSetup, and we implement that first method in the 3 test classes that use this functionality. ``` def createPrivilegedAdminClient(): Admin = { // create an admin client instance that is authorized to create credentials throw new UnsupportedOperationException("Must implement this if a test needs to use it") } def createScramCredentialsViaPrivilegedAdminClient(userName: String, password: String): Unit = { val privilegedAdminClient = createPrivilegedAdminClient() // must explicitly implement this method try { // create the SCRAM credential for the given user createScramCredentials(privilegedAdminClient, userName, password) } finally { privilegedAdminClient.close() } } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480329570 ## File path: core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala ## @@ -42,7 +42,18 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes override def setUp(): Unit = { super.setUp() // Create client credentials after starting brokers so that dynamic credential creation is also tested -createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) -createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2) +createScramCredentialWithScramAdminClient(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) +createScramCredentialWithScramAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2) + } + + private def createScramCredentialWithScramAdminClient(user: String, password: String) = { Review comment: It was a goal to eliminate all SCRAM credential creation via ZooKeeper where possible. The only places that do so after this PR are when credentials have to be created before the brokers are started (i.e. when the inter-broker security protocol is SASL/SCRAM). This code used to create the credential directly via ZooKeeper, but since it occurs after the brokers start it can use the admin client. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480324560 ## File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala ## @@ -1047,8 +1047,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @Test def testAddRemoveSaslListeners(): Unit = { -createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) -createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) +createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) +createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) initializeKerberos() Review comment: Good point. It wasn't waiting before, and it probably didn't/doesn't matter since we were spending time initializing Kerberos, but I added the check anyway just to be safe. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480301973 ## File path: core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala ## @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import java.io.{ByteArrayOutputStream, PrintStream} +import java.nio.charset.StandardCharsets + +import kafka.server.BaseRequestTest +import kafka.utils.Exit +import org.junit.Assert._ +import org.junit.Test + +class UserScramCredentialsCommandTest extends BaseRequestTest { + override def brokerCount = 1 + var exitStatus: Option[Int] = None + var exitMessage: Option[String] = None + + case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None) + + private def runConfigCommandViaBroker(args: Array[String]) : ConfigCommandResult = { +val byteArrayOutputStream = new ByteArrayOutputStream() +val utf8 = StandardCharsets.UTF_8.name +val printStream = new PrintStream(byteArrayOutputStream, true, utf8) +var exitStatus: Option[Int] = None +Exit.setExitProcedure { (status, _) => + exitStatus = Some(status) + throw new RuntimeException +} +try { + Console.withOut(printStream) { +ConfigCommand.main(Array("--bootstrap-server", brokerList) ++ args) + } + ConfigCommandResult(byteArrayOutputStream.toString(utf8)) +} catch { + case e: Exception => { Review comment: Logging it at debug level doesn't hurt, so I added it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480282139 ## File path: core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala ## @@ -169,6 +169,18 @@ object JaasTestUtils { jaasFile } + // Returns a SASL/SCRAM configuration using credentials for the given user and password + def scramClientLoginModule(mechanism: String, scramUser: String, scramPassword: String): String = { +mechanism match { + case "SCRAM-SHA-256" | "SCRAM-SHA-512" => Review comment: I fixed this in the other places in this file as well This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480272048 ## File path: tests/kafkatest/services/kafka/kafka.py ## @@ -575,10 +577,21 @@ def set_unclean_leader_election(self, topic, value=True, node=None): node.account.ssh(cmd) def _connect_setting_kafka_configs(self, node): +# Use this for everything related to kafka-configs except User SCRAM Credentials if node.version.kafka_configs_command_uses_bootstrap_server(): -return "--bootstrap-server %s " % self.bootstrap_servers(self.security_protocol) +return "--bootstrap-server %s --command-config <(echo '%s')" % (self.bootstrap_servers(self.security_protocol), Review comment: I created https://issues.apache.org/jira/browse/KAFKA-10451 to track this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r478653535 ## File path: tests/kafkatest/services/security/security_config.py ## @@ -350,6 +362,14 @@ def kafka_opts(self): else: return "" +def export_kafka_opts_for_admin_client_as_broker(self): Review comment: Yes, created https://issues.apache.org/jira/browse/KAFKA-10443 to track this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r475771230 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.ResourceNotFoundException; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> usersFuture; +private final Map> perUserFutures; + +/** + * + * @param usersFuture the future indicating the users described by the call + * @param perUserFutures the required map of user names to futures representing the results of describing each + * user's SCRAM credentials. + */ +public DescribeUserScramCredentialsResult(KafkaFuture> usersFuture, + Map> perUserFutures) { +this.usersFuture = Objects.requireNonNull(usersFuture); +this.perUserFutures = Objects.requireNonNull(perUserFutures); +} + +/** + * + * @return a future for the results of all requested (either explicitly or implicitly via describe-all) users. + * The future will complete successfully only if the users future first completes successfully and then all the + * futures for the user descriptions complete successfully. + */ +public KafkaFuture> all() { +KafkaFuture succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users()); +return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> { +KafkaFuture succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray( +new KafkaFuture[perUserFutures.size()])); +KafkaFuture> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 -> +perUserFutures.entrySet().stream().collect(Collectors.toMap( +e -> e.getKey(), +e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue(); +/* At this point it is only the users future that is guaranteed to have succeeded. + * We want to return the future to the map, but we have to return a map at this point. + * We need to dereference the future while propagating any exception. + */ +return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture); +}); +} + +/** + * + * @return a future indicating the distinct users that were requested (either explicitly or implicitly via + * describe-all). The future will not complete successfully if the user is not authorized to perform the describe + * operation; otherwise, it will complete successfully as long as the list of users with credentials can be + * successfully determined within some hard-coded timeout period. + */ +public KafkaFuture> users() { +return usersFuture; +} + +/** + * + * @param userName the name of the user description being requested + * @return a future indicating the description results for the given user. The future will complete exceptionally if + * the future returned by {@link #users()} completes exceptionally. If the given user does not exist in the list + * of requested users then the future will complete exceptionally with + * {@link org.apache.kafka.common.errors.ResourceNotFoundException}. + */ +public KafkaFuture description(String userName) { +KafkaFuture succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(usersFuture); +return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> { +
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r475771230 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.ResourceNotFoundException; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> usersFuture; +private final Map> perUserFutures; + +/** + * + * @param usersFuture the future indicating the users described by the call + * @param perUserFutures the required map of user names to futures representing the results of describing each + * user's SCRAM credentials. + */ +public DescribeUserScramCredentialsResult(KafkaFuture> usersFuture, + Map> perUserFutures) { +this.usersFuture = Objects.requireNonNull(usersFuture); +this.perUserFutures = Objects.requireNonNull(perUserFutures); +} + +/** + * + * @return a future for the results of all requested (either explicitly or implicitly via describe-all) users. + * The future will complete successfully only if the users future first completes successfully and then all the + * futures for the user descriptions complete successfully. + */ +public KafkaFuture> all() { +KafkaFuture succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users()); +return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> { +KafkaFuture succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray( +new KafkaFuture[perUserFutures.size()])); +KafkaFuture> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 -> +perUserFutures.entrySet().stream().collect(Collectors.toMap( +e -> e.getKey(), +e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue(); +/* At this point it is only the users future that is guaranteed to have succeeded. + * We want to return the future to the map, but we have to return a map at this point. + * We need to dereference the future while propagating any exception. + */ +return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture); +}); +} + +/** + * + * @return a future indicating the distinct users that were requested (either explicitly or implicitly via + * describe-all). The future will not complete successfully if the user is not authorized to perform the describe + * operation; otherwise, it will complete successfully as long as the list of users with credentials can be + * successfully determined within some hard-coded timeout period. + */ +public KafkaFuture> users() { +return usersFuture; +} + +/** + * + * @param userName the name of the user description being requested + * @return a future indicating the description results for the given user. The future will complete exceptionally if + * the future returned by {@link #users()} completes exceptionally. If the given user does not exist in the list + * of requested users then the future will complete exceptionally with + * {@link org.apache.kafka.common.errors.ResourceNotFoundException}. + */ +public KafkaFuture description(String userName) { +KafkaFuture succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(usersFuture); +return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> { +
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r475756645 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.ResourceNotFoundException; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> usersFuture; +private final Map> perUserFutures; + +/** + * + * @param usersFuture the future indicating the users described by the call + * @param perUserFutures the required map of user names to futures representing the results of describing each + * user's SCRAM credentials. + */ +public DescribeUserScramCredentialsResult(KafkaFuture> usersFuture, + Map> perUserFutures) { +this.usersFuture = Objects.requireNonNull(usersFuture); +this.perUserFutures = Objects.requireNonNull(perUserFutures); +} + +/** + * + * @return a future for the results of all requested (either explicitly or implicitly via describe-all) users. + * The future will complete successfully only if the users future first completes successfully and then all the + * futures for the user descriptions complete successfully. + */ +public KafkaFuture> all() { +KafkaFuture succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users()); +return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> { +KafkaFuture succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray( +new KafkaFuture[perUserFutures.size()])); +KafkaFuture> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 -> +perUserFutures.entrySet().stream().collect(Collectors.toMap( +e -> e.getKey(), +e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue(); +/* At this point it is only the users future that is guaranteed to have succeeded. + * We want to return the future to the map, but we have to return a map at this point. + * We need to dereference the future while propagating any exception. + */ +return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture); +}); +} + +/** + * + * @return a future indicating the distinct users that were requested (either explicitly or implicitly via Review comment: We've gone back and forth on this. The KIP does not explicitly state what to do in the case of a describe request for a user that does not have credentials, and we originally coded it to silently drop them, but then we changed it to be consistent with other APIs and raise an error (https://github.com/apache/kafka/pull/9032#discussion_r468871453). I agree that it isn't totally clear what to do. Rather than making the change back, I'll leave both this Javadoc and the underlying implementation as-is right now unill we discuss further and decide for sure what we want. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r475750058 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.ResourceNotFoundException; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> usersFuture; +private final Map> perUserFutures; + +/** + * + * @param usersFuture the future indicating the users described by the call + * @param perUserFutures the required map of user names to futures representing the results of describing each + * user's SCRAM credentials. + */ +public DescribeUserScramCredentialsResult(KafkaFuture> usersFuture, + Map> perUserFutures) { +this.usersFuture = Objects.requireNonNull(usersFuture); +this.perUserFutures = Objects.requireNonNull(perUserFutures); +} + +/** + * + * @return a future for the results of all requested (either explicitly or implicitly via describe-all) users. + * The future will complete successfully only if the users future first completes successfully and then all the + * futures for the user descriptions complete successfully. + */ +public KafkaFuture> all() { +KafkaFuture succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users()); +return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> { +KafkaFuture succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray( +new KafkaFuture[perUserFutures.size()])); +KafkaFuture> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 -> +perUserFutures.entrySet().stream().collect(Collectors.toMap( +e -> e.getKey(), +e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue(); +/* At this point it is only the users future that is guaranteed to have succeeded. + * We want to return the future to the map, but we have to return a map at this point. + * We need to dereference the future while propagating any exception. + */ +return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture); +}); +} + +/** + * + * @return a future indicating the distinct users that were requested (either explicitly or implicitly via + * describe-all). The future will not complete successfully if the user is not authorized to perform the describe + * operation; otherwise, it will complete successfully as long as the list of users with credentials can be + * successfully determined within some hard-coded timeout period. + */ +public KafkaFuture> users() { +return usersFuture; +} + +/** + * + * @param userName the name of the user description being requested + * @return a future indicating the description results for the given user. The future will complete exceptionally if + * the future returned by {@link #users()} completes exceptionally. If the given user does not exist in the list + * of requested users then the future will complete exceptionally with + * {@link org.apache.kafka.common.errors.ResourceNotFoundException}. + */ +public KafkaFuture description(String userName) { +KafkaFuture succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(usersFuture); +return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> { +
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r474950597 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.List; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; + +/** + * + * @param future the required future representing the result of the call + */ +public DescribeUserScramCredentialsResult(KafkaFuture> future) { +this.future = Objects.requireNonNull(future); +} + +/** + * + * @return the future representing the result of the call + */ +public KafkaFuture> future() { Review comment: Sorry, my mistake. This now reflects what we discussed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r474236159 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: Ok, we have two levels of Futures now: 1. `DescribeUserScramCredentialsResult` has a `KafkaFuture>` 2. `UserScramCredentialsDescriptionResult` has a user name and a `KafkaFuture` I think this is where we want to end up. Let me know if you agree. I also added some logic in `AlterUserScramCredentialsRequestTest` to confirm the behavior of the per-user futures and associated errors. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r474226011 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,64 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describe all SASL/SCRAM credentials. + * + * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @return The DescribeUserScramCredentialsResult. + */ +default DescribeUserScramCredentialsResult describeUserScramCredentials() { +return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions()); +} + +/** + * Describe SASL/SCRAM credentials for the given users. + * + * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @param users the users for which credentials are to be described; all users' credentials are described if null + * or empty. A user explicitly specified here that does not have a SCRAM credential will not appear + * in the results. Review comment: I reworked this Javadoc to list all possible exceptions for both alter and describe. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472947852 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: Ok, I think I follow now. You are saying that we could potentially implement describe by invoking 1+N requests to Kafka: one to get the list of credentials (either the list of all of them if we are asking for them all, or the explicitly requested ones we wanted), and then another N requests to get the data for each one. This on the surface seems like an anti-pattern, but it is not unreasonable for the case where the data is expensive to get in the first place — maybe we are forced to make 1 or more round-trips for each anyway. So as a general, reusable pattern, yes, I believe it works. So when we invoke describe, whether it is describe-all or just an explicit few, we return a single future, and that future returns a list of instances (UserName in this case): either 1 instance for each user that has at least 1 credential for the describe-all case, or one instance per distinct user explicitly requested otherwise. Then each UserName instance has the accessor you mentioned, which in this case returns Future. Do I have that right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472575326 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: I can see reasonable arguments for all the possibilities, so if you feel strongly about one of them then I would be fine with it. For example, `list()` and `describe()` even though they return the same thing (now -- `describe()` could return more later potentially). Just let me know. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472570823 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: > a list RPC to show everything, and a describe RPC to show only some things. Do you mean a list RPC that takes no arguments and returns every credential defined for every user and a describe RPC that takes 1 or more users and returns every credential defined for those specified users, and they both return the same information for each credential? Or do you mean a list RPC and a describe RPC that return different sets of information (as is done with list vs. describe topics)? I think you mean the former (two RPCs, each returning the same thing), but I want to be certain I understand. > There are a few reasons why. One is that even though we currently only make one RPC, in the future we might make more than one. In that case we would want multiple futures. I don't understand what this is referring to. By "we currently only make one RPC" to what are you referring? > I also feel like in AdminClient, errors should be handled with futures pretty much all the time Agreed. Will convert to using futures always, whenever we arrive at the final decision on what the RPCs need to look like. I'm wondering if we convert to returning futures everywhere, can we stick with the one describe RPC? For example, could the admin client return a `Future>>`? Would that work, and if so, would that be a reasonable way to proceed? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472574569 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: If we do decide to go with 2 separate APIs, then I might be concerned about using `list()` vs `describe()` if they return the same set of information (i.e. mechanism and iterations). Perhaps using two separate names gives us room to expand `describe()` to return more information later on, though. But if not, and they will always return the same information, then maybe `describeAll()` and `describe()` (or `listAll()` and `list()`) might be better? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472570823 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: > a list RPC to show everything, and a describe RPC to show only some things. Do you mean a list RPC that takes no arguments and returns every credential defined for every users and a describe RPC that takes 1 or more users and returns every credential defined for those specified users, and they both return the same information for each credential? Or do you mean a list RPC and a describe RPC that return different sets of information (as is done with list vs. describe topics)? I think you mean the former (two RPCs, each returning the same thing), but I want to be certain I understand. > There are a few reasons why. One is that even though we currently only make one RPC, in the future we might make more than one. In that case we would want multiple futures. I don't understand what this is referring to. By "we currently only make one RPC" to what are you referring? > I also feel like in AdminClient, errors should be handled with futures pretty much all the time Agreed. Will convert to using futures always, whenever we arrive at the final decision on what the RPCs need to look like. I'm wondering if we convert to returning futures everywhere, can we stick with the one describe RPC? For example, could the admin client return a `Future>>`? Would that work, and if so, would that be a reasonable way to proceed? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472570823 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: > a list RPC to show everything, and a describe RPC to show only some things. Do you mean a list RPC that takes no arguments and returns every credential defined for every users and a describe RPC that takes 1 or more users and returns every credential defined for those specified users, and they both return the same information for each credential? Or do you mean a list RPC and a describe RPC that return different sets of information (as is done with list vs. describe topics)? I think you mean the former (two RPCs, each returning the same thing), but I want to be certain I understand. > There are a few reasons why. One is that even though we currently only make one RPC, in the future we might make more than one. In that case we would want multiple futures. I don't understand what this is referring to. By "we currently only make one RPC" to what are you referring? > I also feel like in AdminClient, errors should be handled with futures pretty much all the time Agreed. Will convert to using futures always, whenever we arrive at the final decision on what the RPCs need to look like. I'm wondering if we convert to returning futures everywhere, can we stick with the one describe RPC? For example, could the admin client return a `Future>>`? Would that work, and if so, would that be a reasonable way to proceed? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472530253 ## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ## @@ -486,7 +486,9 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { } @Test - def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = { + def shouldNotAlterNonQuotaNonScramUserOrClientConfigUsingBootstrapServer(): Unit = { +// when using --bootstrap-server, it should be illegal to alter anything that is not a quota and not a SCRAM credential +// for both user and client entities Review comment: @cmccabe Good question, actually. There is already a check to make sure a non-existent config cannot be **deleted** via `--zookeeper`: `shouldNotUpdateConfigIfNonExistingConfigIsDeletedUsingZookeper()`. This test passes, of course. However, there is no check to make sure an unrecognized config can be **added**, and in fact if I add that test it fails; the code will gladly go ahead and add anything we wish (and it will gladly go ahead and delete it if we wish as well -- the above test is only checking that something that doesn't exist can't be deleted). The next question, of course, is whether we should "fix" this or not. What do you think? To fix it we would need the full set of allowed configs at the User, Client, Topic, and Broker levels and then insert code to check accordingly. Since the ZooKeeper update path is going away due to KIP-500, I'm wondering if we can just leave it alone? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r470339737 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: I kept the top-level error information but added per-user error information in addition. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r470339031 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialUpsertion.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.util.Objects; + +/** + * A request to update/insert a SASL/SCRAM credential for a user. + * + * @see https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API";>KIP-554: Add Broker-side SCRAM Config API + */ +public class UserScramCredentialUpsertion extends UserScramCredentialAlteration { +private final ScramCredentialInfo info; +private final byte[] salt; +private final byte[] password; + +/** + * Constructor that generates a random salt + * + * @param user the user for which the credential is to be updated/inserted + * @param credentialInfo the mechanism and iterations to be used + * @param password the password + */ +public UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, String password) { +this(user, credentialInfo, password.getBytes(StandardCharsets.UTF_8)); +} + +/** + * Constructor that generates a random salt + * + * @param user the user for which the credential is to be updated/inserted + * @param credentialInfo the mechanism and iterations to be used + * @param password the password + */ +public UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password) { +this(user, credentialInfo, password, generateRandomSalt()); +} + +/** + * Constructor that accepts an explicit salt + * + * @param user the user for which the credential is to be updated/inserted + * @param credentialInfo the mechanism and iterations to be used + * @param password the password + * @param salt the salt to be used + */ +public UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password, byte[] salt) { +super(Objects.requireNonNull(user)); +this.info = Objects.requireNonNull(credentialInfo); +this.password = Objects.requireNonNull(password); +this.salt = Objects.requireNonNull(salt); +} + +/** + * + * @return the mechanism and iterations + */ +public ScramCredentialInfo credentialInfo() { +return info; +} + +/** + * + * @return the salt + */ +public byte[] salt() { +return salt; +} + +/** + * + * @return the password + */ +public byte[] password() { +return password; +} + +private static byte[] generateRandomSalt() { +return new BigInteger(130, new SecureRandom()).toString(Character.MAX_RADIX).getBytes(StandardCharsets.UTF_8); Review comment: The latest commit changes the methods on `org.apache.kafka.common.security.scram.internals.ScramFormatter` to be static, and I now reuse that logic. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r470101599 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: I don't see the need for multiple futures; everything is returning in a single response from Kafka, and I can't think of why it would be necessary to have some users' credential descriptions available to the API caller but others not. I'll take a look at keeping a single future for the response but moving the error status from the top level down into each per-user result. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r469680702 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: Now that I'm working on this, I discovered that there is no other API that can describe or list everything that works this way. Everything that can describe or list everything returns a single future. Every describe or list API that returns a map of keys to futures requires a non-empty list of keys to describe or list. For example: 1. `listTopics()` lists all topics and returns a single `Future`; the `describeTopics()` API returns a map of names to futures but requires a non-empty list of topics to describe. 2. `describeConfigs()` returns a map of resources to futures but requires a non-empty list of resources to describe. 3. `describeLogDirs()` returns a map of broker IDs to futures but requires a non-empty list of brokers to describe. 4. `describeReplicaLogDirs()` returns a map of replicas to futures but requires a non-empty list of replicas to describe. 5. `describeConsumerGroups()` returns a map of consumer groups to futures but requires a non-empty list of consumer groups to describe. 6. `listPartitionReassignments()` allows listing all or a subset of reassignments and returns a single future. 7. `listOffsets()` returns a map of topic-partitions to futures but requires a non-empty list of topic-partitions to describe. 8. `describeClientQuotas()` allows listing all or a subset of quotas and returns a single future. I think if we made this change here we would be off the beaten path. That's not necessarily bad, but what tipped me off to this was the fact that when we list everything we have to create a future for every user that gets returned, and we don't know that list of users when we make the request, so there's really no way to implement it. We could create two separate APIs: one for describing some explicit, non-empty list of users, which would return a map of users to futures, and another one that describes everything, which returns a single future. `listTopics()` vs `describeTopics()` works this way, for example, though the information returned in the two is very different: when listing you just get the names, and when describing you get a lot more. I don't see us distinguishing between listing vs. describing in terms of data -- we are going to send back the same two things (mechanism and iterations) regardless. So we would probably be talking about creating a `describeUserScramCredentials()` API and a `describeAllUserScramCredentials()` API with the first taking a list and returning a map of futures and the second not taking a list and returning a single future. But I'm thinking we should just keep it the way it is -- take a possibly-empty list and return a single future regardles of whether the list was empty or not. Thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r469650293 ## File path: core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala ## @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import java.io.{ByteArrayOutputStream, PrintStream} +import java.nio.charset.StandardCharsets + +import kafka.server.BaseRequestTest +import kafka.utils.Exit +import org.junit.Assert._ +import org.junit.Test + +class UserScramCredentialsCommandTest extends BaseRequestTest { + override def brokerCount = 1 + var exitStatus: Option[Int] = None + var exitMessage: Option[String] = None + + case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None) + + private def runConfigCommandViaBroker(args: Array[String]) : ConfigCommandResult = { +val byteArrayOutputStream = new ByteArrayOutputStream() +val utf8 = StandardCharsets.UTF_8.name +val printStream = new PrintStream(byteArrayOutputStream, true, utf8) +var exitStatus: Option[Int] = None +Exit.setExitProcedure { (status, _) => Review comment: Actually, I think this may not be an issue since parallel tests in Gradle run in separate processes rather than separate threads. From https://docs.gradle.org/current/dsl/org.gradle.api.tasks.testing.Test.html: `"Test are always run in (one or more) separate JVMs."` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r469646777 ## File path: core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala ## @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import java.io.{ByteArrayOutputStream, PrintStream} +import java.nio.charset.StandardCharsets + +import kafka.server.BaseRequestTest +import kafka.utils.Exit +import org.junit.Assert._ +import org.junit.Test + +class UserScramCredentialsCommandTest extends BaseRequestTest { + override def brokerCount = 1 + var exitStatus: Option[Int] = None + var exitMessage: Option[String] = None + + case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None) + + private def runConfigCommandViaBroker(args: Array[String]) : ConfigCommandResult = { +val byteArrayOutputStream = new ByteArrayOutputStream() +val utf8 = StandardCharsets.UTF_8.name +val printStream = new PrintStream(byteArrayOutputStream, true, utf8) +var exitStatus: Option[Int] = None +Exit.setExitProcedure { (status, _) => Review comment: Hmm, good point, I think there may be a problem here in general because there is only a single exit procedure that can be set globally, and multiple tests that set/reset it in parallel will collide. There are 16 Scala test classes in `core` out of 260 that do this -- so 6% of test classes. So I think this will introduce some flakiness to these 16 tests. Does this sound correct to you, and should we open a separate ticket for this as opposed to trying to fix it here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r469646777 ## File path: core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala ## @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import java.io.{ByteArrayOutputStream, PrintStream} +import java.nio.charset.StandardCharsets + +import kafka.server.BaseRequestTest +import kafka.utils.Exit +import org.junit.Assert._ +import org.junit.Test + +class UserScramCredentialsCommandTest extends BaseRequestTest { + override def brokerCount = 1 + var exitStatus: Option[Int] = None + var exitMessage: Option[String] = None + + case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None) + + private def runConfigCommandViaBroker(args: Array[String]) : ConfigCommandResult = { +val byteArrayOutputStream = new ByteArrayOutputStream() +val utf8 = StandardCharsets.UTF_8.name +val printStream = new PrintStream(byteArrayOutputStream, true, utf8) +var exitStatus: Option[Int] = None +Exit.setExitProcedure { (status, _) => Review comment: Hmm, good point, I think there may be a problem here in general because there is only a single exit procedure that can be set globally, and multiple tests that set/reset it in parallel will collide. There are 16 Scala test classes in `core` out of 260 that do this -- so 4% of test classes. So I think this will introduce some flakiness to these 16 tests. Does this sound correct to you, and should we open a separate ticket for this as opposed to trying to fix it here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r469637266 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialUpsertion.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.util.Objects; + +/** + * A request to update/insert a SASL/SCRAM credential for a user. + * + * @see https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API";>KIP-554: Add Broker-side SCRAM Config API + */ +public class UserScramCredentialUpsertion extends UserScramCredentialAlteration { +private final ScramCredentialInfo info; +private final byte[] salt; +private final byte[] password; + +/** + * Constructor that generates a random salt + * + * @param user the user for which the credential is to be updated/inserted + * @param credentialInfo the mechanism and iterations to be used + * @param password the password + */ +public UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, String password) { +this(user, credentialInfo, password.getBytes(StandardCharsets.UTF_8)); +} + +/** + * Constructor that generates a random salt + * + * @param user the user for which the credential is to be updated/inserted + * @param credentialInfo the mechanism and iterations to be used + * @param password the password + */ +public UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password) { +this(user, credentialInfo, password, generateRandomSalt()); +} + +/** + * Constructor that accepts an explicit salt + * + * @param user the user for which the credential is to be updated/inserted + * @param credentialInfo the mechanism and iterations to be used + * @param password the password + * @param salt the salt to be used + */ +public UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password, byte[] salt) { +super(Objects.requireNonNull(user)); +this.info = Objects.requireNonNull(credentialInfo); +this.password = Objects.requireNonNull(password); +this.salt = Objects.requireNonNull(salt); +} + +/** + * + * @return the mechanism and iterations + */ +public ScramCredentialInfo credentialInfo() { +return info; +} + +/** + * + * @return the salt + */ +public byte[] salt() { +return salt; +} + +/** + * + * @return the password + */ +public byte[] password() { +return password; +} + +private static byte[] generateRandomSalt() { +return new BigInteger(130, new SecureRandom()).toString(Character.MAX_RADIX).getBytes(StandardCharsets.UTF_8); Review comment: @cmccabe I think the approach you suggest leaves out how to identify `length` which itself needs to be randomized. I got the current implementation from `org.apache.kafka.common.security.scram.internals.ScramFormatter`. I would have invoked `ScramFormatter.secureRandomBytes()` directly, but it is not `static` and I did not want to either instantiate an instance or change methods to static (though the class is internal, so I could have done that). I instead replicated the logic here. The array length ends up being random with this approach, as do the bytes in the array. Let me know what you think. Currently I've left this as-is. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r469627624 ## File path: core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util +import java.util.Properties + +import kafka.network.SocketServer +import kafka.security.authorizer.AclAuthorizer +import org.apache.kafka.common.acl.AclOperation +import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData +import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse} +import org.apache.kafka.common.resource.ResourceType +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder} +import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} +import org.junit.Assert._ +import org.junit.{Before, Test} + +import scala.jdk.CollectionConverters._ + +/** + * Test DescribeUserScramCredentialsRequest/Response API for the cases where no credentials exist + * or failure is expected due to lack of authorization, sending the request to a non-controller broker, or some other issue. + * Testing the API for the case where there are actually credentials to describe is performed elsewhere. + */ +class DescribeUserScramCredentialsRequestTest extends BaseRequestTest { + override def brokerPropertyOverrides(properties: Properties): Unit = { +properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") +properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[DescribeCredentialsTest.TestAuthorizer].getName) +properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[DescribeCredentialsTest.TestPrincipalBuilder].getName) + } + + @Before + override def setUp(): Unit = { +DescribeCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to be authorized +super.setUp() + } + + @Test + def testDescribeNothing(): Unit = { +val request = new DescribeUserScramCredentialsRequest.Builder( + new DescribeUserScramCredentialsRequestData()).build() +val response = sendDescribeUserScramCredentialsRequest(request) + +val error = response.data.error +assertEquals("Expected no error when routed correctly", Errors.NONE.code, error) +assertEquals("Expected no credentials", 0, response.data.userScramCredentials.size) + } + + @Test + def testDescribeNotController(): Unit = { +val request = new DescribeUserScramCredentialsRequest.Builder( + new DescribeUserScramCredentialsRequestData()).build() +val response = sendDescribeUserScramCredentialsRequest(request, notControllerSocketServer) + +val error = response.data.error +assertEquals("Expected controller error when routed incorrectly", Errors.NOT_CONTROLLER.code, error) + } + + @Test + def testDescribeNotAuthorized(): Unit = { +DescribeCredentialsTest.principal = DescribeCredentialsTest.UnauthorizedPrincipal + +val request = new DescribeUserScramCredentialsRequest.Builder( + new DescribeUserScramCredentialsRequestData()).build() +val response = sendDescribeUserScramCredentialsRequest(request) + +val error = response.data.error +assertEquals("Expected not authorized error", Errors.CLUSTER_AUTHORIZATION_FAILED.code, error) + } + + @Test + def testDescribeSameUserTwice(): Unit = { +val user = new UserName().setName("user1") +val request = new DescribeUserScramCredentialsRequest.Builder( + new DescribeUserScramCredentialsRequestData().setUsers(List(user, user).asJava)).build() +val response = sendDescribeUserScramCredentialsRequest(request) + +val error = response.data.error +assertEquals("Expected invalid request error", Errors.INVALID_REQUEST.code, error) + } + + + private def sendDescribeUserScramCredentialsRequest(request: DescribeUserScramCredentialsRequest, socketServer: SocketServer = controllerSocketServer): Desc
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r469476195 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) { +final KafkaFutureImpl> future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { Review comment: Ok, switching Describe to **not** require that it be done on the controller. > planning on getting rid of ControllerNodeProvider as part of KIP-590 Leaving Alter alone for now under the assumption that we will fix it as part of the KIP-590 PR. Let me know if you wish me to change this now instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r469457068 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) { +final KafkaFutureImpl> future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { +@Override +public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { +return new DescribeUserScramCredentialsRequest.Builder( +new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> +new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList(; +} + +@Override +public void handleResponse(AbstractResponse abstractResponse) { +DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; +Errors error = Errors.forCode(response.data().error()); +switch (error) { +case NONE: +DescribeUserScramCredentialsResponseData data = response.data(); + future.complete(data.userScramCredentials().stream().collect(Collectors.toMap( + DescribeUserScramCredentialsResponseData.UserScramCredential::name, +userScramCredential -> { +List scramCredentialInfos = userScramCredential.credentialInfos().stream().map( +credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations())) +.collect(Collectors.toList()); +return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos); +}))); +break; +case NOT_CONTROLLER: +handleNotControllerError(error); +break; +default: +future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); +break; +} +} + +@Override +void handleFailure(Throwable throwable) { +future.completeExceptionally(throwable); +} +}; +runnable.call(call, now); +return new DescribeUserScramCredentialsResult(future); +} + +@Override +public AlterUserScramCredentialsResult alterUserScramCredentials(List alterations, + AlterUserScramCredentialsOptions options) { +final long now = time.milliseconds(); +final Map> futures = new HashMap<>(); +for (UserScramCredentialAlteration alteration: alterations) { +futures.put(alteration.getUser(), new KafkaFutureImpl<>()); +} +final Map userIllegalAlterationExceptions = new HashMap<>(); +// We need to keep track of users with deletions of an unknown SCRAM mechanism +alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> { +UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration; +ScramMechanism mechanism = deletion.getMechanism(); +if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { +userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism")); +} +}); +// Creating an upsertion may throw InvalidKeyException or NoSuchAlgorithmException, +// so keep track of which users are affected by such a failure and immediately fail all their alterations +final Map> userInsertions = new HashMap<>(); +alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion) +.filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.getUser())) +.forEach(alteration -> { +UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration; +String user = upsertion.getUser(); +try { +Scr
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r469421523 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) { +final KafkaFutureImpl> future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { +@Override +public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { +return new DescribeUserScramCredentialsRequest.Builder( +new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> +new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList(; +} + +@Override +public void handleResponse(AbstractResponse abstractResponse) { +DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; +Errors error = Errors.forCode(response.data().error()); +switch (error) { +case NONE: +DescribeUserScramCredentialsResponseData data = response.data(); + future.complete(data.userScramCredentials().stream().collect(Collectors.toMap( + DescribeUserScramCredentialsResponseData.UserScramCredential::name, +userScramCredential -> { +List scramCredentialInfos = userScramCredential.credentialInfos().stream().map( +credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations())) +.collect(Collectors.toList()); +return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos); +}))); +break; +case NOT_CONTROLLER: +handleNotControllerError(error); +break; +default: +future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); +break; +} +} + +@Override +void handleFailure(Throwable throwable) { +future.completeExceptionally(throwable); +} +}; +runnable.call(call, now); +return new DescribeUserScramCredentialsResult(future); +} + +@Override +public AlterUserScramCredentialsResult alterUserScramCredentials(List alterations, + AlterUserScramCredentialsOptions options) { +final long now = time.milliseconds(); +final Map> futures = new HashMap<>(); +for (UserScramCredentialAlteration alteration: alterations) { +futures.put(alteration.getUser(), new KafkaFutureImpl<>()); +} +final Map userIllegalAlterationExceptions = new HashMap<>(); +// We need to keep track of users with deletions of an unknown SCRAM mechanism +alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> { +UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration; +ScramMechanism mechanism = deletion.getMechanism(); +if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { +userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism")); +} +}); +// Creating an upsertion may throw InvalidKeyException or NoSuchAlgorithmException, +// so keep track of which users are affected by such a failure and immediately fail all their alterations +final Map> userInsertions = new HashMap<>(); +alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion) +.filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.getUser())) +.forEach(alteration -> { +UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration; +String user = upsertion.getUser(); +try { +Scr
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r469410480 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) { +final KafkaFutureImpl> future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { +@Override +public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { +return new DescribeUserScramCredentialsRequest.Builder( +new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> +new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList(; +} + +@Override +public void handleResponse(AbstractResponse abstractResponse) { +DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; +Errors error = Errors.forCode(response.data().error()); +switch (error) { +case NONE: +DescribeUserScramCredentialsResponseData data = response.data(); + future.complete(data.userScramCredentials().stream().collect(Collectors.toMap( + DescribeUserScramCredentialsResponseData.UserScramCredential::name, +userScramCredential -> { +List scramCredentialInfos = userScramCredential.credentialInfos().stream().map( +credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations())) +.collect(Collectors.toList()); +return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos); +}))); +break; +case NOT_CONTROLLER: +handleNotControllerError(error); +break; +default: +future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); +break; +} +} + +@Override +void handleFailure(Throwable throwable) { +future.completeExceptionally(throwable); +} +}; +runnable.call(call, now); +return new DescribeUserScramCredentialsResult(future); +} + +@Override +public AlterUserScramCredentialsResult alterUserScramCredentials(List alterations, + AlterUserScramCredentialsOptions options) { +final long now = time.milliseconds(); +final Map> futures = new HashMap<>(); +for (UserScramCredentialAlteration alteration: alterations) { +futures.put(alteration.getUser(), new KafkaFutureImpl<>()); +} +final Map userIllegalAlterationExceptions = new HashMap<>(); +// We need to keep track of users with deletions of an unknown SCRAM mechanism +alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> { +UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration; +ScramMechanism mechanism = deletion.getMechanism(); +if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { +userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism")); Review comment: I think `UnsupportedSaslMechanismException` is actually appropriate here -- it already exists and corresponds to `Errors.UNSUPPORTED_SASL_MECHANISM`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r468149236 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,64 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describe all SASL/SCRAM credentials. + * + * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @return The DescribeUserScramCredentialsResult. + */ +default DescribeUserScramCredentialsResult describeUserScramCredentials() { +return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions()); +} + +/** + * Describe SASL/SCRAM credentials for the given users. + * + * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @param users the users for which credentials are to be described; all users' credentials are described if null + * or empty. A user explicitly specified here that does not have a SCRAM credential will not appear + * in the results. Review comment: Hmm, good question. The KIP doesn't state what do do here. @cmccabe thoughts? ## File path: core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala ## @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + + +import java.nio.charset.StandardCharsets +import java.util +import java.util.Properties + +import kafka.network.SocketServer +import kafka.security.authorizer.AclAuthorizer +import org.apache.kafka.clients.admin.ScramMechanism +import org.apache.kafka.common.acl.AclOperation +import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, DescribeUserScramCredentialsRequestData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse} +import org.apache.kafka.common.resource.ResourceType +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder} +import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} +import org.junit.Assert._ +import org.junit.{Before, Test} + +import scala.jdk.CollectionConverters._ + +/** + * Test AlterUserScramCredentialsRequest/Response API for the cases where either no credentials are altered + * or failure is expected due to lack of authorization, sending the request to a non-controller broker, or some other issue. + * Also tests the Alter and Describe APIs for the case where credentials are successfully altered/described. + */ +class AlterUserScramCredentialsRequestTest extends BaseRequestTest { + override def brokerPropertyOverrides(properties: Properties): Unit = { +properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") +properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName) +properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilder].getName) + } + + @Before + override def setUp(): Unit = { +AlterCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to be authorized +super.setUp() + } + + @Test + def testAlterNothing(): Unit = { +val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() +.setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]) +.setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build() +val response = sendAlterUserScramCredentialsRequest(request) + +val results = response.data.results +assertEquals(0, results.size) + } + + @Test + def testAlterNothingNotAuthorized(): Unit = { +AlterCredentialsTest.principal = AlterCredential
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r467191680 ## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala ## @@ -508,7 +563,15 @@ object ConfigCommand extends Config { val entityStr = (entitySubstr(ClientQuotaEntity.USER) ++ entitySubstr(ClientQuotaEntity.CLIENT_ID)).mkString(", ") val entriesStr = entries.asScala.map(e => s"${e._1}=${e._2}").mkString(", ") - println(s"Configs for ${entityStr} are ${entriesStr}") + println(s"Quota configs for ${entityStr} are ${entriesStr}") +} +// we describe user SCRAM credentials only when we are not describing client information +// and we are not given either --entity-default or --user-defaults +if (!entityTypes.contains(ConfigType.Client) && !entityNames.contains("")) { + getUserScramCredentialConfigs(adminClient, entityNames).foreach { case (user, description) => Review comment: I'm adding `core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala` -- let me know if this test covers this case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r466681231 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) { +final KafkaFutureImpl> future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { Review comment: The KIP says `It will be will be sent to the controller, and will return NOT_CONTROLLER if the receiving broker is not the controller.` It says this for both Describe and Alter. I agree it doesn't seem necessary for Describe. Would it require an email to the list for notification if we decide to change it? Or would a KIP update be sufficient? Do you think we should change it? @cmccabe any thoughts here as well? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r465751005 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) { +final KafkaFutureImpl> future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { +@Override +public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { +return new DescribeUserScramCredentialsRequest.Builder( +new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> +new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList(; +} + +@Override +public void handleResponse(AbstractResponse abstractResponse) { +DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; +Errors error = Errors.forCode(response.data().error()); +switch (error) { +case NONE: +DescribeUserScramCredentialsResponseData data = response.data(); + future.complete(data.userScramCredentials().stream().collect(Collectors.toMap( + DescribeUserScramCredentialsResponseData.UserScramCredential::name, +userScramCredential -> { +List scramCredentialInfos = userScramCredential.credentialInfos().stream().map( +credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations())) +.collect(Collectors.toList()); +return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos); +}))); +break; +case NOT_CONTROLLER: +handleNotControllerError(error); +break; +default: +future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); +break; +} +} + +@Override +void handleFailure(Throwable throwable) { +future.completeExceptionally(throwable); +} +}; +runnable.call(call, now); +return new DescribeUserScramCredentialsResult(future); +} + +@Override +public AlterUserScramCredentialsResult alterUserScramCredentials(List alterations, + AlterUserScramCredentialsOptions options) { +final long now = time.milliseconds(); +final Map> futures = new HashMap<>(); +for (UserScramCredentialAlteration alteration: alterations) { +futures.put(alteration.getUser(), new KafkaFutureImpl<>()); +} +final Map userIllegalAlterationExceptions = new HashMap<>(); +// We need to keep track of users with deletions of an unknown SCRAM mechanism +alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> { +UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration; +ScramMechanism mechanism = deletion.getMechanism(); +if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { +userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism")); +} +}); +// Creating an upsertion may throw InvalidKeyException or NoSuchAlgorithmException, +// so keep track of which users are affected by such a failure and immediately fail all their alterations +final Map> userInsertions = new HashMap<>(); +alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion) +.filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.getUser())) +.forEach(alteration -> { +UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration; +String user = upsertion.getUser(); +try { +Scr
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r465744271 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) { +final KafkaFutureImpl> future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { +@Override +public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { +return new DescribeUserScramCredentialsRequest.Builder( +new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> +new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList(; +} + +@Override +public void handleResponse(AbstractResponse abstractResponse) { +DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; +Errors error = Errors.forCode(response.data().error()); +switch (error) { +case NONE: +DescribeUserScramCredentialsResponseData data = response.data(); + future.complete(data.userScramCredentials().stream().collect(Collectors.toMap( + DescribeUserScramCredentialsResponseData.UserScramCredential::name, +userScramCredential -> { +List scramCredentialInfos = userScramCredential.credentialInfos().stream().map( +credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations())) +.collect(Collectors.toList()); +return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos); +}))); +break; +case NOT_CONTROLLER: +handleNotControllerError(error); +break; +default: +future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); +break; +} +} + +@Override +void handleFailure(Throwable throwable) { +future.completeExceptionally(throwable); +} +}; +runnable.call(call, now); +return new DescribeUserScramCredentialsResult(future); +} + +@Override +public AlterUserScramCredentialsResult alterUserScramCredentials(List alterations, + AlterUserScramCredentialsOptions options) { +final long now = time.milliseconds(); +final Map> futures = new HashMap<>(); +for (UserScramCredentialAlteration alteration: alterations) { +futures.put(alteration.getUser(), new KafkaFutureImpl<>()); +} +final Map userIllegalAlterationExceptions = new HashMap<>(); +// We need to keep track of users with deletions of an unknown SCRAM mechanism +alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> { +UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration; +ScramMechanism mechanism = deletion.getMechanism(); +if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { +userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism")); Review comment: How about `InvalidRequestException`? It's already used in this class, and it might be more appropriate than `InvalidConfigurationException`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org