rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r468149236



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1215,64 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlteration> entries, 
AlterClientQuotasOptions options);
 
+    /**
+     * <p>Describe all SASL/SCRAM credentials.
+     *
+     * <p>This is a convenience method for {@link 
#describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+     *
+     * @return The DescribeUserScramCredentialsResult.
+     */
+    default DescribeUserScramCredentialsResult describeUserScramCredentials() {
+        return describeUserScramCredentials(null, new 
DescribeUserScramCredentialsOptions());
+    }
+
+    /**
+     * <p>Describe SASL/SCRAM credentials for the given users.
+     *
+     * <p>This is a convenience method for {@link 
#describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+     *
+     * @param users the users for which credentials are to be described; all 
users' credentials are described if null
+     *              or empty.  A user explicitly specified here that does not 
have a SCRAM credential will not appear
+     *              in the results.

Review comment:
       Hmm, good question.  The KIP doesn't state what do do here.  @cmccabe 
thoughts?

##########
File path: 
core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.Properties
+
+import kafka.network.SocketServer
+import kafka.security.authorizer.AclAuthorizer
+import org.apache.kafka.clients.admin.ScramMechanism
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, 
DescribeUserScramCredentialsRequestData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, 
AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, 
DescribeUserScramCredentialsResponse}
+import org.apache.kafka.common.resource.ResourceType
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, 
AuthorizationResult}
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Test AlterUserScramCredentialsRequest/Response API for the cases where 
either no credentials are altered
+ * or failure is expected due to lack of authorization, sending the request to 
a non-controller broker, or some other issue.
+ * Also tests the Alter and Describe APIs for the case where credentials are 
successfully altered/described.
+ */
+class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+    properties.put(KafkaConfig.AuthorizerClassNameProp, 
classOf[AlterCredentialsTest.TestAuthorizer].getName)
+    properties.put(KafkaConfig.PrincipalBuilderClassProp, 
classOf[AlterCredentialsTest.TestPrincipalBuilder].getName)
+  }
+
+  @Before
+  override def setUp(): Unit = {
+    AlterCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to 
be authorized
+    super.setUp()
+  }
+
+  @Test
+  def testAlterNothing(): Unit = {
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
+        .setUpsertions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(0, results.size)
+  }
+
+  @Test
+  def testAlterNothingNotAuthorized(): Unit = {
+    AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal
+
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
+        .setUpsertions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(0, results.size)
+  }
+
+  @Test
+  def testAlterSomethingNotAuthorized(): Unit = {
+    AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal
+
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(util.Arrays.asList(new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)))
+        .setUpsertions(util.Arrays.asList(new 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_512.getType)))).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(2, results.size)
+    assertTrue("Expected not authorized",
+      results.get(0).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code && 
results.get(1).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code)
+  }
+
+  @Test
+  def testAlterSameThingTwice(): Unit = {
+    val deletion1 = new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+    val deletion2 = new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+    val upsertion1 = new 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+      
.setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes)
+    val upsertion2 = new 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+      
.setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes)
+    val requests = List (
+      new AlterUserScramCredentialsRequest.Builder(
+        new AlterUserScramCredentialsRequestData()
+          .setDeletions(util.Arrays.asList(deletion1, deletion1))
+          .setUpsertions(util.Arrays.asList(upsertion2, upsertion2))).build(),
+      new AlterUserScramCredentialsRequest.Builder(
+        new AlterUserScramCredentialsRequestData()
+          .setDeletions(util.Arrays.asList(deletion1, deletion2))
+          .setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(),
+    )
+    requests.foreach(request => {
+      val response = sendAlterUserScramCredentialsRequest(request)
+      val results = response.data.results
+      assertEquals(2, results.size)
+      assertTrue("Expected error when altering the same credential twice in a 
single request",
+        results.get(0).errorCode == Errors.INVALID_REQUEST.code && 
results.get(1).errorCode == Errors.INVALID_REQUEST.code)
+    })
+  }
+
+  @Test
+  def testAlterEmptyUser(): Unit = {

Review comment:
       Added a test for it in `UserScramCredentialsCommandTest`.  We can't test 
for it here because we get a salted password here, and I don't think it is 
possible for that to be an empty string and allow a successful SASL/SCRAM 
authentication.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ScramCredentialInfo.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Mechanism and iterations for a SASL/SCRAM credential associated with a user.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API";>KIP-554:
 Add Broker-side SCRAM Config API</a>
+ */
+public class ScramCredentialInfo {
+    private final ScramMechanism mechanism;
+    private final int iterations;
+
+    /**
+     *
+     * @param mechanism the required mechanism
+     * @param iterations the number of iterations used when creating the 
credential
+     */
+    public ScramCredentialInfo(ScramMechanism mechanism, int iterations) {
+        this.mechanism = Objects.requireNonNull(mechanism);
+        this.iterations = iterations;
+    }
+
+    /**
+     *
+     * @return the mechanism
+     */
+    public ScramMechanism getMechanism() {
+        return mechanism;
+    }
+
+    /**
+     *
+     * @return the number of iterations used when creating the credential

Review comment:
       TBD.  No change needed if we are getting rid of `-1` as a special value.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
         return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeUserScramCredentialsResult 
describeUserScramCredentials(List<String> users, 
DescribeUserScramCredentialsOptions options) {
+        final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> 
future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public DescribeUserScramCredentialsRequest.Builder 
createRequest(int timeoutMs) {
+                return new DescribeUserScramCredentialsRequest.Builder(
+                        new 
DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                                new 
DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                DescribeUserScramCredentialsResponse response = 
(DescribeUserScramCredentialsResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().error());
+                switch (error) {
+                    case NONE:
+                        DescribeUserScramCredentialsResponseData data = 
response.data();
+                        
future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+                            
DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+                            userScramCredential -> {
+                                List<ScramCredentialInfo> scramCredentialInfos 
= userScramCredential.credentialInfos().stream().map(
+                                    credentialInfo -> new 
ScramCredentialInfo(ScramMechanism.fromType(credentialInfo.mechanism()), 
credentialInfo.iterations()))
+                                        .collect(Collectors.toList());
+                                return new 
UserScramCredentialsDescription(userScramCredential.name(), 
scramCredentialInfos);
+                            })));
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);
+                        break;
+                    default:
+                        future.completeExceptionally(new ApiError(error, 
response.data().errorMessage()).exception());
+                        break;
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new DescribeUserScramCredentialsResult(future);
+    }
+
+    @Override
+    public AlterUserScramCredentialsResult 
alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+                                                                     
AlterUserScramCredentialsOptions options) {
+        final long now = time.milliseconds();
+        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        for (UserScramCredentialAlteration alteration: alterations) {
+            futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+        }
+        final Map<String, Exception> userIllegalAlterationExceptions = new 
HashMap<>();
+        // We need to keep track of users with deletions of an unknown SCRAM 
mechanism
+        alterations.stream().filter(a -> a instanceof 
UserScramCredentialDeletion).forEach(alteration -> {
+            UserScramCredentialDeletion deletion = 
(UserScramCredentialDeletion) alteration;
+            ScramMechanism mechanism = deletion.getMechanism();
+            if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+                userIllegalAlterationExceptions.put(deletion.getUser(), new 
InvalidRequestException("Unknown SCRAM mechanism"));
+            }
+        });
+        // Creating an upsertion may throw InvalidKeyException or 
NoSuchAlgorithmException,
+        // so keep track of which users are affected by such a failure and 
immediately fail all their alterations
+        final Map<String, Map<ScramMechanism, 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions 
= new HashMap<>();
+        alterations.stream().filter(a -> a instanceof 
UserScramCredentialUpsertion)
+                .filter(alteration -> 
!userIllegalAlterationExceptions.containsKey(alteration.getUser()))
+                .forEach(alteration -> {
+                    UserScramCredentialUpsertion upsertion = 
(UserScramCredentialUpsertion) alteration;
+                    String user = upsertion.getUser();
+                    try {
+                        ScramMechanism mechanism = 
upsertion.getInfo().getMechanism();
+                        if (mechanism == null || mechanism == 
ScramMechanism.UNKNOWN)
+                            throw new InvalidRequestException("Unknown SCRAM 
mechanism");
+                        userInsertions.putIfAbsent(user, new HashMap<>());
+                        userInsertions.get(user).put(mechanism, 
getScramCredentialUpsertion(upsertion));
+                    } catch (Exception e) {
+                        // we might overwrite an exception from a previous 
upsertion, but we don't really care
+                        // since we just need to mark this user as having at 
least one illegal alteration
+                        // and make an exception instance available for 
completing the corresponding future exceptionally
+                        userIllegalAlterationExceptions.put(user, e);
+                    }
+                });
+        // fail any users immediately that have an illegal alteration as 
identified above
+        userIllegalAlterationExceptions.entrySet().stream().forEach(entry -> {
+            
futures.get(entry.getKey()).completeExceptionally(entry.getValue());
+        });
+
+        // submit alterations for users that do not have an illegal upsertion 
as identified above
+        Call call = new Call("alterUserScramCredentials", calcDeadlineMs(now, 
options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public AlterUserScramCredentialsRequest.Builder createRequest(int 
timeoutMs) {
+                return new AlterUserScramCredentialsRequest.Builder(
+                        new 
AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream()
+                                .filter(a -> a instanceof 
UserScramCredentialUpsertion)
+                                .filter(a -> 
!userIllegalAlterationExceptions.containsKey(a.getUser()))
+                                .map(a -> 
userInsertions.get(a.getUser()).get(((UserScramCredentialUpsertion) 
a).getInfo().getMechanism()))
+                                .collect(Collectors.toList()))
+                        .setDeletions(alterations.stream()
+                                .filter(a -> a instanceof 
UserScramCredentialDeletion)
+                                .filter(a -> 
!userIllegalAlterationExceptions.containsKey(a.getUser()))
+                                .map(d ->
+                                
getScramCredentialDeletion((UserScramCredentialDeletion) 
d)).collect(Collectors.toList())));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                AlterUserScramCredentialsResponse response = 
(AlterUserScramCredentialsResponse) abstractResponse;
+                // Check for controller change
+                for (Errors error : response.errorCounts().keySet()) {
+                    if (error == Errors.NOT_CONTROLLER) {
+                        handleNotControllerError(error);
+                    }
+                }
+                response.data().results().forEach(result -> {
+                    KafkaFutureImpl<Void> future = futures.get(result.user());
+                    if (future == null) {
+                        log.warn("Server response mentioned unknown user {}", 
result.user());
+                    } else {
+                        Errors error = Errors.forCode(result.errorCode());
+                        if (error != Errors.NONE) {
+                            
future.completeExceptionally(error.exception(result.errorMessage()));
+                        } else {
+                            future.complete(null);
+                        }
+                    }
+                });
+                completeUnrealizedFutures(
+                    futures.entrySet().stream(),
+                    user -> "The broker response did not contain a result for 
user " + user);
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(futures.values(), throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new AlterUserScramCredentialsResult(new HashMap<>(futures));
+    }
+
+    private static 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion 
getScramCredentialUpsertion(UserScramCredentialUpsertion u) throws 
InvalidKeyException, NoSuchAlgorithmException {
+        AlterUserScramCredentialsRequestData.ScramCredentialUpsertion retval = 
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion();
+        return retval.setName(u.getUser())
+                .setMechanism(u.getInfo().getMechanism().getType())
+                .setIterations(u.getInfo().getIterations())
+                .setSalt(u.getSalt())
+                
.setSaltedPassword(getSaltedPasword(u.getInfo().getMechanism(), 
u.getPassword(), u.getSalt(), u.getInfo().getIterations()));
+    }
+
+    private static 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion 
getScramCredentialDeletion(UserScramCredentialDeletion d) {
+        return new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(d.getUser()).setMechanism(d.getMechanism().getType());
+    }
+
+    private static byte[] getSaltedPasword(ScramMechanism 
publicScramMechanism, byte[] password, byte[] salt, int iterations) throws 
NoSuchAlgorithmException, InvalidKeyException {
+        return new 
ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(publicScramMechanism.getMechanismName()))
+                .hi(password, salt, iterations);

Review comment:
       Yeah, I think there is a fundamental problem with allowing the value 
`-1` for iterations.  The KIP says "`Note that if the number of iterations is 
set to -1, the server-side default will be used.`". But we are on the client at 
this point in the code, and there is no concept for "server-side default" for 
SCRAM iterations in Kafka.  And unfortunately since we are salting the password 
here, we need to know the number of iterations.  So I think we need to do 
either of the following:
   
   1. Add an ability to define server-side default number of iterations per 
SASL/SCRAM mechanism in Kafka and allow clients to learn them.
   2. Get rid of `-1` as a special value.
   
   It's pretty clear to me that (2) is the way to go, but @rajinisivaram and 
@cmccabe please chime in.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * Representation of a SASL/SCRAM Mechanism.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API";>KIP-554:
 Add Broker-side SCRAM Config API</a>
+ */
+public enum ScramMechanism {
+    UNKNOWN((byte) 0),
+    SCRAM_SHA_256((byte) 1),
+    SCRAM_SHA_512((byte) 2);
+
+    /**
+     *
+     * @param type the type indicator
+     * @return the instance corresponding to the given type indicator, 
otherwise {@link #UNKNOWN}
+     */
+    public static ScramMechanism fromType(byte type) {
+        for (ScramMechanism scramMechanism : ScramMechanism.values()) {
+            if (scramMechanism.type == type) {
+                return scramMechanism;
+            }
+        }
+        return UNKNOWN;
+    }
+
+    /**
+     *
+     * @param mechanismName the SASL SCRAM mechanism name
+     * @return the corresponding SASL SCRAM mechanism enum, otherwise {@link 
#UNKNOWN}
+     * @see <a href="https://tools.ietf.org/html/rfc5802#section-4>
+     *     Salted Challenge Response Authentication Mechanism (SCRAM) SASL and 
GSS-API Mechanisms, Section 4</a>
+     */
+    public static ScramMechanism fromMechanismName(String mechanismName) {
+        ScramMechanism retvalFoundMechanism = 
ScramMechanism.valueOf(mechanismName.replace('-', '_'));
+        return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN;
+    }
+
+    /**
+     *
+     * @return the corresponding SASL SCRAM mechanism name
+     * @see <a href="https://tools.ietf.org/html/rfc5802#section-4>
+     *     Salted Challenge Response Authentication Mechanism (SCRAM) SASL and 
GSS-API Mechanisms, Section 4</a>
+     */
+    public String getMechanismName() {

Review comment:
       I made the changes here and also removed all `get` prefixes on getters 
within `ScramCredentialInfo` and 
`UserScramCredential{Alteration,Deletion,Upsertion,Description}`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to