rajinisivaram commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r467848781



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * Representation of a SASL/SCRAM Mechanism.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a>
+ */
+public enum ScramMechanism {
+    UNKNOWN((byte) 0),
+    SCRAM_SHA_256((byte) 1),
+    SCRAM_SHA_512((byte) 2);
+
+    /**
+     *
+     * @param type the type indicator
+     * @return the instance corresponding to the given type indicator, otherwise {@link #UNKNOWN}
+     */
+    public static ScramMechanism fromType(byte type) {
+        for (ScramMechanism scramMechanism : ScramMechanism.values()) {
+            if (scramMechanism.type == type) {
+                return scramMechanism;
+            }
+        }
+        return UNKNOWN;
+    }
+
+    /**
+     *
+     * @param mechanismName the SASL SCRAM mechanism name
+     * @return the corresponding SASL SCRAM mechanism enum, otherwise {@link #UNKNOWN}
+     * @see <a href="https://tools.ietf.org/html/rfc5802#section-4">
+     *     Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4</a>
+     */
+    public static ScramMechanism fromMechanismName(String mechanismName) {
+        ScramMechanism retvalFoundMechanism = ScramMechanism.valueOf(mechanismName.replace('-', '_'));
+        return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN;
+    }
+
+    /**
+     *
+     * @return the corresponding SASL SCRAM mechanism name
+     * @see <a href="https://tools.ietf.org/html/rfc5802#section-4">
+     *     Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4</a>
+     */
+    public String getMechanismName() {

Review comment:
       We don't use the `get` prefix elsewhere; should this just be `mechanismName()`?
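
       For illustration, a minimal sketch of the accessor naming suggested here. `ScramMechanismSketch` is a hypothetical stand-in and not the PR's class; deriving the mechanism name from the constant name is an assumption. The lookup uses a loop rather than `Enum.valueOf`, since `valueOf` throws `IllegalArgumentException` for an unrecognized name instead of returning null.

```java
// Hypothetical sketch: mirrors the enum in the diff above, but is not the PR's code.
enum ScramMechanismSketch {
    UNKNOWN((byte) 0),
    SCRAM_SHA_256((byte) 1),
    SCRAM_SHA_512((byte) 2);

    private final byte type;
    private final String mechanismName;

    ScramMechanismSketch(byte type) {
        this.type = type;
        // Assumption: the SASL mechanism name is the constant name with '_' replaced by '-'.
        this.mechanismName = name().replace('_', '-');
    }

    // Accessors without the `get` prefix.
    public String mechanismName() {
        return mechanismName;
    }

    public byte type() {
        return type;
    }

    // Falls back to UNKNOWN for any name that matches no constant.
    public static ScramMechanismSketch fromMechanismName(String mechanismName) {
        for (ScramMechanismSketch mechanism : values()) {
            if (mechanism.mechanismName.equals(mechanismName)) {
                return mechanism;
            }
        }
        return UNKNOWN;
    }
}
```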

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1215,64 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * <p>Describe all SASL/SCRAM credentials.
+     *
+     * <p>This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+     *
+     * @return The DescribeUserScramCredentialsResult.
+     */
+    default DescribeUserScramCredentialsResult describeUserScramCredentials() {
+        return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions());
+    }
+
+    /**
+     * <p>Describe SASL/SCRAM credentials for the given users.
+     *
+     * <p>This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+     *
+     * @param users the users for which credentials are to be described; all users' credentials are described if null
+     *              or empty.  A user explicitly specified here that does not have a SCRAM credential will not appear
+     *              in the results.

Review comment:
       Should we throw an exception for users that don't exist, to be consistent with other APIs?
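
       One way to surface missing users, sketched under assumptions: return one future per requested user and complete it exceptionally when the user has no credential. `DescribeSketch`, the in-memory `store` map, and the use of `NoSuchElementException` are all hypothetical; the real API and exception type are for the PR to decide.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: not Kafka's Admin API.
class DescribeSketch {
    // `store` maps a user name to the mechanism names of its credentials (illustrative only).
    static Map<String, CompletableFuture<List<String>>> describe(
            Map<String, List<String>> store, List<String> users) {
        Map<String, CompletableFuture<List<String>>> results = new LinkedHashMap<>();
        for (String user : users) {
            CompletableFuture<List<String>> future = new CompletableFuture<>();
            List<String> mechanisms = store.get(user);
            if (mechanisms == null) {
                // An explicitly requested user without credentials fails its own future
                // instead of silently disappearing from the result.
                future.completeExceptionally(
                        new NoSuchElementException("No SCRAM credential for user " + user));
            } else {
                future.complete(mechanisms);
            }
            results.put(user, future);
        }
        return results;
    }
}
```

       With per-user futures, a caller asking for `alice` and `bob` can still read `alice`'s result when only `bob` is missing.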

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Representation of all SASL/SCRAM credentials associated with a user that can be retrieved.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a>
+ */
+public class UserScramCredentialsDescription {
+    private final String name;
+    private final List<ScramCredentialInfo> infos;
+
+    /**
+     *
+     * @param name the required user name
+     * @param infos the required SASL/SCRAM credential representations for the 
user
+     */
+    public UserScramCredentialsDescription(String name, 
List<ScramCredentialInfo> infos) {
+        this.name = Objects.requireNonNull(name);
+        this.infos = Collections.unmodifiableList(new 
ArrayList<>(Objects.requireNonNull(infos)));
+    }
+
+    /**
+     *
+     * @return the user name
+     */
+    public String getName() {

Review comment:
       Remove the `get` prefix.

##########
File path: 
core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.Properties
+
+import kafka.network.SocketServer
+import kafka.security.authorizer.AclAuthorizer
+import org.apache.kafka.clients.admin.ScramMechanism
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, 
DescribeUserScramCredentialsRequestData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, 
AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, 
DescribeUserScramCredentialsResponse}
+import org.apache.kafka.common.resource.ResourceType
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, 
AuthorizationResult}
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Test AlterUserScramCredentialsRequest/Response API for the cases where 
either no credentials are altered
+ * or failure is expected due to lack of authorization, sending the request to 
a non-controller broker, or some other issue.
+ * Also tests the Alter and Describe APIs for the case where credentials are 
successfully altered/described.
+ */
+class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+    properties.put(KafkaConfig.AuthorizerClassNameProp, 
classOf[AlterCredentialsTest.TestAuthorizer].getName)
+    properties.put(KafkaConfig.PrincipalBuilderClassProp, 
classOf[AlterCredentialsTest.TestPrincipalBuilder].getName)
+  }
+
+  @Before
+  override def setUp(): Unit = {
+    AlterCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to 
be authorized
+    super.setUp()
+  }
+
+  @Test
+  def testAlterNothing(): Unit = {
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
+        .setUpsertions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(0, results.size)
+  }
+
+  @Test
+  def testAlterNothingNotAuthorized(): Unit = {
+    AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal
+
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
+        .setUpsertions(new 
util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(0, results.size)
+  }
+
+  @Test
+  def testAlterSomethingNotAuthorized(): Unit = {
+    AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal
+
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(util.Arrays.asList(new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)))
+        .setUpsertions(util.Arrays.asList(new 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_512.getType)))).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(2, results.size)
+    assertTrue("Expected not authorized",
+      results.get(0).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code && 
results.get(1).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code)
+  }
+
+  @Test
+  def testAlterSameThingTwice(): Unit = {
+    val deletion1 = new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+    val deletion2 = new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+    val upsertion1 = new 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+      
.setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes)
+    val upsertion2 = new 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+      
.setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes)
+    val requests = List (
+      new AlterUserScramCredentialsRequest.Builder(
+        new AlterUserScramCredentialsRequestData()
+          .setDeletions(util.Arrays.asList(deletion1, deletion1))
+          .setUpsertions(util.Arrays.asList(upsertion2, upsertion2))).build(),
+      new AlterUserScramCredentialsRequest.Builder(
+        new AlterUserScramCredentialsRequestData()
+          .setDeletions(util.Arrays.asList(deletion1, deletion2))
+          .setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(),
+    )
+    requests.foreach(request => {
+      val response = sendAlterUserScramCredentialsRequest(request)
+      val results = response.data.results
+      assertEquals(2, results.size)
+      assertTrue("Expected error when altering the same credential twice in a 
single request",
+        results.get(0).errorCode == Errors.INVALID_REQUEST.code && 
results.get(1).errorCode == Errors.INVALID_REQUEST.code)
+    })
+  }
+
+  @Test
+  def testAlterEmptyUser(): Unit = {

Review comment:
       Do we test for an empty password?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
         return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeUserScramCredentialsResult 
describeUserScramCredentials(List<String> users, 
DescribeUserScramCredentialsOptions options) {
+        final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> 
future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public DescribeUserScramCredentialsRequest.Builder 
createRequest(int timeoutMs) {
+                return new DescribeUserScramCredentialsRequest.Builder(
+                        new 
DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                                new 
DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                DescribeUserScramCredentialsResponse response = 
(DescribeUserScramCredentialsResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().error());
+                switch (error) {
+                    case NONE:
+                        DescribeUserScramCredentialsResponseData data = 
response.data();
+                        
future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+                            
DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+                            userScramCredential -> {
+                                List<ScramCredentialInfo> scramCredentialInfos 
= userScramCredential.credentialInfos().stream().map(
+                                    credentialInfo -> new 
ScramCredentialInfo(ScramMechanism.fromType(credentialInfo.mechanism()), 
credentialInfo.iterations()))
+                                        .collect(Collectors.toList());
+                                return new 
UserScramCredentialsDescription(userScramCredential.name(), 
scramCredentialInfos);
+                            })));
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);
+                        break;
+                    default:
+                        future.completeExceptionally(new ApiError(error, 
response.data().errorMessage()).exception());
+                        break;
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new DescribeUserScramCredentialsResult(future);
+    }
+
+    @Override
+    public AlterUserScramCredentialsResult 
alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+                                                                     
AlterUserScramCredentialsOptions options) {
+        final long now = time.milliseconds();
+        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        for (UserScramCredentialAlteration alteration: alterations) {
+            futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+        }
+        final Map<String, Exception> userIllegalAlterationExceptions = new 
HashMap<>();
+        // We need to keep track of users with deletions of an unknown SCRAM 
mechanism
+        alterations.stream().filter(a -> a instanceof 
UserScramCredentialDeletion).forEach(alteration -> {
+            UserScramCredentialDeletion deletion = 
(UserScramCredentialDeletion) alteration;
+            ScramMechanism mechanism = deletion.getMechanism();
+            if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+                userIllegalAlterationExceptions.put(deletion.getUser(), new 
InvalidRequestException("Unknown SCRAM mechanism"));
+            }
+        });
+        // Creating an upsertion may throw InvalidKeyException or 
NoSuchAlgorithmException,
+        // so keep track of which users are affected by such a failure and 
immediately fail all their alterations
+        final Map<String, Map<ScramMechanism, 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions 
= new HashMap<>();
+        alterations.stream().filter(a -> a instanceof 
UserScramCredentialUpsertion)
+                .filter(alteration -> 
!userIllegalAlterationExceptions.containsKey(alteration.getUser()))
+                .forEach(alteration -> {
+                    UserScramCredentialUpsertion upsertion = 
(UserScramCredentialUpsertion) alteration;
+                    String user = upsertion.getUser();
+                    try {
+                        ScramMechanism mechanism = 
upsertion.getInfo().getMechanism();
+                        if (mechanism == null || mechanism == 
ScramMechanism.UNKNOWN)
+                            throw new InvalidRequestException("Unknown SCRAM 
mechanism");
+                        userInsertions.putIfAbsent(user, new HashMap<>());
+                        userInsertions.get(user).put(mechanism, 
getScramCredentialUpsertion(upsertion));
+                    } catch (Exception e) {
+                        // we might overwrite an exception from a previous 
upsertion, but we don't really care
+                        // since we just need to mark this user as having at 
least one illegal alteration
+                        // and make an exception instance available for 
completing the corresponding future exceptionally
+                        userIllegalAlterationExceptions.put(user, e);
+                    }
+                });
+        // fail any users immediately that have an illegal alteration as 
identified above
+        userIllegalAlterationExceptions.entrySet().stream().forEach(entry -> {
+            
futures.get(entry.getKey()).completeExceptionally(entry.getValue());
+        });
+
+        // submit alterations for users that do not have an illegal upsertion 
as identified above
+        Call call = new Call("alterUserScramCredentials", calcDeadlineMs(now, 
options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public AlterUserScramCredentialsRequest.Builder createRequest(int 
timeoutMs) {
+                return new AlterUserScramCredentialsRequest.Builder(
+                        new 
AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream()
+                                .filter(a -> a instanceof 
UserScramCredentialUpsertion)
+                                .filter(a -> 
!userIllegalAlterationExceptions.containsKey(a.getUser()))
+                                .map(a -> 
userInsertions.get(a.getUser()).get(((UserScramCredentialUpsertion) 
a).getInfo().getMechanism()))
+                                .collect(Collectors.toList()))
+                        .setDeletions(alterations.stream()
+                                .filter(a -> a instanceof 
UserScramCredentialDeletion)
+                                .filter(a -> 
!userIllegalAlterationExceptions.containsKey(a.getUser()))
+                                .map(d ->
+                                
getScramCredentialDeletion((UserScramCredentialDeletion) 
d)).collect(Collectors.toList())));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                AlterUserScramCredentialsResponse response = 
(AlterUserScramCredentialsResponse) abstractResponse;
+                // Check for controller change
+                for (Errors error : response.errorCounts().keySet()) {
+                    if (error == Errors.NOT_CONTROLLER) {
+                        handleNotControllerError(error);
+                    }
+                }
+                response.data().results().forEach(result -> {
+                    KafkaFutureImpl<Void> future = futures.get(result.user());
+                    if (future == null) {
+                        log.warn("Server response mentioned unknown user {}", 
result.user());
+                    } else {
+                        Errors error = Errors.forCode(result.errorCode());
+                        if (error != Errors.NONE) {
+                            
future.completeExceptionally(error.exception(result.errorMessage()));
+                        } else {
+                            future.complete(null);
+                        }
+                    }
+                });
+                completeUnrealizedFutures(
+                    futures.entrySet().stream(),
+                    user -> "The broker response did not contain a result for 
user " + user);
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(futures.values(), throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new AlterUserScramCredentialsResult(new HashMap<>(futures));
+    }
+
+    private static 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion 
getScramCredentialUpsertion(UserScramCredentialUpsertion u) throws 
InvalidKeyException, NoSuchAlgorithmException {
+        AlterUserScramCredentialsRequestData.ScramCredentialUpsertion retval = 
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion();
+        return retval.setName(u.getUser())
+                .setMechanism(u.getInfo().getMechanism().getType())
+                .setIterations(u.getInfo().getIterations())
+                .setSalt(u.getSalt())
+                
.setSaltedPassword(getSaltedPasword(u.getInfo().getMechanism(), 
u.getPassword(), u.getSalt(), u.getInfo().getIterations()));
+    }
+
+    private static 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion 
getScramCredentialDeletion(UserScramCredentialDeletion d) {
+        return new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(d.getUser()).setMechanism(d.getMechanism().getType());
+    }
+
+    private static byte[] getSaltedPasword(ScramMechanism 
publicScramMechanism, byte[] password, byte[] salt, int iterations) throws 
NoSuchAlgorithmException, InvalidKeyException {
+        return new 
ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(publicScramMechanism.getMechanismName()))
+                .hi(password, salt, iterations);

Review comment:
       Can `iterations` be -1 here? If so, won't we end up sending a password without applying the salt properly?
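
       To make the concern concrete, here is a minimal sketch of the `Hi()` function from RFC 5802 (section 2.2) with an explicit guard on the iteration count. `ScramHiSketch` is hypothetical and is not Kafka's `ScramFormatter`; it assumes HMAC-SHA-256, and rejecting every count below 1 is an assumption about where the check belongs, not a statement of what the PR does.

```java
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical sketch: not Kafka's ScramFormatter.
class ScramHiSketch {
    // Hi(str, salt, i) per RFC 5802: U1 = HMAC(str, salt + INT(1)),
    // Uk = HMAC(str, Uk-1), result = U1 XOR U2 XOR ... XOR Ui.
    static byte[] hi(byte[] password, byte[] salt, int iterations) {
        if (iterations < 1) {
            // Without this guard, a loop starting at 2 would simply not run,
            // leaving a single HMAC round as the "salted" password.
            throw new IllegalArgumentException("iterations must be >= 1, got " + iterations);
        }
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(password, "HmacSHA256"));
            mac.update(salt);
            byte[] u = mac.doFinal(new byte[] {0, 0, 0, 1});
            byte[] result = u.clone();
            for (int i = 2; i <= iterations; i++) {
                // doFinal resets the Mac to its keyed state, so it can be reused.
                u = mac.doFinal(u);
                for (int j = 0; j < result.length; j++) {
                    result[j] ^= u[j];
                }
            }
            return result;
        } catch (NoSuchAlgorithmException | InvalidKeyException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

       Validating before the request is built keeps a sentinel such as -1 from ever reaching the hashing step.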

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * Representation of a SASL/SCRAM Mechanism.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a>
+ */
+public enum ScramMechanism {
+    UNKNOWN((byte) 0),
+    SCRAM_SHA_256((byte) 1),
+    SCRAM_SHA_512((byte) 2);
+
+    /**
+     *
+     * @param type the type indicator
+     * @return the instance corresponding to the given type indicator, otherwise {@link #UNKNOWN}
+     */
+    public static ScramMechanism fromType(byte type) {
+        for (ScramMechanism scramMechanism : ScramMechanism.values()) {
+            if (scramMechanism.type == type) {
+                return scramMechanism;
+            }
+        }
+        return UNKNOWN;
+    }
+
+    /**
+     *
+     * @param mechanismName the SASL SCRAM mechanism name
+     * @return the corresponding SASL SCRAM mechanism enum, otherwise {@link #UNKNOWN}
+     * @see <a href="https://tools.ietf.org/html/rfc5802#section-4">
+     *     Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4</a>
+     */
+    public static ScramMechanism fromMechanismName(String mechanismName) {
+        ScramMechanism retvalFoundMechanism = ScramMechanism.valueOf(mechanismName.replace('-', '_'));
+        return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN;
+    }
+
+    /**
+     *
+     * @return the corresponding SASL SCRAM mechanism name
+     * @see <a href="https://tools.ietf.org/html/rfc5802#section-4">
+     *     Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4</a>
+     */
+    public String getMechanismName() {
+        return this.mechanismName;
+    }
+
+    /**
+     *
+     * @return the type indicator for this SASL SCRAM mechanism
+     */
+    public byte getType() {

Review comment:
       As before, `type()`?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ScramCredentialInfo.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Mechanism and iterations for a SASL/SCRAM credential associated with a user.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a>
+ */
+public class ScramCredentialInfo {
+    private final ScramMechanism mechanism;
+    private final int iterations;
+
+    /**
+     *
+     * @param mechanism the required mechanism
+     * @param iterations the number of iterations used when creating the credential
+     */
+    public ScramCredentialInfo(ScramMechanism mechanism, int iterations) {
+        this.mechanism = Objects.requireNonNull(mechanism);
+        this.iterations = iterations;
+    }
+
+    /**
+     *
+     * @return the mechanism
+     */
+    public ScramMechanism getMechanism() {
+        return mechanism;
+    }
+
+    /**
+     *
+     * @return the number of iterations used when creating the credential

Review comment:
       or -1?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Representation of all SASL/SCRAM credentials associated with a user that can be retrieved.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a>
+ */
+public class UserScramCredentialsDescription {
+    private final String name;
+    private final List<ScramCredentialInfo> infos;
+
+    /**
+     *
+     * @param name the required user name
+     * @param infos the required SASL/SCRAM credential representations for the 
user
+     */
+    public UserScramCredentialsDescription(String name, 
List<ScramCredentialInfo> infos) {
+        this.name = Objects.requireNonNull(name);
+        this.infos = Collections.unmodifiableList(new 
ArrayList<>(Objects.requireNonNull(infos)));
+    }
+
+    /**
+     *
+     * @return the user name
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     *
+     * @return the unmodifiable list of SASL/SCRAM credential representations for the user
+     */
+    public List<ScramCredentialInfo> getInfos() {

Review comment:
       `credentialInfos()`?
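
       A sketch of how the class might read with the renamed accessors. `UserScramCredentialsDescriptionSketch` is hypothetical, and `String` stands in for `ScramCredentialInfo` so the example compiles on its own; the defensive copy and unmodifiable wrapper follow the constructor in the diff above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch: String stands in for ScramCredentialInfo.
final class UserScramCredentialsDescriptionSketch {
    private final String name;
    private final List<String> credentialInfos;

    UserScramCredentialsDescriptionSketch(String name, List<String> credentialInfos) {
        this.name = Objects.requireNonNull(name);
        // Copy first, then wrap, so later changes to the caller's list are not visible here.
        this.credentialInfos = Collections.unmodifiableList(
                new ArrayList<>(Objects.requireNonNull(credentialInfos)));
    }

    // Accessors named after the fields, without the `get` prefix.
    public String name() {
        return name;
    }

    public List<String> credentialInfos() {
        return credentialInfos;
    }
}
```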

##########
File path: 
core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.Properties
+
+import kafka.network.SocketServer
+import kafka.security.authorizer.AclAuthorizer
+import org.apache.kafka.clients.admin.ScramMechanism
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, DescribeUserScramCredentialsRequestData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse}
+import org.apache.kafka.common.resource.ResourceType
+import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Test AlterUserScramCredentialsRequest/Response API for the cases where either no credentials are altered
+ * or failure is expected due to lack of authorization, sending the request to a non-controller broker, or some other issue.
+ * Also tests the Alter and Describe APIs for the case where credentials are successfully altered/described.
+ */
+class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+    properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName)
+    properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilder].getName)
+  }
+
+  @Before
+  override def setUp(): Unit = {
+    AlterCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to be authorized
+    super.setUp()
+  }
+
+  @Test
+  def testAlterNothing(): Unit = {
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
+        .setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(0, results.size)
+  }
+
+  @Test
+  def testAlterNothingNotAuthorized(): Unit = {
+    AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal
+
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
+        .setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(0, results.size)
+  }
+
+  @Test
+  def testAlterSomethingNotAuthorized(): Unit = {
+    AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal
+
+    val request = new AlterUserScramCredentialsRequest.Builder(
+      new AlterUserScramCredentialsRequestData()
+        .setDeletions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)))
+        .setUpsertions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_512.getType)))).build()
+    val response = sendAlterUserScramCredentialsRequest(request)
+
+    val results = response.data.results
+    assertEquals(2, results.size)
+    assertTrue("Expected not authorized",
+      results.get(0).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code && results.get(1).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code)
+  }
+
+  @Test
+  def testAlterSameThingTwice(): Unit = {
+    val deletion1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+    val deletion2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+    val upsertion1 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+      .setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes)
+    val upsertion2 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType)
+      .setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes)
+    val requests = List (
+      new AlterUserScramCredentialsRequest.Builder(
+        new AlterUserScramCredentialsRequestData()
+          .setDeletions(util.Arrays.asList(deletion1, deletion1))
+          .setUpsertions(util.Arrays.asList(upsertion2, upsertion2))).build(),
+      new AlterUserScramCredentialsRequest.Builder(
+        new AlterUserScramCredentialsRequestData()
+          .setDeletions(util.Arrays.asList(deletion1, deletion2))
+          .setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(),
+    )
+    requests.foreach(request => {
+      val response = sendAlterUserScramCredentialsRequest(request)
+      val results = response.data.results
+      assertEquals(2, results.size)
+      assertTrue("Expected error when altering the same credential twice in a single request",
+        results.get(0).errorCode == Errors.INVALID_REQUEST.code && results.get(1).errorCode == Errors.INVALID_REQUEST.code)

Review comment:
       Use `assertEquals` so that we know what the error was if the test fails? (multiple places)
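
To illustrate the difference in failure output, here is a small self-contained sketch. The `Errors` enum and the two assert helpers are stand-ins (assumptions for illustration) for `org.apache.kafka.common.protocol.Errors` and `org.junit.Assert`, so the snippet runs without Kafka or JUnit on the classpath; the helper's message format mimics JUnit 4's `expected:<...> but was:<...>`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class AssertEqualsSketch {
    // Stand-in for org.apache.kafka.common.protocol.Errors; only the names matter here.
    enum Errors { NONE, INVALID_REQUEST }

    // Stand-ins for org.junit.Assert, so the sketch needs no test framework.
    static void assertTrue(String message, boolean condition) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    static void assertEquals(String message, Object expected, Object actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError(message + " expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    // Runs a check and returns its failure message, or null if the check passed.
    static String failureMessage(Runnable check) {
        try {
            check.run();
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Pretend the broker unexpectedly returned NONE for the second result.
        List<Errors> results = Arrays.asList(Errors.INVALID_REQUEST, Errors.NONE);
        String msg = "Expected error when altering the same credential twice in a single request";

        // Combined boolean: the failure does not say which result was wrong or what it was.
        System.out.println(failureMessage(() -> assertTrue(msg,
            results.get(0) == Errors.INVALID_REQUEST && results.get(1) == Errors.INVALID_REQUEST)));

        // One assertEquals per result: the failure includes the expected and actual values.
        System.out.println(failureMessage(() ->
            results.forEach(r -> assertEquals(msg, Errors.INVALID_REQUEST, r))));
    }
}
```

In the Scala test this could amount to one assertion per result, for example `assertEquals(msg, Errors.INVALID_REQUEST, Errors.forCode(results.get(0).errorCode))`, so that a failure names the error instead of a bare numeric code.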




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

