rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469457068



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
         return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeUserScramCredentialsResult 
describeUserScramCredentials(List<String> users, 
DescribeUserScramCredentialsOptions options) {
+        final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> 
future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+            @Override
+            public DescribeUserScramCredentialsRequest.Builder 
createRequest(int timeoutMs) {
+                return new DescribeUserScramCredentialsRequest.Builder(
+                        new 
DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                                new 
DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                DescribeUserScramCredentialsResponse response = 
(DescribeUserScramCredentialsResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().error());
+                switch (error) {
+                    case NONE:
+                        DescribeUserScramCredentialsResponseData data = 
response.data();
+                        
future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+                            
DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+                            userScramCredential -> {
+                                List<ScramCredentialInfo> scramCredentialInfos 
= userScramCredential.credentialInfos().stream().map(
+                                    credentialInfo -> new 
ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), 
credentialInfo.iterations()))
+                                        .collect(Collectors.toList());
+                                return new 
UserScramCredentialsDescription(userScramCredential.name(), 
scramCredentialInfos);
+                            })));
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);
+                        break;
+                    default:
+                        future.completeExceptionally(new ApiError(error, 
response.data().errorMessage()).exception());
+                        break;
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new DescribeUserScramCredentialsResult(future);
+    }
+
+    @Override
+    public AlterUserScramCredentialsResult 
alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+                                                                     
AlterUserScramCredentialsOptions options) {
+        final long now = time.milliseconds();
+        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        for (UserScramCredentialAlteration alteration: alterations) {
+            futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+        }
+        final Map<String, Exception> userIllegalAlterationExceptions = new 
HashMap<>();
+        // We need to keep track of users with deletions of an unknown SCRAM 
mechanism
+        alterations.stream().filter(a -> a instanceof 
UserScramCredentialDeletion).forEach(alteration -> {
+            UserScramCredentialDeletion deletion = 
(UserScramCredentialDeletion) alteration;
+            ScramMechanism mechanism = deletion.getMechanism();
+            if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+                userIllegalAlterationExceptions.put(deletion.getUser(), new 
IllegalArgumentException("Unknown SCRAM mechanism"));
+            }
+        });
+        // Creating an upsertion may throw InvalidKeyException or 
NoSuchAlgorithmException,
+        // so keep track of which users are affected by such a failure and 
immediately fail all their alterations
+        final Map<String, Map<ScramMechanism, 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions 
= new HashMap<>();
+        alterations.stream().filter(a -> a instanceof 
UserScramCredentialUpsertion)
+                .filter(alteration -> 
!userIllegalAlterationExceptions.containsKey(alteration.getUser()))
+                .forEach(alteration -> {
+                    UserScramCredentialUpsertion upsertion = 
(UserScramCredentialUpsertion) alteration;
+                    String user = upsertion.getUser();
+                    try {
+                        ScramMechanism mechanism = 
upsertion.getInfo().getMechanism();
+                        if (mechanism == null || mechanism == 
ScramMechanism.UNKNOWN)
+                            throw new IllegalArgumentException("Unknown SCRAM 
mechanism");
+                        userInsertions.putIfAbsent(user, new HashMap<>());
+                        userInsertions.get(user).put(mechanism, 
getScramCredentialUpsertion(upsertion));
+                    } catch (Exception e) {
+                        // we might overwrite aa exception from a previous 
upsertion, but we don't really care
+                        // since we just needs to mark this user as having at 
least one illegal alteration
+                        // and make an exception instance available for 
completing the corresponding future exceptionally
+                        userIllegalAlterationExceptions.put(user, e);

Review comment:
       Actually, we also have to deal with too few/too many iterations as well 
as empty username.  These, along with empty password, are a class unto 
themselves -- unacceptable credentials.  So I'm abandoning the specific 
"`UnacceptablePasswordException`/`Errors.UNACCEPTABLE_PASSWORD`" in favor of 
"`UnacceptableCredentialException`/`Errors.UNACCEPTABLE_CREDENTIAL`"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to