FlorianHockmann commented on code in PR #2525:
URL: https://github.com/apache/tinkerpop/pull/2525#discussion_r1540816027


##########
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java:
##########
@@ -27,8 +27,7 @@
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;

Review Comment:
   Please revert this change. [Our dev docs explicitly mention that TinkerPop doesn't use wildcard imports](https://tinkerpop.apache.org/docs/current/dev/developer/#_code_style).
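
   For reference, a minimal sketch of the explicit form. `ArrayList` and `List` are the two imports the diff removes; `HashMap`, `Map` and `UUID` are my assumption, based on the types this PR introduces in `AbstractClient` further down, so the real list may differ. The class itself is purely illustrative and not TinkerPop code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Illustration only: a stand-alone class, not part of TinkerPop.
final class ExplicitImportsExample {

    // Every java.util type used below is imported by name above,
    // so a reader can tell at a glance what this class depends on.
    static int countTrackedResults() {
        final Map<UUID, List<String>> resultsByRequestId = new HashMap<>();
        final List<String> results = new ArrayList<>();
        results.add("first");
        results.add("second");
        resultsByRequestId.put(UUID.randomUUID(), results);

        int total = 0;
        for (final List<String> perRequest : resultsByRequestId.values()) {
            total += perRequest.size();
        }
        return total;
    }

    public static void main(final String[] args) {
        System.out.println(countTrackedResults());
    }
}
```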



##########
gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java:
##########
@@ -218,9 +266,19 @@ public void 
shouldAuthenticateAndWorkWithVariablesOverGraphSONV1Serialization()
 
     private static void assertConnection(final Cluster cluster, final Client 
client) throws InterruptedException, ExecutionException {

Review Comment:
   This method is used in 4 different tests, such as `shouldAuthenticateWithPlainText`. With this change, all 4 of them will fail whenever submitting multiple initial requests in parallel isn't working.
   I think it would be good if we could keep these tests as simple as possible so they don't include parallelization of initial requests. A test like `shouldAuthenticateWithPlainText` should really only fail if _authenticate with plain text_ isn't working, not if submitting multiple requests in parallel isn't working.
   
   Long story short, I think it would be good if you could revert the changes to this method and instead write a new test specifically for the parallelization issue.



##########
gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java:
##########
@@ -75,106 +79,159 @@ public SaslAuthenticationHandler(final Authenticator 
authenticator, final Author
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
throws Exception {
-        if (msg instanceof RequestMessage){
-            final RequestMessage requestMessage = (RequestMessage) msg;
-
-            final Attribute<Authenticator.SaslNegotiator> negotiator = 
((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
-            final Attribute<RequestMessage> request = ((AttributeMap) 
ctx).attr(StateKey.REQUEST_MESSAGE);
-            if (negotiator.get() == null) {
-                try {
-                    // First time through so save the request and send an 
AUTHENTICATE challenge with no data
-                    
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
-                    request.set(requestMessage);
-                    final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
-                            .code(ResponseStatusCode.AUTHENTICATE).create();
-                    ctx.writeAndFlush(authenticate);
-                } catch (Exception ex) {
-                    // newSaslNegotiator can cause troubles - if we don't 
catch and respond nicely the driver seems
-                    // to hang until timeout which isn't so nice. treating 
this like a server error as it means that
-                    // the Authenticator isn't really ready to deal with 
requests for some reason.
-                    logger.error(String.format("%s is not ready to handle 
requests - check its configuration or related services",
-                            authenticator.getClass().getSimpleName()), ex);
-
-                    final ResponseMessage error = 
ResponseMessage.build(requestMessage)
-                            .statusMessage("Authenticator is not ready to 
handle requests")
-                            .code(ResponseStatusCode.SERVER_ERROR).create();
-                    ctx.writeAndFlush(error);
-                }
-            } else {
-                if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION) 
&& requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-                    
-                    final Object saslObject = 
requestMessage.getArgs().get(Tokens.ARGS_SASL);
-                    final byte[] saslResponse;
-                    
-                    if(saslObject instanceof String) {
-                        saslResponse = BASE64_DECODER.decode((String) 
saslObject);
-                    } else {
-                        final ResponseMessage error = 
ResponseMessage.build(request.get())
-                                .statusMessage("Incorrect type for : " + 
Tokens.ARGS_SASL + " - base64 encoded String is expected")
-                                
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
-                        ctx.writeAndFlush(error);
-                        return;
-                    }
-
-                    try {
-                        final byte[] saslMessage = 
negotiator.get().evaluateResponse(saslResponse);
-                        if (negotiator.get().isComplete()) {
-                            final AuthenticatedUser user = 
negotiator.get().getAuthenticatedUser();
-                            
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
-                            // User name logged with the remote socket address 
and authenticator classname for audit logging
-                            if (settings.enableAuditLog) {
-                                String address = 
ctx.channel().remoteAddress().toString();
-                                if (address.startsWith("/") && 
address.length() > 1) address = address.substring(1);
-                                final String[] authClassParts = 
authenticator.getClass().toString().split("[.]");
-                                auditLogger.info("User {} with address {} 
authenticated by {}",
-                                        user.getName(), address, 
authClassParts[authClassParts.length - 1]);
-                            }
-                            // If we have got here we are authenticated so 
remove the handler and pass
-                            // the original message down the pipeline for 
processing
-                            ctx.pipeline().remove(this);
-                            final RequestMessage original = request.get();
-                            ctx.fireChannelRead(original);
-                        } else {
-                            // not done here - send back the sasl message for 
next challenge.
-                            final Map<String,Object> metadata = new 
HashMap<>();
-                            metadata.put(Tokens.ARGS_SASL, 
BASE64_ENCODER.encodeToString(saslMessage));
-                            final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
-                                    .statusAttributes(metadata)
-                                    
.code(ResponseStatusCode.AUTHENTICATE).create();
-                            ctx.writeAndFlush(authenticate);
-                        }
-                    } catch (AuthenticationException ae) {
-                        final ResponseMessage error = 
ResponseMessage.build(request.get())
-                                .statusMessage(ae.getMessage())
-                                
.code(ResponseStatusCode.UNAUTHORIZED).create();
-                        ctx.writeAndFlush(error);
-                    }
-                } else {
-                    final ResponseMessage error = 
ResponseMessage.build(requestMessage)
-                            .statusMessage("Failed to authenticate")
-                            .code(ResponseStatusCode.UNAUTHORIZED).create();
-                    ctx.writeAndFlush(error);
-                }
-            }
-        } else {
+        if (!(msg instanceof RequestMessage)) {
             logger.warn("{} only processes RequestMessage instances - received 
{} - channel closing",
                     this.getClass().getSimpleName(), msg.getClass());
             ctx.close();
+            return;
+        }
+
+        final RequestMessage requestMessage = (RequestMessage) msg;
+
+        final Attribute<Authenticator.SaslNegotiator> negotiator = 
ctx.channel().attr(StateKey.NEGOTIATOR);
+        final Attribute<RequestMessage> request = 
ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+        final Attribute<Pair<LocalDateTime, List<RequestMessage>>> 
deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+        if (negotiator.get() == null) {
+            try {
+                // First time through so save the request and send an 
AUTHENTICATE challenge with no data
+                
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+                request.set(requestMessage);
+                final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
+                        .code(ResponseStatusCode.AUTHENTICATE).create();
+                ctx.writeAndFlush(authenticate);
+            } catch (Exception ex) {
+                // newSaslNegotiator can cause troubles - if we don't catch 
and respond nicely the driver seems
+                // to hang until timeout which isn't so nice. treating this 
like a server error as it means that
+                // the Authenticator isn't really ready to deal with requests 
for some reason.
+                logger.error(String.format("%s is not ready to handle requests 
- check its configuration or related services",
+                        authenticator.getClass().getSimpleName()), ex);
+
+                respondWithError(
+                    requestMessage,
+                    builder -> builder.statusMessage("Authenticator is not 
ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+                    ctx);
+            }
+
+            return;
         }
+
+        // If authentication negotiation is pending, store subsequent 
non-authentication requests for later processing
+        if (negotiator.get() != null && 
!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {

Review Comment:
   (nitpick) Isn't the `negotiator.get() != null` check redundant here, since the `if (negotiator.get() == null)` block right before it always returns?
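
   To illustrate the shape I have in mind, here is a minimal sketch with hypothetical stand-ins: `route`, its string results, and the `"authentication"` op value are placeholders for this example, not the handler's real API. Once the `null` case has returned early, the following condition only needs to look at the op.

```java
// Illustration only: a simplified stand-in for the branching in channelRead.
final class GuardClauseExample {

    static String route(final Object negotiator, final String op) {
        if (negotiator == null) {
            // Early return, like the first branch in channelRead.
            return "start-handshake";
        }

        // negotiator is known to be non-null from here on,
        // so the condition does not need to repeat that check.
        if (!"authentication".equals(op)) {
            return "defer";
        }

        return "evaluate-sasl";
    }

    public static void main(final String[] args) {
        System.out.println(route(null, "eval"));
        System.out.println(route(new Object(), "eval"));
        System.out.println(route(new Object(), "authentication"));
    }
}
```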



##########
gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java:
##########
@@ -163,6 +171,46 @@ public void 
shouldFailAuthenticateWithPlainTextBadUsername() throws Exception {
         }
     }
 
+    @Test
+    public void 
shouldFailAuthenticateWithUnAuthenticatedRequestAfterMaxDeferrableDuration() 
throws Exception {
+        try (WebSocketClient client = 
TestClientFactory.createWebSocketClient()) {
+            // First request will initiate the authentication handshake
+            // Subsequent requests will be deferred
+            CompletableFuture<List<ResponseMessage>> future1 = 
client.submitAsync("");
+            CompletableFuture<List<ResponseMessage>> future2 = 
client.submitAsync("");
+            CompletableFuture<List<ResponseMessage>> future3 = 
client.submitAsync("");
+
+            // After the maximum allowed deferred request duration,
+            // any non-authenticated request will invalidate all requests with 
429 error
+            CompletableFuture<List<ResponseMessage>> future4 = 
CompletableFuture.runAsync(() -> {
+                try {
+                    
Thread.sleep(SaslAuthenticationHandler.MAX_REQUEST_DEFERRABLE_DURATION.plus(Duration.ofSeconds(1)).toMillis());
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }).thenCompose((__) -> {
+                try {
+                    return client.submitAsync("");
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            List<ResponseMessage> responses = new ArrayList<>();
+
+            responses.addAll(future1.join());
+            responses.addAll(future2.join());
+            responses.addAll(future3.join());
+            responses.addAll(future4.join());
+
+            for (ResponseMessage response : responses) {
+                if (response.getStatus().getCode() != 
ResponseStatusCode.AUTHENTICATE) {

Review Comment:
   I think it's a bit hard to understand what the expected outcome is here and what this is really asserting, since it just iterates over all received responses and accepts each one if its status code is either `AUTHENTICATE` or `TOO_MANY_REQUESTS`.
   Can't we explicitly assert which request should get which response status code? I guess `future4` should get `TOO_MANY_REQUESTS` and the other 3 should get `AUTHENTICATE` (?).
   
   Not that important, but I think it would also improve the readability of this test if these weren't named `future1` - `future4`, but something like `futureOfRequestWithinAuthDuration` vs. `futureOfRequestSubmittedTooLate`.
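
   A rough sketch of what per-request assertions could look like. Everything here is a stand-in: `Status` is a hypothetical replacement for `ResponseStatusCode`, the completed futures replace real `client.submitAsync(...)` calls, and the expected codes are only my guess from above, not confirmed behavior.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustration only: Status is a hypothetical stand-in for ResponseStatusCode,
// and the completed futures stand in for client.submitAsync(...) results.
final class ExplicitStatusAssertions {

    enum Status { AUTHENTICATE, TOO_MANY_REQUESTS }

    // Fails with a message naming the request if it received no response
    // or if any of its responses has an unexpected status.
    static void assertAllHaveStatus(final String requestLabel, final Status expected,
                                    final CompletableFuture<List<Status>> future) {
        final List<Status> statuses = future.join();
        if (statuses.isEmpty()) {
            throw new AssertionError(requestLabel + ": expected " + expected + " but got no response");
        }
        for (final Status actual : statuses) {
            if (actual != expected) {
                throw new AssertionError(requestLabel + ": expected " + expected + " but got " + actual);
            }
        }
    }

    public static void main(final String[] args) {
        final CompletableFuture<List<Status>> futureOfRequestWithinAuthDuration =
                CompletableFuture.completedFuture(Collections.singletonList(Status.AUTHENTICATE));
        final CompletableFuture<List<Status>> futureOfRequestSubmittedTooLate =
                CompletableFuture.completedFuture(Collections.singletonList(Status.TOO_MANY_REQUESTS));

        // Expected codes are a guess at the intended behavior, not confirmed by the PR.
        assertAllHaveStatus("request within auth duration",
                Status.AUTHENTICATE, futureOfRequestWithinAuthDuration);
        assertAllHaveStatus("request submitted too late",
                Status.TOO_MANY_REQUESTS, futureOfRequestSubmittedTooLate);
    }
}
```

   With a helper like this, a failure names the request and the status it actually received, instead of only reporting that some response in the combined list was unexpected.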



##########
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java:
##########
@@ -75,19 +74,19 @@ public CompletableFuture<List<ResponseMessage>> 
submitAsync(final RequestMessage
             if (response.getStatus().getCode().isFinalResponse()) {
                 f.complete(results);
             }
-        };
+        });
 
         writeAndFlush(requestMessage);
 
         return f;
     }
 
     static class CallbackResponseHandler extends 
SimpleChannelInboundHandler<ResponseMessage> {
-        public Consumer<ResponseMessage> callback;
+        public Map<UUID, Consumer<ResponseMessage>> callback = new HashMap<>();

Review Comment:
   (nitpick) I think we should rename this now that it holds a map of callbacks and is no longer just a single `callback`. Maybe something like `callbackByRequestId`?
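
   A small sketch of how the field reads with that name. `CallbackRegistry` and its methods are hypothetical, and `String` stands in for `ResponseMessage`; only the `Map<UUID, Consumer<...>>` shape and the `HashMap` are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

// Illustration only: String stands in for ResponseMessage.
final class CallbackRegistry {

    // Named after what it holds: one callback per in-flight request id.
    private final Map<UUID, Consumer<String>> callbackByRequestId = new HashMap<>();

    void register(final UUID requestId, final Consumer<String> callback) {
        callbackByRequestId.put(requestId, callback);
    }

    // Passes the response to the callback registered for the id.
    // Returns false if no callback is registered for that id.
    boolean dispatch(final UUID requestId, final String response) {
        final Consumer<String> callback = callbackByRequestId.get(requestId);
        if (callback == null) {
            return false;
        }
        callback.accept(response);
        return true;
    }

    public static void main(final String[] args) {
        final CallbackRegistry registry = new CallbackRegistry();
        final UUID requestId = UUID.randomUUID();
        registry.register(requestId, response -> System.out.println("got " + response));
        registry.dispatch(requestId, "response for " + requestId);
    }
}
```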



##########
gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java:
##########
@@ -75,106 +79,159 @@ public SaslAuthenticationHandler(final Authenticator 
authenticator, final Author
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
throws Exception {
-        if (msg instanceof RequestMessage){
-            final RequestMessage requestMessage = (RequestMessage) msg;
-
-            final Attribute<Authenticator.SaslNegotiator> negotiator = 
((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
-            final Attribute<RequestMessage> request = ((AttributeMap) 
ctx).attr(StateKey.REQUEST_MESSAGE);
-            if (negotiator.get() == null) {
-                try {
-                    // First time through so save the request and send an 
AUTHENTICATE challenge with no data
-                    
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
-                    request.set(requestMessage);
-                    final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
-                            .code(ResponseStatusCode.AUTHENTICATE).create();
-                    ctx.writeAndFlush(authenticate);
-                } catch (Exception ex) {
-                    // newSaslNegotiator can cause troubles - if we don't 
catch and respond nicely the driver seems
-                    // to hang until timeout which isn't so nice. treating 
this like a server error as it means that
-                    // the Authenticator isn't really ready to deal with 
requests for some reason.
-                    logger.error(String.format("%s is not ready to handle 
requests - check its configuration or related services",
-                            authenticator.getClass().getSimpleName()), ex);
-
-                    final ResponseMessage error = 
ResponseMessage.build(requestMessage)
-                            .statusMessage("Authenticator is not ready to 
handle requests")
-                            .code(ResponseStatusCode.SERVER_ERROR).create();
-                    ctx.writeAndFlush(error);
-                }
-            } else {
-                if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION) 
&& requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-                    
-                    final Object saslObject = 
requestMessage.getArgs().get(Tokens.ARGS_SASL);
-                    final byte[] saslResponse;
-                    
-                    if(saslObject instanceof String) {
-                        saslResponse = BASE64_DECODER.decode((String) 
saslObject);
-                    } else {
-                        final ResponseMessage error = 
ResponseMessage.build(request.get())
-                                .statusMessage("Incorrect type for : " + 
Tokens.ARGS_SASL + " - base64 encoded String is expected")
-                                
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
-                        ctx.writeAndFlush(error);
-                        return;
-                    }
-
-                    try {
-                        final byte[] saslMessage = 
negotiator.get().evaluateResponse(saslResponse);
-                        if (negotiator.get().isComplete()) {
-                            final AuthenticatedUser user = 
negotiator.get().getAuthenticatedUser();
-                            
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
-                            // User name logged with the remote socket address 
and authenticator classname for audit logging
-                            if (settings.enableAuditLog) {
-                                String address = 
ctx.channel().remoteAddress().toString();
-                                if (address.startsWith("/") && 
address.length() > 1) address = address.substring(1);
-                                final String[] authClassParts = 
authenticator.getClass().toString().split("[.]");
-                                auditLogger.info("User {} with address {} 
authenticated by {}",
-                                        user.getName(), address, 
authClassParts[authClassParts.length - 1]);
-                            }
-                            // If we have got here we are authenticated so 
remove the handler and pass
-                            // the original message down the pipeline for 
processing
-                            ctx.pipeline().remove(this);
-                            final RequestMessage original = request.get();
-                            ctx.fireChannelRead(original);
-                        } else {
-                            // not done here - send back the sasl message for 
next challenge.
-                            final Map<String,Object> metadata = new 
HashMap<>();
-                            metadata.put(Tokens.ARGS_SASL, 
BASE64_ENCODER.encodeToString(saslMessage));
-                            final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
-                                    .statusAttributes(metadata)
-                                    
.code(ResponseStatusCode.AUTHENTICATE).create();
-                            ctx.writeAndFlush(authenticate);
-                        }
-                    } catch (AuthenticationException ae) {
-                        final ResponseMessage error = 
ResponseMessage.build(request.get())
-                                .statusMessage(ae.getMessage())
-                                
.code(ResponseStatusCode.UNAUTHORIZED).create();
-                        ctx.writeAndFlush(error);
-                    }
-                } else {
-                    final ResponseMessage error = 
ResponseMessage.build(requestMessage)
-                            .statusMessage("Failed to authenticate")
-                            .code(ResponseStatusCode.UNAUTHORIZED).create();
-                    ctx.writeAndFlush(error);
-                }
-            }
-        } else {
+        if (!(msg instanceof RequestMessage)) {
             logger.warn("{} only processes RequestMessage instances - received 
{} - channel closing",
                     this.getClass().getSimpleName(), msg.getClass());
             ctx.close();
+            return;
+        }
+
+        final RequestMessage requestMessage = (RequestMessage) msg;
+
+        final Attribute<Authenticator.SaslNegotiator> negotiator = 
ctx.channel().attr(StateKey.NEGOTIATOR);
+        final Attribute<RequestMessage> request = 
ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+        final Attribute<Pair<LocalDateTime, List<RequestMessage>>> 
deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+        if (negotiator.get() == null) {
+            try {
+                // First time through so save the request and send an 
AUTHENTICATE challenge with no data
+                
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+                request.set(requestMessage);
+                final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
+                        .code(ResponseStatusCode.AUTHENTICATE).create();
+                ctx.writeAndFlush(authenticate);
+            } catch (Exception ex) {
+                // newSaslNegotiator can cause troubles - if we don't catch 
and respond nicely the driver seems
+                // to hang until timeout which isn't so nice. treating this 
like a server error as it means that
+                // the Authenticator isn't really ready to deal with requests 
for some reason.
+                logger.error(String.format("%s is not ready to handle requests 
- check its configuration or related services",
+                        authenticator.getClass().getSimpleName()), ex);
+
+                respondWithError(
+                    requestMessage,
+                    builder -> builder.statusMessage("Authenticator is not 
ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+                    ctx);
+            }
+
+            return;
         }
+
+        // If authentication negotiation is pending, store subsequent 
non-authentication requests for later processing
+        if (negotiator.get() != null && 
!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
+            deferredRequests.setIfAbsent(new 
ImmutablePair<>(LocalDateTime.now(), new ArrayList<>()));
+            deferredRequests.get().getValue().add(requestMessage);
+
+            final Duration deferredDuration = 
Duration.between(deferredRequests.get().getKey(), LocalDateTime.now());
+
+            if (deferredDuration.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) > 
0) {
+                respondWithError(
+                    requestMessage,
+                    builder -> builder.statusMessage("Too many unauthenticated 
requests").code(ResponseStatusCode.TOO_MANY_REQUESTS),

Review Comment:
   Is the problem here really that there are _too many unauthenticated requests_? Isn't the problem that authentication took longer than `MAX_REQUEST_DEFERRABLE_DURATION`?
   I think as a user it would be good to know whether I simply submitted too many requests or whether authentication is just too slow.
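
   As a sketch of the kind of message I mean: `rejectionMessage` and the wording are hypothetical, and the 5 second limit in `main` is an arbitrary demo value, not the real `MAX_REQUEST_DEFERRABLE_DURATION`. Which status code should accompany the message is a separate question that this sketch leaves open.

```java
import java.time.Duration;

// Illustration only: not the handler's real code.
final class DeferralTimeoutMessage {

    // Returns an error message if requests have been deferred for longer
    // than the allowed duration, otherwise null.
    static String rejectionMessage(final Duration deferredFor, final Duration maxDeferrable) {
        if (deferredFor.compareTo(maxDeferrable) <= 0) {
            return null;
        }
        return String.format(
                "Authentication did not complete within %d ms; deferred requests were rejected after %d ms",
                maxDeferrable.toMillis(), deferredFor.toMillis());
    }

    public static void main(final String[] args) {
        final Duration maxDeferrable = Duration.ofSeconds(5); // arbitrary value for the demo
        System.out.println(rejectionMessage(Duration.ofSeconds(7), maxDeferrable));
    }
}
```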



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
