[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480407445



##
File path: 
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
##
@@ -545,6 +558,16 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
 }
   }
 
+  protected def createScramAdminClient(scramMechanism: String, user: String, 
password: String): Admin = {

Review comment:
   OK, this now invokes a new method on `SaslSetup`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480370634



##
File path: 
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
##
@@ -545,6 +558,16 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
 }
   }
 
+  protected def createScramAdminClient(scramMechanism: String, user: String, 
password: String): Admin = {

Review comment:
   See above









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480370463



##
File path: 
core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
##
@@ -248,4 +250,25 @@ class SaslClientsWithInvalidCredentialsTest extends 
IntegrationTestHarness with
 producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
 createProducer()
   }
+
+  private def createScramAdminClient(user: String, password: String): Admin = {

Review comment:
   OK, I added the following code to `SaslSetup`; each of the 3 test classes
   that use this functionality implements the first method.
   
   ```
   def createPrivilegedAdminClient(): Admin = {
     // create an admin client instance that is authorized to create credentials
     throw new UnsupportedOperationException("Must implement this if a test needs to use it")
   }
   
   def createScramCredentialsViaPrivilegedAdminClient(userName: String, password: String): Unit = {
     val privilegedAdminClient = createPrivilegedAdminClient() // must explicitly implement this method
     try {
       // create the SCRAM credential for the given user
       createScramCredentials(privilegedAdminClient, userName, password)
     } finally {
       privilegedAdminClient.close()
     }
   }
   ```









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480329570



##
File path: 
core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
##
@@ -42,7 +42,18 @@ class SaslScramSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTes
   override def setUp(): Unit = {
 super.setUp()
 // Create client credentials after starting brokers so that dynamic 
credential creation is also tested
-createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, 
JaasTestUtils.KafkaScramPassword)
-createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, 
JaasTestUtils.KafkaScramPassword2)
+createScramCredentialWithScramAdminClient(JaasTestUtils.KafkaScramUser, 
JaasTestUtils.KafkaScramPassword)
+createScramCredentialWithScramAdminClient(JaasTestUtils.KafkaScramUser2, 
JaasTestUtils.KafkaScramPassword2)
+  }
+
+  private def createScramCredentialWithScramAdminClient(user: String, 
password: String) = {

Review comment:
   The goal was to eliminate SCRAM credential creation via ZooKeeper wherever
   possible.  After this PR, the only places that still do so are those where
   credentials have to be created before the brokers are started (i.e. when the
   inter-broker security protocol is SASL/SCRAM).  This code used to create the
   credential directly via ZooKeeper, but since it runs after the brokers start,
   it can use the admin client instead.









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480324560



##
File path: 
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##
@@ -1047,8 +1047,8 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 
   @Test
   def testAddRemoveSaslListeners(): Unit = {
-createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, 
JaasTestUtils.KafkaScramPassword)
-createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, 
JaasTestUtils.KafkaScramAdminPassword)
+createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramUser, 
JaasTestUtils.KafkaScramPassword)
+createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramAdmin, 
JaasTestUtils.KafkaScramAdminPassword)
 initializeKerberos()

Review comment:
   Good point.  It wasn't waiting before, and it probably didn't/doesn't 
matter since we were spending time initializing Kerberos, but I added the check 
anyway just to be safe.









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480301973



##
File path: 
core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
##
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.server.BaseRequestTest
+import kafka.utils.Exit
+import org.junit.Assert._
+import org.junit.Test
+
+class UserScramCredentialsCommandTest extends BaseRequestTest {
+  override def brokerCount = 1
+  var exitStatus: Option[Int] = None
+  var exitMessage: Option[String] = None
+
+  case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = 
None)
+
+  private def runConfigCommandViaBroker(args: Array[String]) : 
ConfigCommandResult = {
+val byteArrayOutputStream = new ByteArrayOutputStream()
+val utf8 = StandardCharsets.UTF_8.name
+val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
+var exitStatus: Option[Int] = None
+Exit.setExitProcedure { (status, _) =>
+  exitStatus = Some(status)
+  throw new RuntimeException
+}
+try {
+  Console.withOut(printStream) {
+ConfigCommand.main(Array("--bootstrap-server", brokerList) ++ args)
+  }
+  ConfigCommandResult(byteArrayOutputStream.toString(utf8))
+} catch {
+  case e: Exception => {

Review comment:
   Logging it at debug level doesn't hurt, so I added it.









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480282139



##
File path: core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
##
@@ -169,6 +169,18 @@ object JaasTestUtils {
 jaasFile
   }
 
+  // Returns a SASL/SCRAM configuration using credentials for the given user 
and password
+  def scramClientLoginModule(mechanism: String, scramUser: String, 
scramPassword: String): String = {
+mechanism match {
+  case "SCRAM-SHA-256" | "SCRAM-SHA-512" =>

Review comment:
   I fixed this in the other places in this file as well









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-31 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r480272048



##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -575,10 +577,21 @@ def set_unclean_leader_election(self, topic, value=True, 
node=None):
 node.account.ssh(cmd)
 
 def _connect_setting_kafka_configs(self, node):
+# Use this for everything related to kafka-configs except User SCRAM 
Credentials
 if node.version.kafka_configs_command_uses_bootstrap_server():
-return "--bootstrap-server %s " % 
self.bootstrap_servers(self.security_protocol)
+return "--bootstrap-server %s --command-config <(echo '%s')" % 
(self.bootstrap_servers(self.security_protocol),

Review comment:
   I created https://issues.apache.org/jira/browse/KAFKA-10451 to track 
this.









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-27 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r478653535



##
File path: tests/kafkatest/services/security/security_config.py
##
@@ -350,6 +362,14 @@ def kafka_opts(self):
 else:
 return ""
 
+def export_kafka_opts_for_admin_client_as_broker(self):

Review comment:
   Yes, created https://issues.apache.org/jira/browse/KAFKA-10443 to track 
this.









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-24 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r475771230



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture> usersFuture;
+private final Map> 
perUserFutures;
+
+/**
+ *
+ * @param usersFuture the future indicating the users described by the call
+ * @param perUserFutures the required map of user names to futures 
representing the results of describing each
+ *   user's SCRAM credentials.
+ */
+public DescribeUserScramCredentialsResult(KafkaFuture> 
usersFuture,
+  Map> perUserFutures) {
+this.usersFuture = Objects.requireNonNull(usersFuture);
+this.perUserFutures = Objects.requireNonNull(perUserFutures);
+}
+
+/**
+ *
+ * @return a future for the results of all requested (either explicitly or 
implicitly via describe-all) users.
+ * The future will complete successfully only if the users future first 
completes successfully and then all the
+ * futures for the user descriptions complete successfully.
+ */
+public KafkaFuture> all() {
+KafkaFuture succeedsOnlyIfUsersFutureSucceeds = 
KafkaFuture.allOf(users());
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+KafkaFuture succeedsOnlyIfAllDescriptionsSucceed = 
KafkaFuture.allOf(perUserFutures.values().toArray(
+new KafkaFuture[perUserFutures.size()]));
+KafkaFuture> 
mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 ->
+perUserFutures.entrySet().stream().collect(Collectors.toMap(
+e -> e.getKey(),
+e -> 
valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue();
+/* At this point it is only the users future that is guaranteed to 
have succeeded.
+ * We want to return the future to the map, but we have to return 
a map at this point.
+ * We need to dereference the future while propagating any 
exception.
+ */
+return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture);
+});
+}
+
+/**
+ *
+ * @return a future indicating the distinct users that were requested 
(either explicitly or implicitly via
+ * describe-all).  The future will not complete successfully if the user 
is not authorized to perform the describe
+ * operation; otherwise, it will complete successfully as long as the list 
of users with credentials can be
+ * successfully determined within some hard-coded timeout period.
+ */
+public KafkaFuture> users() {
+return usersFuture;
+}
+
+/**
+ *
+ * @param userName the name of the user description being requested
+ * @return a future indicating the description results for the given user. 
The future will complete exceptionally if
+ * the future returned by {@link #users()} completes exceptionally.  If 
the given user does not exist in the list
+ * of requested users then the future will complete exceptionally with
+ * {@link org.apache.kafka.common.errors.ResourceNotFoundException}.
+ */
+public KafkaFuture description(String 
userName) {
+KafkaFuture succeedsOnlyIfUsersFutureSucceeds = 
KafkaFuture.allOf(usersFuture);
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+   

[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-24 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r475756645



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture> usersFuture;
+private final Map> 
perUserFutures;
+
+/**
+ *
+ * @param usersFuture the future indicating the users described by the call
+ * @param perUserFutures the required map of user names to futures 
representing the results of describing each
+ *   user's SCRAM credentials.
+ */
+public DescribeUserScramCredentialsResult(KafkaFuture> 
usersFuture,
+  Map> perUserFutures) {
+this.usersFuture = Objects.requireNonNull(usersFuture);
+this.perUserFutures = Objects.requireNonNull(perUserFutures);
+}
+
+/**
+ *
+ * @return a future for the results of all requested (either explicitly or 
implicitly via describe-all) users.
+ * The future will complete successfully only if the users future first 
completes successfully and then all the
+ * futures for the user descriptions complete successfully.
+ */
+public KafkaFuture> all() {
+KafkaFuture succeedsOnlyIfUsersFutureSucceeds = 
KafkaFuture.allOf(users());
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+KafkaFuture succeedsOnlyIfAllDescriptionsSucceed = 
KafkaFuture.allOf(perUserFutures.values().toArray(
+new KafkaFuture[perUserFutures.size()]));
+KafkaFuture> 
mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 ->
+perUserFutures.entrySet().stream().collect(Collectors.toMap(
+e -> e.getKey(),
+e -> 
valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue();
+/* At this point it is only the users future that is guaranteed to 
have succeeded.
+ * We want to return the future to the map, but we have to return 
a map at this point.
+ * We need to dereference the future while propagating any 
exception.
+ */
+return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture);
+});
+}
+
+/**
+ *
+ * @return a future indicating the distinct users that were requested 
(either explicitly or implicitly via

Review comment:
   We've gone back and forth on this.  The KIP does not explicitly state what
   to do when a describe request names a user that does not have credentials.
   We originally coded it to silently drop such users, but then changed it to
   be consistent with other APIs and raise an error
   (https://github.com/apache/kafka/pull/9032#discussion_r468871453).  I agree
   that it isn't totally clear what to do.  Rather than changing it back, I'll
   leave both this Javadoc and the underlying implementation as-is for now,
   until we discuss further and decide for sure what we want.
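
The two behaviours weighed in this comment can be illustrated with a small, self-contained sketch. It is not code from the PR: it uses `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFuture`, plain strings as a stand-in for the credential descriptions, and the names `DescribePolicySketch`, `UserNotFoundException`, `describeDropping`, `describeFailing` and `all` are all hypothetical. It only assumes the contract stated in the Javadoc above, namely that the combined future succeeds only if every per-user future succeeds.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class DescribePolicySketch {

    /** Hypothetical stand-in for Kafka's ResourceNotFoundException. */
    static class UserNotFoundException extends RuntimeException {
        UserNotFoundException(String message) {
            super(message);
        }
    }

    /** Option 1: silently drop requested users that have no credentials. */
    static Map<String, CompletableFuture<String>> describeDropping(
            List<String> requested, Map<String, String> store) {
        Map<String, CompletableFuture<String>> result = new LinkedHashMap<>();
        for (String user : requested) {
            String description = store.get(user);
            if (description != null) {
                result.put(user, CompletableFuture.completedFuture(description));
            }
        }
        return result;
    }

    /** Option 2: keep every requested user and fail the future of any user without credentials. */
    static Map<String, CompletableFuture<String>> describeFailing(
            List<String> requested, Map<String, String> store) {
        Map<String, CompletableFuture<String>> result = new LinkedHashMap<>();
        for (String user : requested) {
            String description = store.get(user);
            CompletableFuture<String> future = new CompletableFuture<>();
            if (description != null) {
                future.complete(description);
            } else {
                future.completeExceptionally(
                    new UserNotFoundException("No SCRAM credentials for user: " + user));
            }
            result.put(user, future);
        }
        return result;
    }

    /** Completes successfully only if every per-user future completes successfully. */
    static CompletableFuture<Map<String, String>> all(Map<String, CompletableFuture<String>> perUser) {
        CompletableFuture<Void> allDone =
            CompletableFuture.allOf(perUser.values().toArray(new CompletableFuture[0]));
        return allDone.thenApply(ignored -> {
            Map<String, String> descriptions = new LinkedHashMap<>();
            // Every future has succeeded at this point, so join() cannot throw here.
            perUser.forEach((user, future) -> descriptions.put(user, future.join()));
            return descriptions;
        });
    }

    public static void main(String[] args) {
        Map<String, String> store = Map.of("alice", "SCRAM-SHA-256");
        List<String> requested = List.of("alice", "bob");

        // Option 1: "bob" is dropped, so the combined result still succeeds.
        System.out.println(all(describeDropping(requested, store)).join().keySet()); // prints [alice]

        // Option 2: "bob" fails, so the combined result fails as well.
        System.out.println(all(describeFailing(requested, store)).isCompletedExceptionally()); // prints true
    }
}
```

The trade-off shows up in the last two lines: dropping unknown users keeps the combined result usable but hides the fact that a requested user was missing, while raising an error makes the miss visible at the cost of failing the combined future for everyone.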









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-24 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r475750058



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture> usersFuture;
+private final Map> 
perUserFutures;
+
+/**
+ *
+ * @param usersFuture the future indicating the users described by the call
+ * @param perUserFutures the required map of user names to futures 
representing the results of describing each
+ *   user's SCRAM credentials.
+ */
+public DescribeUserScramCredentialsResult(KafkaFuture> 
usersFuture,
+  Map> perUserFutures) {
+this.usersFuture = Objects.requireNonNull(usersFuture);
+this.perUserFutures = Objects.requireNonNull(perUserFutures);
+}
+
+/**
+ *
+ * @return a future for the results of all requested (either explicitly or 
implicitly via describe-all) users.
+ * The future will complete successfully only if the users future first 
completes successfully and then all the
+ * futures for the user descriptions complete successfully.
+ */
+public KafkaFuture> all() {
+KafkaFuture succeedsOnlyIfUsersFutureSucceeds = 
KafkaFuture.allOf(users());
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+KafkaFuture succeedsOnlyIfAllDescriptionsSucceed = 
KafkaFuture.allOf(perUserFutures.values().toArray(
+new KafkaFuture[perUserFutures.size()]));
+KafkaFuture> 
mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 ->
+perUserFutures.entrySet().stream().collect(Collectors.toMap(
+e -> e.getKey(),
+e -> 
valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue();
+/* At this point it is only the users future that is guaranteed to 
have succeeded.
+ * We want to return the future to the map, but we have to return 
a map at this point.
+ * We need to dereference the future while propagating any 
exception.
+ */
+return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture);
+});
+}
+
+/**
+ *
+ * @return a future indicating the distinct users that were requested 
(either explicitly or implicitly via
+ * describe-all).  The future will not complete successfully if the user 
is not authorized to perform the describe
+ * operation; otherwise, it will complete successfully as long as the list 
of users with credentials can be
+ * successfully determined within some hard-coded timeout period.
+ */
+public KafkaFuture> users() {
+return usersFuture;
+}
+
+/**
+ *
+ * @param userName the name of the user description being requested
+ * @return a future indicating the description results for the given user. 
The future will complete exceptionally if
+ * the future returned by {@link #users()} completes exceptionally.  If 
the given user does not exist in the list
+ * of requested users then the future will complete exceptionally with
+ * {@link org.apache.kafka.common.errors.ResourceNotFoundException}.
+ */
+public KafkaFuture description(String 
userName) {
+KafkaFuture succeedsOnlyIfUsersFutureSucceeds = 
KafkaFuture.allOf(usersFuture);
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+   

[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-21 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r474950597



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture> 
future;
+
+/**
+ *
+ * @param future the required future representing the result of the call
+ */
+public 
DescribeUserScramCredentialsResult(KafkaFuture>
 future) {
+this.future = Objects.requireNonNull(future);
+}
+
+/**
+ *
+ * @return the future representing the result of the call
+ */
+public KafkaFuture> future() {

Review comment:
   Sorry, my mistake.  This now reflects what we discussed.









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-20 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r474236159



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture> 
future;

Review comment:
   Ok, we have two levels of Futures now:
   
   1. `DescribeUserScramCredentialsResult` has a 
`KafkaFuture>`
   2. `UserScramCredentialsDescriptionResult` has a user name and a 
`KafkaFuture`
   
   I think this is where we want to end up.  Let me know if you agree.  I also 
added some logic in `AlterUserScramCredentialsRequestTest` to confirm the 
behavior of the per-user futures and associated errors.
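
As a rough illustration of the two-level shape described above, here is a minimal Java sketch. It uses `java.util.concurrent.CompletableFuture` in place of `KafkaFuture` so that it runs without Kafka on the classpath. The class names `PerUserResult` and `TwoLevelFuturesSketch`, the `String` description payload, and the "empty user name" error case are hypothetical stand-ins, not the PR's actual types or behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a per-user result: a user name plus that user's own future.
class PerUserResult {
    final String user;
    final CompletableFuture<String> description;

    PerUserResult(String user, CompletableFuture<String> description) {
        this.user = user;
        this.description = description;
    }
}

class TwoLevelFuturesSketch {
    // Outer future: completes once the list of users is known.
    // Inner futures: one per user, each succeeding or failing independently.
    static CompletableFuture<List<PerUserResult>> describe(List<String> users) {
        List<PerUserResult> results = new ArrayList<>();
        for (String user : users) {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (user.isEmpty()) {
                // Assumed error case, for illustration only.
                f.completeExceptionally(new IllegalArgumentException("empty user name"));
            } else {
                f.complete("SCRAM credentials for " + user);
            }
            results.add(new PerUserResult(user, f));
        }
        return CompletableFuture.completedFuture(results);
    }

    public static void main(String[] args) {
        for (PerUserResult r : describe(Arrays.asList("alice", "")).join()) {
            System.out.println("'" + r.user + "' failed=" + r.description.isCompletedExceptionally());
        }
    }
}
```

In this sketch a failure for one user leaves the other users' futures untouched, which is the property the per-user futures are meant to provide.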









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-20 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r474226011



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,64 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlteration> entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describe all SASL/SCRAM credentials.
+ *
+ * This is a convenience method for {@link 
#describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+ *
+ * @return The DescribeUserScramCredentialsResult.
+ */
+default DescribeUserScramCredentialsResult describeUserScramCredentials() {
+return describeUserScramCredentials(null, new 
DescribeUserScramCredentialsOptions());
+}
+
+/**
+ * Describe SASL/SCRAM credentials for the given users.
+ *
+ * This is a convenience method for {@link 
#describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+ *
+ * @param users the users for which credentials are to be described; all 
users' credentials are described if null
+ *  or empty.  A user explicitly specified here that does not 
have a SCRAM credential will not appear
+ *  in the results.

Review comment:
   I reworked this Javadoc to list all possible exceptions for both alter 
and describe.









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-19 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472947852



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture> 
future;

Review comment:
   Ok, I think I follow now.  You are saying that we could potentially 
implement describe by invoking 1+N requests to Kafka: one to get the list of 
credentials (either the list of all of them if we are asking for them all, or 
the explicitly requested ones we wanted), and then another N requests to get 
the data for each one.  On the surface this seems like an anti-pattern, but it 
is not unreasonable when the data is expensive to get in the first place, since 
we may be forced to make 1 or more round-trips for each anyway.  So as a 
general, reusable pattern, yes, I believe it works.
   
   So when we invoke describe, whether it is describe-all or just an explicit 
few, we return a single future, and that future returns a list of instances 
(UserName in this case): either 1 instance for each user that has at least 1 
credential for the describe-all case, or one instance per distinct user 
explicitly requested otherwise.  Then each UserName instance has the accessor 
you mentioned, which in this case returns  
Future.
   
   Do I have that right?









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472575326



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture> 
future;

Review comment:
   I can see reasonable arguments for all the possibilities, so if you feel 
strongly about one of them then I would be fine with it.  For example, we could 
have `list()` and `describe()` even though they return the same thing for now 
(`describe()` could potentially return more later).  Just let me know.









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472570823



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture> 
future;

Review comment:
   > a list RPC to show everything, and a describe RPC to show only some 
things.
   
   Do you mean a list RPC that takes no arguments and returns every credential 
defined for every user and a describe RPC that takes 1 or more users and 
returns every credential defined for those specified users, and they both 
return the same information for each credential?
   
   Or do you mean a list RPC and a describe RPC that return different sets of 
information (as is done with list vs. describe topics)?  I think you mean the 
former (two RPCs, each returning the same thing), but I want to be certain I 
understand.
   
   > There are a few reasons why. One is that even though we currently only 
make one RPC, in the future we might make more than one. In that case we would 
want multiple futures.
   
   I don't understand what this is referring to.  By "we currently only make 
one RPC" to what are you referring?
   
   > I also feel like in AdminClient, errors should be handled with futures 
pretty much all the time
   
   Agreed.  Will convert to using futures always, whenever we arrive at the 
final decision on what the RPCs need to look like.
   
   I'm wondering if we convert to returning futures everywhere, can we stick 
with the one describe RPC?  For example, could the admin client return a 
`Future>>`?  Would that 
work, and if so, would that be a reasonable way to proceed?









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472574569



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture> 
future;

Review comment:
   If we do decide to go with 2 separate APIs, then I might be concerned 
about using `list()` vs `describe()` if they return the same set of information 
(i.e. mechanism and iterations).  Perhaps using two separate names gives us 
room to expand `describe()` to return more information later on, though.  But 
if not, and they will always return the same information, then maybe 
`describeAll()` and `describe()` (or `listAll()` and `list()`) might be better?









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472530253



##
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##
@@ -486,7 +486,9 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
   }
 
   @Test
-  def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = {
+  def shouldNotAlterNonQuotaNonScramUserOrClientConfigUsingBootstrapServer(): 
Unit = {
+// when using --bootstrap-server, it should be illegal to alter anything 
that is not a quota and not a SCRAM credential
+// for both user and client entities

Review comment:
   @cmccabe Good question, actually.  There is already a check to make sure 
a non-existent config cannot be **deleted** via `--zookeeper`: 
`shouldNotUpdateConfigIfNonExistingConfigIsDeletedUsingZookeper()`.  This test 
passes, of course.
   
   However, there is no check to make sure an unrecognized config cannot be 
**added**, and in fact if I add that test it fails; the code will gladly go 
ahead and add anything we wish (and it will gladly go ahead and delete it if we 
wish as well -- the above test is only checking that something that doesn't 
exist can't be deleted).
   
   The next question, of course, is whether we should "fix" this or not.  What 
do you think?  To fix it we would need the full set of allowed configs at the 
User, Client, Topic, and Broker levels and then insert code to check 
accordingly.  Since the ZooKeeper update path is going away due to KIP-500, I'm 
wondering if we can just leave it alone?
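
If the check were added, it might look roughly like the following sketch. Everything here is hypothetical: the class and method names are made up for illustration, and the allow-list is a small placeholder rather than Kafka's real set of valid configs, which would have to come from Kafka's own config definitions for each entity type.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

class ConfigKeyCheckSketch {
    // Placeholder allow-list for one entity type; not Kafka's actual list.
    static final Set<String> ALLOWED_USER_CONFIGS =
        new HashSet<>(Arrays.asList("producer_byte_rate", "consumer_byte_rate", "request_percentage"));

    // Rejects any key that is not in the allow-list, for adds as well as deletes.
    static void validate(Set<String> keys, Set<String> allowed) {
        Set<String> unknown = new TreeSet<>(keys);
        unknown.removeAll(allowed);
        if (!unknown.isEmpty()) {
            throw new IllegalArgumentException("Unrecognized configs: " + unknown);
        }
    }
}
```

The cost noted above still applies: a real version needs one such set per entity type (User, Client, Topic, Broker), kept in sync with the configs Kafka actually accepts.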









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-13 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r470339737



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture> 
future;

Review comment:
   I kept the top-level error information but added per-user error 
information in addition.









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-13 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r470339031



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialUpsertion.java
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Objects;
+
+/**
+ * A request to update/insert a SASL/SCRAM credential for a user.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a>
+ */
+public class UserScramCredentialUpsertion extends 
UserScramCredentialAlteration {
+private final ScramCredentialInfo info;
+private final byte[] salt;
+private final byte[] password;
+
+/**
+ * Constructor that generates a random salt
+ *
+ * @param user the user for which the credential is to be updated/inserted
+ * @param credentialInfo the mechanism and iterations to be used
+ * @param password the password
+ */
+public UserScramCredentialUpsertion(String user, ScramCredentialInfo 
credentialInfo, String password) {
+this(user, credentialInfo, password.getBytes(StandardCharsets.UTF_8));
+}
+
+/**
+ * Constructor that generates a random salt
+ *
+ * @param user the user for which the credential is to be updated/inserted
+ * @param credentialInfo the mechanism and iterations to be used
+ * @param password the password
+ */
+public UserScramCredentialUpsertion(String user, ScramCredentialInfo 
credentialInfo, byte[] password) {
+this(user, credentialInfo, password, generateRandomSalt());
+}
+
+/**
+ * Constructor that accepts an explicit salt
+ *
+ * @param user the user for which the credential is to be updated/inserted
+ * @param credentialInfo the mechanism and iterations to be used
+ * @param password the password
+ * @param salt the salt to be used
+ */
+public UserScramCredentialUpsertion(String user, ScramCredentialInfo 
credentialInfo, byte[] password, byte[] salt) {
+super(Objects.requireNonNull(user));
+this.info = Objects.requireNonNull(credentialInfo);
+this.password = Objects.requireNonNull(password);
+this.salt = Objects.requireNonNull(salt);
+}
+
+/**
+ *
+ * @return the mechanism and iterations
+ */
+public ScramCredentialInfo credentialInfo() {
+return info;
+}
+
+/**
+ *
+ * @return the salt
+ */
+public byte[] salt() {
+return salt;
+}
+
+/**
+ *
+ * @return the password
+ */
+public byte[] password() {
+return password;
+}
+
+private static byte[] generateRandomSalt() {
+return new BigInteger(130, new 
SecureRandom()).toString(Character.MAX_RADIX).getBytes(StandardCharsets.UTF_8);

Review comment:
   The latest commit changes the methods on 
`org.apache.kafka.common.security.scram.internals.ScramFormatter` to be static, 
and I now reuse that logic.
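
For reference, the salt-generation expression from the diff above can be exercised on its own. The sketch below copies that expression into a standalone class (the class name `SaltSketch` is made up here). It does not show `ScramFormatter`, whose internals are not quoted in this thread.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;

class SaltSketch {
    // 130 random bits rendered in base 36 (Character.MAX_RADIX), then encoded as UTF-8 bytes.
    static byte[] generateRandomSalt() {
        return new BigInteger(130, new SecureRandom())
            .toString(Character.MAX_RADIX)
            .getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(new String(generateRandomSalt(), StandardCharsets.UTF_8));
    }
}
```

The result is a short string of lowercase alphanumeric characters, so the salt bytes are always printable ASCII.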









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-13 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r470101599



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture<Map<String, UserScramCredentialsDescription>> 
future;

Review comment:
   I don't see the need for multiple futures; everything is returning in a 
single response from Kafka, and I can't think of why it would be necessary to 
have some users' credential descriptions available to the API caller but others 
not.  I'll take a look at keeping a single future for the response but moving 
the error status from the top level down into each per-user result.









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469680702



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture<Map<String, UserScramCredentialsDescription>> 
future;

Review comment:
   Now that I'm working on this, I discovered that no other API that can 
describe or list everything works this way.  Every API that can describe or 
list everything returns a single future, and every describe or list API that 
returns a map of keys to futures requires a non-empty list of keys to describe 
or list.  For example:
   
   1. `listTopics()` lists all topics and returns a single `Future`; the 
`describeTopics()` API returns a map of names to futures but requires a 
non-empty list of topics to describe.
   2. `describeConfigs()` returns a map of resources to futures but requires a 
non-empty list of resources to describe.
   3. `describeLogDirs()` returns a map of broker IDs to futures but requires a 
non-empty list of brokers to describe.
   4. `describeReplicaLogDirs()` returns a map of replicas to futures but 
requires a non-empty list of replicas to describe.
   5. `describeConsumerGroups()` returns a map of consumer groups to futures 
but requires a non-empty list of consumer groups to describe.
   6. `listPartitionReassignments()` allows listing all or a subset of 
reassignments and returns a single future.
   7. `listOffsets()` returns a map of topic-partitions to futures but requires 
a non-empty list of topic-partitions to describe.
   8. `describeClientQuotas()` allows listing all or a subset of quotas and 
returns a single future.
   
   I think if we made this change here we would be off the beaten path.  That's 
not necessarily bad, but what tipped me off to this was the fact that when we 
list everything we have to create a future for every user that gets returned, 
and we don't know that list of users when we make the request, so there's 
really no way to implement it.
   
   We could create two separate APIs: one for describing some explicit, 
non-empty list of users, which would return a map of users to futures, and 
another one that describes everything, which returns a single future.  
`listTopics()` vs `describeTopics()` works this way, for example, though the 
information returned in the two is very different: when listing you just get 
the names, and when describing you get a lot more.  I don't see us 
distinguishing between listing vs. describing in terms of data -- we are going 
to send back the same two things (mechanism and iterations) regardless.  So we 
would probably be talking about creating a `describeUserScramCredentials()` API 
and a `describeAllUserScramCredentials()` API with the first taking a list and 
returning a map of futures and the second not taking a list and returning a 
single future.
   
   But I'm thinking we should just keep it the way it is -- take a 
possibly-empty list and return a single future regardless of whether the list 
was empty or not.
   
   Thoughts?
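
   To make the two shapes discussed above concrete, here is a minimal sketch 
using only `java.util.concurrent.CompletableFuture`.  It is not the Kafka 
`Admin` API: the class `DescribeShapes`, its method names, and the in-memory 
`STORE` are all hypothetical stand-ins.  It only illustrates why per-key 
futures need the keys up front, while a single future can also cover the 
"describe everything" case.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration only; none of these names come from Kafka.
class DescribeShapes {
    // Stand-in for broker state: user name -> SCRAM mechanism names.
    static final Map<String, List<String>> STORE = Map.of(
            "alice", List.of("SCRAM-SHA-256"),
            "bob", List.of("SCRAM-SHA-256", "SCRAM-SHA-512"));

    // Shape 1: possibly-empty list in, ONE future out.
    // An empty list is interpreted as "describe every user".
    static CompletableFuture<Map<String, List<String>>> describeSingleFuture(List<String> users) {
        Collection<String> targets = users.isEmpty() ? STORE.keySet() : users;
        Map<String, List<String>> result = new TreeMap<>();
        for (String user : targets) {
            List<String> mechanisms = STORE.get(user);
            if (mechanisms != null) {
                result.put(user, mechanisms);
            }
        }
        return CompletableFuture.completedFuture(result);
    }

    // Shape 2: one future PER requested user.  The map of futures has to be
    // built before any response arrives, so the keys must be known up front.
    static Map<String, CompletableFuture<List<String>>> describePerUser(List<String> users) {
        if (users.isEmpty()) {
            throw new IllegalArgumentException("per-user futures need an explicit, non-empty list of users");
        }
        Map<String, CompletableFuture<List<String>>> futures = new TreeMap<>();
        for (String user : users) {
            CompletableFuture<List<String>> future = new CompletableFuture<>();
            List<String> mechanisms = STORE.get(user);
            if (mechanisms != null) {
                future.complete(mechanisms);
            } else {
                future.completeExceptionally(new NoSuchElementException("unknown user " + user));
            }
            futures.put(user, future);
        }
        return futures;
    }

    public static void main(String[] args) {
        System.out.println(describeSingleFuture(List.of()).join());
        System.out.println(describePerUser(List.of("alice", "carol")).keySet());
    }
}
```

   In this sketch the per-user shape lets one user fail without affecting the 
others, at the cost of rejecting the empty "describe all" request.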









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469650293



##
File path: 
core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
##
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.server.BaseRequestTest
+import kafka.utils.Exit
+import org.junit.Assert._
+import org.junit.Test
+
+class UserScramCredentialsCommandTest extends BaseRequestTest {
+  override def brokerCount = 1
+  var exitStatus: Option[Int] = None
+  var exitMessage: Option[String] = None
+
+  case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = 
None)
+
+  private def runConfigCommandViaBroker(args: Array[String]) : 
ConfigCommandResult = {
+val byteArrayOutputStream = new ByteArrayOutputStream()
+val utf8 = StandardCharsets.UTF_8.name
+val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
+var exitStatus: Option[Int] = None
+Exit.setExitProcedure { (status, _) =>

Review comment:
   Actually, I think this may not be an issue since parallel tests in 
Gradle run in separate processes rather than separate threads.  From 
https://docs.gradle.org/current/dsl/org.gradle.api.tasks.testing.Test.html: 
`"Test are always run in (one or more) separate JVMs."`









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469646777



##
File path: 
core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
##
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.server.BaseRequestTest
+import kafka.utils.Exit
+import org.junit.Assert._
+import org.junit.Test
+
+class UserScramCredentialsCommandTest extends BaseRequestTest {
+  override def brokerCount = 1
+  var exitStatus: Option[Int] = None
+  var exitMessage: Option[String] = None
+
+  case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = 
None)
+
+  private def runConfigCommandViaBroker(args: Array[String]) : 
ConfigCommandResult = {
+val byteArrayOutputStream = new ByteArrayOutputStream()
+val utf8 = StandardCharsets.UTF_8.name
+val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
+var exitStatus: Option[Int] = None
+Exit.setExitProcedure { (status, _) =>

Review comment:
   Hmm, good point, I think there may be a problem here in general because 
there is only a single exit procedure that can be set globally, and multiple 
tests that set/reset it in parallel will collide.  There are 16 Scala test 
classes in `core` out of 260 that do this -- so 6% of test classes.  So I think 
this will introduce some flakiness to these 16 tests.  Does this sound correct 
to you, and should we open a separate ticket for this as opposed to trying to 
fix it here?
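
   A minimal sketch of the collision described above, assuming the two tests 
share one JVM.  `GlobalExitHookSketch` and its methods are hypothetical 
stand-ins, not `kafka.utils.Exit`; the interleaving is replayed step by step on 
a single thread so the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical illustration only; this is not kafka.utils.Exit.
class GlobalExitHookSketch {
    // One static slot shared by every caller in the JVM.
    private static volatile IntConsumer exitProcedure = status -> {
        throw new IllegalStateException("no test hook installed, would exit with " + status);
    };

    static void setExitProcedure(IntConsumer procedure) {
        exitProcedure = procedure;
    }

    static void exit(int status) {
        exitProcedure.accept(status);
    }

    // Replays one unlucky interleaving of two tests.
    // Returns [statuses recorded by test A, statuses recorded by test B].
    static List<List<Integer>> replayCollision() {
        List<Integer> seenByA = new ArrayList<>();
        List<Integer> seenByB = new ArrayList<>();
        setExitProcedure(status -> seenByA.add(status)); // test A installs its hook
        setExitProcedure(status -> seenByB.add(status)); // test B overwrites it
        exit(1); // the command under test in A exits, but B's hook records it
        return List.of(seenByA, seenByB);
    }

    public static void main(String[] args) {
        List<List<Integer>> seen = replayCollision();
        // prints "A saw [], B saw [1]"
        System.out.println("A saw " + seen.get(0) + ", B saw " + seen.get(1));
    }
}
```

   Test A never observes its own exit status here, which is the kind of 
flakiness in question.  If each test class runs in its own JVM, the static 
slot is not shared and the collision cannot occur.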









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469646777



##
File path: 
core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
##
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.server.BaseRequestTest
+import kafka.utils.Exit
+import org.junit.Assert._
+import org.junit.Test
+
+class UserScramCredentialsCommandTest extends BaseRequestTest {
+  override def brokerCount = 1
+  var exitStatus: Option[Int] = None
+  var exitMessage: Option[String] = None
+
+  case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = 
None)
+
+  private def runConfigCommandViaBroker(args: Array[String]) : 
ConfigCommandResult = {
+val byteArrayOutputStream = new ByteArrayOutputStream()
+val utf8 = StandardCharsets.UTF_8.name
+val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
+var exitStatus: Option[Int] = None
+Exit.setExitProcedure { (status, _) =>

Review comment:
   Hmm, good point, I think there may be a problem here in general because 
there is only a single exit procedure that can be set globally, and multiple 
tests that set/reset it in parallel will collide.  There are 16 Scala test 
classes in `core` out of 260 that do this -- so 6% of test classes.  So I think 
this will introduce some flakiness to these 16 tests.  Does this sound correct 
to you, and should we open a separate ticket for this as opposed to trying to 
fix it here?









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469637266



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialUpsertion.java
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Objects;
+
+/**
+ * A request to update/insert a SASL/SCRAM credential for a user.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a>
+ */
+public class UserScramCredentialUpsertion extends 
UserScramCredentialAlteration {
+private final ScramCredentialInfo info;
+private final byte[] salt;
+private final byte[] password;
+
+/**
+ * Constructor that generates a random salt
+ *
+ * @param user the user for which the credential is to be updated/inserted
+ * @param credentialInfo the mechanism and iterations to be used
+ * @param password the password
+ */
+public UserScramCredentialUpsertion(String user, ScramCredentialInfo 
credentialInfo, String password) {
+this(user, credentialInfo, password.getBytes(StandardCharsets.UTF_8));
+}
+
+/**
+ * Constructor that generates a random salt
+ *
+ * @param user the user for which the credential is to be updated/inserted
+ * @param credentialInfo the mechanism and iterations to be used
+ * @param password the password
+ */
+public UserScramCredentialUpsertion(String user, ScramCredentialInfo 
credentialInfo, byte[] password) {
+this(user, credentialInfo, password, generateRandomSalt());
+}
+
+/**
+ * Constructor that accepts an explicit salt
+ *
+ * @param user the user for which the credential is to be updated/inserted
+ * @param credentialInfo the mechanism and iterations to be used
+ * @param password the password
+ * @param salt the salt to be used
+ */
+public UserScramCredentialUpsertion(String user, ScramCredentialInfo 
credentialInfo, byte[] password, byte[] salt) {
+super(Objects.requireNonNull(user));
+this.info = Objects.requireNonNull(credentialInfo);
+this.password = Objects.requireNonNull(password);
+this.salt = Objects.requireNonNull(salt);
+}
+
+/**
+ *
+ * @return the mechanism and iterations
+ */
+public ScramCredentialInfo credentialInfo() {
+return info;
+}
+
+/**
+ *
+ * @return the salt
+ */
+public byte[] salt() {
+return salt;
+}
+
+/**
+ *
+ * @return the password
+ */
+public byte[] password() {
+return password;
+}
+
+private static byte[] generateRandomSalt() {
+return new BigInteger(130, new 
SecureRandom()).toString(Character.MAX_RADIX).getBytes(StandardCharsets.UTF_8);

Review comment:
   @cmccabe  I think the approach you suggest leaves out how to identify 
`length` which itself needs to be randomized.  I got the current implementation 
from `org.apache.kafka.common.security.scram.internals.ScramFormatter`.  I 
would have invoked `ScramFormatter.secureRandomBytes()` directly, but it is not 
`static` and I did not want to either instantiate an instance or change methods 
to static (though the class is internal, so I could have done that). I instead 
replicated the logic here.  The array length ends up being random with this 
approach, as do the bytes in the array.  Let me know what you think.  Currently 
I've left this as-is.
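
   For reference, a small sketch of what the expression in the diff produces.  
`variableLengthSalt()` repeats the `BigInteger` expression from 
`generateRandomSalt()` above; `fixedLengthSalt(int)` is a hypothetical 
alternative added only for contrast, and it may or may not match the 
suggestion under discussion.  The class name `SaltSketch` is likewise an 
assumption.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;

// Hypothetical illustration only; not part of the pull request.
class SaltSketch {
    private static final SecureRandom RANDOM = new SecureRandom();

    // Same expression as generateRandomSalt() in the diff: a random
    // non-negative 130-bit integer rendered in base 36, then UTF-8 encoded.
    // Leading zero digits are not printed, so the length varies (at most 26).
    static byte[] variableLengthSalt() {
        return new BigInteger(130, RANDOM)
                .toString(Character.MAX_RADIX)
                .getBytes(StandardCharsets.UTF_8);
    }

    // Contrast case: the caller must choose the length explicitly.
    static byte[] fixedLengthSalt(int length) {
        byte[] salt = new byte[length];
        RANDOM.nextBytes(salt);
        return salt;
    }

    public static void main(String[] args) {
        System.out.println("variable-length salt: " + variableLengthSalt().length + " bytes");
        System.out.println("fixed-length salt:    " + fixedLengthSalt(16).length + " bytes");
    }
}
```

   Note that the first method yields only the ASCII characters `0-9` and `a-z`, 
whereas the second yields arbitrary byte values.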









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469627624



##
File path: 
core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.Properties
+
+import kafka.network.SocketServer
+import kafka.security.authorizer.AclAuthorizer
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{DescribeUserScramCredentialsRequest, 
DescribeUserScramCredentialsResponse}
+import org.apache.kafka.common.resource.ResourceType
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, 
AuthorizationResult}
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Test DescribeUserScramCredentialsRequest/Response API for the cases where 
no credentials exist
+ * or failure is expected due to lack of authorization, sending the request to 
a non-controller broker, or some other issue.
+ * Testing the API for the case where there are actually credentials to 
describe is performed elsewhere.
+ */
+class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+properties.put(KafkaConfig.AuthorizerClassNameProp, 
classOf[DescribeCredentialsTest.TestAuthorizer].getName)
+properties.put(KafkaConfig.PrincipalBuilderClassProp, 
classOf[DescribeCredentialsTest.TestPrincipalBuilder].getName)
+  }
+
+  @Before
+  override def setUp(): Unit = {
+DescribeCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is 
to be authorized
+super.setUp()
+  }
+
+  @Test
+  def testDescribeNothing(): Unit = {
+val request = new DescribeUserScramCredentialsRequest.Builder(
+  new DescribeUserScramCredentialsRequestData()).build()
+val response = sendDescribeUserScramCredentialsRequest(request)
+
+val error = response.data.error
+assertEquals("Expected no error when routed correctly", Errors.NONE.code, 
error)
+assertEquals("Expected no credentials", 0, 
response.data.userScramCredentials.size)
+  }
+
+  @Test
+  def testDescribeNotController(): Unit = {
+val request = new DescribeUserScramCredentialsRequest.Builder(
+  new DescribeUserScramCredentialsRequestData()).build()
+val response = sendDescribeUserScramCredentialsRequest(request, 
notControllerSocketServer)
+
+val error = response.data.error
+assertEquals("Expected controller error when routed incorrectly", 
Errors.NOT_CONTROLLER.code, error)
+  }
+
+  @Test
+  def testDescribeNotAuthorized(): Unit = {
+DescribeCredentialsTest.principal = 
DescribeCredentialsTest.UnauthorizedPrincipal
+
+val request = new DescribeUserScramCredentialsRequest.Builder(
+  new DescribeUserScramCredentialsRequestData()).build()
+val response = sendDescribeUserScramCredentialsRequest(request)
+
+val error = response.data.error
+assertEquals("Expected not authorized error", 
Errors.CLUSTER_AUTHORIZATION_FAILED.code, error)
+  }
+
+  @Test
+  def testDescribeSameUserTwice(): Unit = {
+val user = new UserName().setName("user1")
+val request = new DescribeUserScramCredentialsRequest.Builder(
+  new DescribeUserScramCredentialsRequestData().setUsers(List(user, 
user).asJava)).build()
+val response = sendDescribeUserScramCredentialsRequest(request)
+
+val error = response.data.error
+assertEquals("Expected invalid request error", 
Errors.INVALID_REQUEST.code, error)
+  }
+
+
+  private def sendDescribeUserScramCredentialsRequest(request: 
DescribeUserScramCredentialsRequest, socketServer: SocketServer = 
controllerSocketServer): Desc

[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469476195



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeUserScramCredentialsResult 
describeUserScramCredentials(List<String> users, 
DescribeUserScramCredentialsOptions options) {
+final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> 
future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
+new ControllerNodeProvider()) {

Review comment:
   Ok, switching Describe to **not** require that it be done on the 
controller.
   
   > planning on getting rid of ControllerNodeProvider as part of KIP-590 
   
   Leaving Alter alone for now under the assumption that we will fix it as part 
of the KIP-590 PR.  Let me know if you wish me to change this now instead.









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469457068



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeUserScramCredentialsResult 
describeUserScramCredentials(List<String> users, 
DescribeUserScramCredentialsOptions options) {
+final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> 
future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
+new ControllerNodeProvider()) {
+@Override
+public DescribeUserScramCredentialsRequest.Builder 
createRequest(int timeoutMs) {
+return new DescribeUserScramCredentialsRequest.Builder(
+new 
DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+new 
DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+}
+
+@Override
+public void handleResponse(AbstractResponse abstractResponse) {
+DescribeUserScramCredentialsResponse response = 
(DescribeUserScramCredentialsResponse) abstractResponse;
+Errors error = Errors.forCode(response.data().error());
+switch (error) {
+case NONE:
+DescribeUserScramCredentialsResponseData data = 
response.data();
+
future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+
DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+userScramCredential -> {
+List<ScramCredentialInfo> scramCredentialInfos 
= userScramCredential.credentialInfos().stream().map(
+credentialInfo -> new 
ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), 
credentialInfo.iterations()))
+.collect(Collectors.toList());
+return new 
UserScramCredentialsDescription(userScramCredential.name(), 
scramCredentialInfos);
+})));
+break;
+case NOT_CONTROLLER:
+handleNotControllerError(error);
+break;
+default:
+future.completeExceptionally(new ApiError(error, 
response.data().errorMessage()).exception());
+break;
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+future.completeExceptionally(throwable);
+}
+};
+runnable.call(call, now);
+return new DescribeUserScramCredentialsResult(future);
+}
+
+@Override
+public AlterUserScramCredentialsResult 
alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+ 
AlterUserScramCredentialsOptions options) {
+final long now = time.milliseconds();
+final Map> futures = new HashMap<>();
+for (UserScramCredentialAlteration alteration: alterations) {
+futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+}
+final Map userIllegalAlterationExceptions = new 
HashMap<>();
+// We need to keep track of users with deletions of an unknown SCRAM 
mechanism
+alterations.stream().filter(a -> a instanceof 
UserScramCredentialDeletion).forEach(alteration -> {
+UserScramCredentialDeletion deletion = 
(UserScramCredentialDeletion) alteration;
+ScramMechanism mechanism = deletion.getMechanism();
+if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+userIllegalAlterationExceptions.put(deletion.getUser(), new 
IllegalArgumentException("Unknown SCRAM mechanism"));
+}
+});
+// Creating an upsertion may throw InvalidKeyException or 
NoSuchAlgorithmException,
+// so keep track of which users are affected by such a failure and 
immediately fail all their alterations
+final Map> userInsertions 
= new HashMap<>();
+alterations.stream().filter(a -> a instanceof 
UserScramCredentialUpsertion)
+.filter(alteration -> 
!userIllegalAlterationExceptions.containsKey(alteration.getUser()))
+.forEach(alteration -> {
+UserScramCredentialUpsertion upsertion = 
(UserScramCredentialUpsertion) alteration;
+String user = upsertion.getUser();
+try {
+Scr

[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469421523



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeUserScramCredentialsResult 
describeUserScramCredentials(List<String> users, 
DescribeUserScramCredentialsOptions options) {
+final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> 
future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
+new ControllerNodeProvider()) {
+@Override
+public DescribeUserScramCredentialsRequest.Builder 
createRequest(int timeoutMs) {
+return new DescribeUserScramCredentialsRequest.Builder(
+new 
DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+new 
DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+}
+
+@Override
+public void handleResponse(AbstractResponse abstractResponse) {
+DescribeUserScramCredentialsResponse response = 
(DescribeUserScramCredentialsResponse) abstractResponse;
+Errors error = Errors.forCode(response.data().error());
+switch (error) {
+case NONE:
+DescribeUserScramCredentialsResponseData data = 
response.data();
+
future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+
DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+userScramCredential -> {
+List<ScramCredentialInfo> scramCredentialInfos 
= userScramCredential.credentialInfos().stream().map(
+credentialInfo -> new 
ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), 
credentialInfo.iterations()))
+.collect(Collectors.toList());
+return new 
UserScramCredentialsDescription(userScramCredential.name(), 
scramCredentialInfos);
+})));
+break;
+case NOT_CONTROLLER:
+handleNotControllerError(error);
+break;
+default:
+future.completeExceptionally(new ApiError(error, 
response.data().errorMessage()).exception());
+break;
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+future.completeExceptionally(throwable);
+}
+};
+runnable.call(call, now);
+return new DescribeUserScramCredentialsResult(future);
+}
+
+@Override
+public AlterUserScramCredentialsResult 
alterUserScramCredentials(List alterations,
+ 
AlterUserScramCredentialsOptions options) {
+final long now = time.milliseconds();
+final Map> futures = new HashMap<>();
+for (UserScramCredentialAlteration alteration: alterations) {
+futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+}
+final Map userIllegalAlterationExceptions = new 
HashMap<>();
+// We need to keep track of users with deletions of an unknown SCRAM 
mechanism
+alterations.stream().filter(a -> a instanceof 
UserScramCredentialDeletion).forEach(alteration -> {
+UserScramCredentialDeletion deletion = 
(UserScramCredentialDeletion) alteration;
+ScramMechanism mechanism = deletion.getMechanism();
+if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+userIllegalAlterationExceptions.put(deletion.getUser(), new 
IllegalArgumentException("Unknown SCRAM mechanism"));
+}
+});
+// Creating an upsertion may throw InvalidKeyException or 
NoSuchAlgorithmException,
+// so keep track of which users are affected by such a failure and 
immediately fail all their alterations
+final Map> userInsertions 
= new HashMap<>();
+alterations.stream().filter(a -> a instanceof 
UserScramCredentialUpsertion)
+.filter(alteration -> 
!userIllegalAlterationExceptions.containsKey(alteration.getUser()))
+.forEach(alteration -> {
+UserScramCredentialUpsertion upsertion = 
(UserScramCredentialUpsertion) alteration;
+String user = upsertion.getUser();
+try {
+Scr

[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469410480



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+        final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeUserScramCredentialsRequest.Builder(
+                        new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                                new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().error());
+                switch (error) {
+                    case NONE:
+                        DescribeUserScramCredentialsResponseData data = response.data();
+                        future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+                            DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+                            userScramCredential -> {
+                                List<ScramCredentialInfo> scramCredentialInfos = userScramCredential.credentialInfos().stream().map(
+                                    credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations()))
+                                    .collect(Collectors.toList());
+                                return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos);
+                            })));
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);
+                        break;
+                    default:
+                        future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception());
+                        break;
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new DescribeUserScramCredentialsResult(future);
+    }
+
+    @Override
+    public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+                                                                     AlterUserScramCredentialsOptions options) {
+        final long now = time.milliseconds();
+        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        for (UserScramCredentialAlteration alteration: alterations) {
+            futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+        }
+        final Map<String, Exception> userIllegalAlterationExceptions = new HashMap<>();
+        // We need to keep track of users with deletions of an unknown SCRAM mechanism
+        alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> {
+            UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration;
+            ScramMechanism mechanism = deletion.getMechanism();
+            if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+                userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism"));

Review comment:
   I think `UnsupportedSaslMechanismException` is actually appropriate here 
-- it already exists and corresponds to `Errors.UNSUPPORTED_SASL_MECHANISM`.
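
   To make the suggestion concrete, here is a minimal, self-contained sketch of the validation step with the exception type swapped. All three nested types are hypothetical stand-ins (for `ScramMechanism`, `UnsupportedSaslMechanismException`, and `UserScramCredentialDeletion`) so that the snippet compiles without the Kafka jars; it is not the code in this PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DeletionValidationSketch {
    /** Hypothetical stand-in for org.apache.kafka.clients.admin.ScramMechanism. */
    enum Mechanism { UNKNOWN, SCRAM_SHA_256, SCRAM_SHA_512 }

    /** Hypothetical stand-in for Kafka's UnsupportedSaslMechanismException. */
    static final class UnsupportedMechanismException extends RuntimeException {
        UnsupportedMechanismException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for UserScramCredentialDeletion. */
    static final class Deletion {
        final String user;
        final Mechanism mechanism;

        Deletion(String user, Mechanism mechanism) {
            this.user = user;
            this.mechanism = mechanism;
        }
    }

    /** Records one exception per user whose deletion names a null or unknown mechanism. */
    static Map<String, Exception> findIllegalDeletions(List<Deletion> deletions) {
        Map<String, Exception> errors = new HashMap<>();
        for (Deletion deletion : deletions) {
            if (deletion.mechanism == null || deletion.mechanism == Mechanism.UNKNOWN) {
                errors.put(deletion.user, new UnsupportedMechanismException("Unknown SCRAM mechanism"));
            }
        }
        return errors;
    }
}
```

   Users that end up in the returned map would have their futures failed with that exception; everyone else proceeds to the request.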
   









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-10 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r468149236



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,64 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection entries, 
AlterClientQuotasOptions options);
 
+    /**
+     * Describe all SASL/SCRAM credentials.
+     *
+     * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+     *
+     * @return The DescribeUserScramCredentialsResult.
+     */
+    default DescribeUserScramCredentialsResult describeUserScramCredentials() {
+        return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions());
+    }
+
+    /**
+     * Describe SASL/SCRAM credentials for the given users.
+     *
+     * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+     *
+     * @param users the users for which credentials are to be described; all users' credentials are described if null
+     *              or empty.  A user explicitly specified here that does not have a SCRAM credential will not appear
+     *              in the results.

Review comment:
   Hmm, good question.  The KIP doesn't state what to do here.  @cmccabe 
thoughts?
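
   For reference, the behavior the Javadoc currently documents can be sketched as follows. This is only an illustration over an in-memory map; the class name, method name, and the use of mechanism-name strings are hypothetical, not the broker implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class DescribeSemanticsSketch {
    /**
     * stored maps each user to the names of the SCRAM mechanisms it has credentials for.
     * A null or empty users list means "describe everyone"; a requested user without
     * any stored credential is silently left out of the result.
     */
    static Map<String, List<String>> describe(Map<String, List<String>> stored, List<String> users) {
        if (users == null || users.isEmpty()) {
            return new TreeMap<>(stored);
        }
        Map<String, List<String>> result = new TreeMap<>();
        for (String user : users) {
            List<String> mechanisms = stored.get(user);
            if (mechanisms != null) {
                result.put(user, mechanisms);
            }
        }
        return result;
    }
}
```

   The open question is whether the "silently left out" branch should instead surface a per-user error.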

##
File path: core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
##
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.Properties
+
+import kafka.network.SocketServer
+import kafka.security.authorizer.AclAuthorizer
+import org.apache.kafka.clients.admin.ScramMechanism
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, DescribeUserScramCredentialsRequestData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse}
+import org.apache.kafka.common.resource.ResourceType
+import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Test AlterUserScramCredentialsRequest/Response API for the cases where either no credentials are altered
+ * or failure is expected due to lack of authorization, sending the request to a non-controller broker, or some other issue.
+ * Also tests the Alter and Describe APIs for the case where credentials are successfully altered/described.
+ */
+class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+    properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName)
+    properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilder].getName)
+  }
+
+  @Before
+  override def setUp(): Unit = {
+    AlterCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to be authorized
+    super.setUp()
+  }
+
+  @Test
+  def testAlterNothing(): Unit = {
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
+        .setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(0, results.size)
+  }
+
+  @Test
+  def testAlterNothingNotAuthorized(): Unit = {
+    AlterCredentialsTest.principal = AlterCredential

[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-07 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r467191680



##
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##
@@ -508,7 +563,15 @@ object ConfigCommand extends Config {
 
       val entityStr = (entitySubstr(ClientQuotaEntity.USER) ++ entitySubstr(ClientQuotaEntity.CLIENT_ID)).mkString(", ")
       val entriesStr = entries.asScala.map(e => s"${e._1}=${e._2}").mkString(", ")
-      println(s"Configs for ${entityStr} are ${entriesStr}")
+      println(s"Quota configs for ${entityStr} are ${entriesStr}")
+    }
+    // we describe user SCRAM credentials only when we are not describing client information
+    // and we are not given either --entity-default or --user-defaults
+    if (!entityTypes.contains(ConfigType.Client) && !entityNames.contains("")) {
+      getUserScramCredentialConfigs(adminClient, entityNames).foreach { case (user, description) =>

Review comment:
   I'm adding 
`core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala` -- 
let me know if this test covers this case.
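
   As a quick reference for what the test needs to exercise, the guard in the hunk above can be restated as a small predicate. This is a sketch in Java rather than the Scala in `ConfigCommand`; the constant values are assumptions (the string behind `ConfigType.Client` is not shown in the hunk, and the empty name is taken to stand for a default entity, per the comment in the diff).

```java
import java.util.List;

final class ScramDescribeGuardSketch {
    // Assumed value of ConfigType.Client; not shown in the hunk above.
    static final String CLIENT_ENTITY_TYPE = "clients";
    // Assumption: a default entity (--entity-default / --user-defaults) appears as an empty name.
    static final String DEFAULT_ENTITY_NAME = "";

    /** True only when no client entity is being described and no default entity was requested. */
    static boolean shouldDescribeScramCredentials(List<String> entityTypes, List<String> entityNames) {
        return !entityTypes.contains(CLIENT_ENTITY_TYPE) && !entityNames.contains(DEFAULT_ENTITY_NAME);
    }
}
```

   So a plain user describe should print SCRAM credentials, while a describe that includes a client entity or a default entity should not.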









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-06 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r466681231



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+        final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {

Review comment:
   The KIP says `It will be will be sent to the controller, and will return 
NOT_CONTROLLER if the receiving broker is not the controller.` It says this for 
both Describe and Alter.  I agree it doesn't seem necessary for Describe.  
Would it require an email to the list for notification if we decide to change 
it?  Or would a KIP update be sufficient?  Do you think we should change it?  
@cmccabe any thoughts here as well?
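
   To illustrate what the controller-only routing costs the caller, here is a toy model of the behavior the KIP describes: a broker that is not the controller answers `NOT_CONTROLLER`, and the client has to move on to another broker. Every name here is hypothetical; the real client refreshes metadata through `handleNotControllerError` rather than walking a list.

```java
import java.util.List;

final class ControllerRoutingSketch {
    enum ErrorCode { NONE, NOT_CONTROLLER }

    /** Hypothetical broker model: only the controller serves the request. */
    static final class Broker {
        final int id;
        final boolean isController;

        Broker(int id, boolean isController) {
            this.id = id;
            this.isController = isController;
        }

        ErrorCode handleDescribe() {
            return isController ? ErrorCode.NONE : ErrorCode.NOT_CONTROLLER;
        }
    }

    /**
     * Tries brokers in order, moving on whenever one answers NOT_CONTROLLER.
     * Returns the id of the broker that served the request, or -1 if none did.
     */
    static int describeViaController(List<Broker> brokers) {
        for (Broker broker : brokers) {
            if (broker.handleDescribe() == ErrorCode.NONE) {
                return broker.id;
            }
        }
        return -1;
    }
}
```

   If Describe were allowed on any broker, `handleDescribe` would always return `NONE` and the retry loop would be unnecessary for that request.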









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-05 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r465751005



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+        final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeUserScramCredentialsRequest.Builder(
+                        new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                                new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().error());
+                switch (error) {
+                    case NONE:
+                        DescribeUserScramCredentialsResponseData data = response.data();
+                        future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+                            DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+                            userScramCredential -> {
+                                List<ScramCredentialInfo> scramCredentialInfos = userScramCredential.credentialInfos().stream().map(
+                                    credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations()))
+                                    .collect(Collectors.toList());
+                                return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos);
+                            })));
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);
+                        break;
+                    default:
+                        future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception());
+                        break;
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new DescribeUserScramCredentialsResult(future);
+    }
+
+    @Override
+    public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+                                                                     AlterUserScramCredentialsOptions options) {
+        final long now = time.milliseconds();
+        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        for (UserScramCredentialAlteration alteration: alterations) {
+            futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+        }
+        final Map<String, Exception> userIllegalAlterationExceptions = new HashMap<>();
+        // We need to keep track of users with deletions of an unknown SCRAM mechanism
+        alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> {
+            UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration;
+            ScramMechanism mechanism = deletion.getMechanism();
+            if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+                userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism"));
+            }
+        });
+        // Creating an upsertion may throw InvalidKeyException or NoSuchAlgorithmException,
+        // so keep track of which users are affected by such a failure and immediately fail all their alterations
+        final Map<String, Map<ScramMechanism, AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions = new HashMap<>();
+        alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion)
+            .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.getUser()))
+            .forEach(alteration -> {
+                UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration;
+                String user = upsertion.getUser();
+                try {
+                    Scr

[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-05 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r465744271



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+        final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeUserScramCredentialsRequest.Builder(
+                        new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                                new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().error());
+                switch (error) {
+                    case NONE:
+                        DescribeUserScramCredentialsResponseData data = response.data();
+                        future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+                            DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+                            userScramCredential -> {
+                                List<ScramCredentialInfo> scramCredentialInfos = userScramCredential.credentialInfos().stream().map(
+                                    credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations()))
+                                    .collect(Collectors.toList());
+                                return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos);
+                            })));
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);
+                        break;
+                    default:
+                        future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception());
+                        break;
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new DescribeUserScramCredentialsResult(future);
+    }
+
+    @Override
+    public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+                                                                     AlterUserScramCredentialsOptions options) {
+        final long now = time.milliseconds();
+        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        for (UserScramCredentialAlteration alteration: alterations) {
+            futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+        }
+        final Map<String, Exception> userIllegalAlterationExceptions = new HashMap<>();
+        // We need to keep track of users with deletions of an unknown SCRAM mechanism
+        alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> {
+            UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration;
+            ScramMechanism mechanism = deletion.getMechanism();
+            if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+                userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism"));

Review comment:
   How about `InvalidRequestException`?  It's already used in this class, 
and it might be more appropriate than `InvalidConfigurationException`.




