michaeljmarshall commented on code in PR #19292:
URL: https://github.com/apache/pulsar/pull/19292#discussion_r1086824187
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -423,31 +421,65 @@ public void brokerConnected(DirectProxyHandler
directProxyHandler, CommandConnec
// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData)
throws Exception {
- AuthData brokerData = authState.authenticate(clientData);
- // authentication has completed, will send newConnected command.
- if (authState.isComplete()) {
- clientAuthRole = authState.getAuthRole();
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Client successfully authenticated with {} role
{}",
- remoteAddress, authMethod, clientAuthRole);
+ CompletableFuture<AuthData> authChallengeFuture =
authState.authenticateAsync(clientData);
+ if (authChallengeFuture.isDone()) {
+ if (!authChallengeFuture.isCompletedExceptionally()) {
+ authChallengeSuccessCallback(authChallengeFuture.get());
+ } else {
+ try {
+ authChallengeFuture.get();
+ } catch (ExecutionException e) {
+ authenticationFailedCallback(e.getCause());
+ }
}
+ } else {
+ state = State.Connecting;
+ authChallengeFuture.whenCompleteAsync((authChallenge, throwable)
-> {
+ if (throwable == null) {
+ authChallengeSuccessCallback(authChallenge);
+ } else {
+ authenticationFailedCallback(throwable);
+ }
+ }, ctx.executor());
+ }
+ }
+
+ protected void authenticationFailedCallback(Throwable t) {
+ LOG.warn("[{}] Unable to authenticate: ", remoteAddress, t);
+ final ByteBuf msg = Commands.newError(-1,
ServerError.AuthenticationError, "Failed to authenticate");
+ writeAndFlushAndClose(msg);
+ }
+
+ // Always run in this class's event loop.
+ protected void authChallengeSuccessCallback(AuthData authChallenge) {
+ try {
+ // authentication has completed, will send newConnected command.
+ if (authChallenge == null) {
+ clientAuthRole = authState.getAuthRole();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Client successfully authenticated with {}
role {}",
+ remoteAddress, authMethod, clientAuthRole);
+ }
- // First connection
- if (this.connectionPool == null || state == State.Connecting) {
- // authentication has completed, will send newConnected
command.
- completeConnect(clientData);
+ // First connection
+ if (this.connectionPool == null || state == State.Connecting) {
+ // authentication has completed, will send newConnected
command.
+ completeConnect();
+ }
+ return;
}
- return;
- }
- // auth not complete, continue auth with client side.
- final ByteBuf msg = Commands.newAuthChallenge(authMethod, brokerData,
protocolVersionToAdvertise);
- writeAndFlush(msg);
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Authentication in progress client by method {}.",
- remoteAddress, authMethod);
+ // auth not complete, continue auth with client side.
+ final ByteBuf msg = Commands.newAuthChallenge(authMethod,
authChallenge, protocolVersionToAdvertise);
+ writeAndFlush(msg);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Authentication in progress client by method
{}.",
+ remoteAddress, authMethod);
+ }
+ state = State.Connecting;
+ } catch (Exception e) {
Review Comment:
I wrote it this way because we are in a callback, and if we miss an
exception, we will leak the connection forever. If we think we should only
catch more specific exceptions, I can update it to catch
`AuthenticationException` and `RuntimeException`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]