cmccabe commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r473276446
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } + @Override + public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) { + final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + @Override + public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { + return new DescribeUserScramCredentialsRequest.Builder( + new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> + new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList()))); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; + Errors error = Errors.forCode(response.data().error()); + switch (error) { + case NONE: + DescribeUserScramCredentialsResponseData data = response.data(); + future.complete(data.userScramCredentials().stream().collect(Collectors.toMap( + DescribeUserScramCredentialsResponseData.UserScramCredential::name, + userScramCredential -> { + List<ScramCredentialInfo> scramCredentialInfos = userScramCredential.credentialInfos().stream().map( + credentialInfo -> new ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), credentialInfo.iterations())) + .collect(Collectors.toList()); + return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos); + }))); + break; + case NOT_CONTROLLER: + handleNotControllerError(error); + break; + default: + future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); + break; + } + } + + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }; + runnable.call(call, now); + return new DescribeUserScramCredentialsResult(future); + } + + @Override + public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations, + AlterUserScramCredentialsOptions options) { + final long now = time.milliseconds(); + final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(); + for (UserScramCredentialAlteration alteration: alterations) { + futures.put(alteration.getUser(), new KafkaFutureImpl<>()); + } + final Map<String, Exception> userIllegalAlterationExceptions = new HashMap<>(); + // We need to keep track of users with deletions of an unknown SCRAM mechanism + alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> { + UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration; + ScramMechanism mechanism = deletion.getMechanism(); + if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { + userIllegalAlterationExceptions.put(deletion.getUser(), new IllegalArgumentException("Unknown SCRAM mechanism")); Review comment: ok ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org