[ 
https://issues.apache.org/jira/browse/TINKERPOP-3061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831310#comment-17831310
 ] 

ASF GitHub Bot commented on TINKERPOP-3061:
-------------------------------------------

FlorianHockmann commented on code in PR #2525:
URL: https://github.com/apache/tinkerpop/pull/2525#discussion_r1540816027


##########
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java:
##########
@@ -27,8 +27,7 @@
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;

Review Comment:
   Please revert this change. [Our dev docs explicitly mention that TinkerPop 
doesn't use wildcard 
imports](https://tinkerpop.apache.org/docs/current/dev/developer/#_code_style).



##########
gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java:
##########
@@ -218,9 +266,19 @@ public void 
shouldAuthenticateAndWorkWithVariablesOverGraphSONV1Serialization()
 
     private static void assertConnection(final Cluster cluster, final Client 
client) throws InterruptedException, ExecutionException {

Review Comment:
   This method is used in 4 different tests, such as 
`shouldAuthenticateWithPlainText`. These 4 tests will now fail if submitting 
multiple initial requests in parallel isn't working.
   I think it would be good if we could keep these tests as simple as possible 
so they don't include parallelization of initial requests. A test like 
`shouldAuthenticateWithPlainText` should really only fail if _authenticate with 
plain text_ isn't working, not if submitting multiple requests in parallel 
isn't working.
   
   Long story short, I think it would be good if you could revert the changes 
to this method and instead write a new test specifically for the 
parallelization issue.



##########
gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java:
##########
@@ -75,106 +79,159 @@ public SaslAuthenticationHandler(final Authenticator 
authenticator, final Author
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
throws Exception {
-        if (msg instanceof RequestMessage){
-            final RequestMessage requestMessage = (RequestMessage) msg;
-
-            final Attribute<Authenticator.SaslNegotiator> negotiator = 
((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
-            final Attribute<RequestMessage> request = ((AttributeMap) 
ctx).attr(StateKey.REQUEST_MESSAGE);
-            if (negotiator.get() == null) {
-                try {
-                    // First time through so save the request and send an 
AUTHENTICATE challenge with no data
-                    
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
-                    request.set(requestMessage);
-                    final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
-                            .code(ResponseStatusCode.AUTHENTICATE).create();
-                    ctx.writeAndFlush(authenticate);
-                } catch (Exception ex) {
-                    // newSaslNegotiator can cause troubles - if we don't 
catch and respond nicely the driver seems
-                    // to hang until timeout which isn't so nice. treating 
this like a server error as it means that
-                    // the Authenticator isn't really ready to deal with 
requests for some reason.
-                    logger.error(String.format("%s is not ready to handle 
requests - check its configuration or related services",
-                            authenticator.getClass().getSimpleName()), ex);
-
-                    final ResponseMessage error = 
ResponseMessage.build(requestMessage)
-                            .statusMessage("Authenticator is not ready to 
handle requests")
-                            .code(ResponseStatusCode.SERVER_ERROR).create();
-                    ctx.writeAndFlush(error);
-                }
-            } else {
-                if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION) 
&& requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-                    
-                    final Object saslObject = 
requestMessage.getArgs().get(Tokens.ARGS_SASL);
-                    final byte[] saslResponse;
-                    
-                    if(saslObject instanceof String) {
-                        saslResponse = BASE64_DECODER.decode((String) 
saslObject);
-                    } else {
-                        final ResponseMessage error = 
ResponseMessage.build(request.get())
-                                .statusMessage("Incorrect type for : " + 
Tokens.ARGS_SASL + " - base64 encoded String is expected")
-                                
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
-                        ctx.writeAndFlush(error);
-                        return;
-                    }
-
-                    try {
-                        final byte[] saslMessage = 
negotiator.get().evaluateResponse(saslResponse);
-                        if (negotiator.get().isComplete()) {
-                            final AuthenticatedUser user = 
negotiator.get().getAuthenticatedUser();
-                            
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
-                            // User name logged with the remote socket address 
and authenticator classname for audit logging
-                            if (settings.enableAuditLog) {
-                                String address = 
ctx.channel().remoteAddress().toString();
-                                if (address.startsWith("/") && 
address.length() > 1) address = address.substring(1);
-                                final String[] authClassParts = 
authenticator.getClass().toString().split("[.]");
-                                auditLogger.info("User {} with address {} 
authenticated by {}",
-                                        user.getName(), address, 
authClassParts[authClassParts.length - 1]);
-                            }
-                            // If we have got here we are authenticated so 
remove the handler and pass
-                            // the original message down the pipeline for 
processing
-                            ctx.pipeline().remove(this);
-                            final RequestMessage original = request.get();
-                            ctx.fireChannelRead(original);
-                        } else {
-                            // not done here - send back the sasl message for 
next challenge.
-                            final Map<String,Object> metadata = new 
HashMap<>();
-                            metadata.put(Tokens.ARGS_SASL, 
BASE64_ENCODER.encodeToString(saslMessage));
-                            final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
-                                    .statusAttributes(metadata)
-                                    
.code(ResponseStatusCode.AUTHENTICATE).create();
-                            ctx.writeAndFlush(authenticate);
-                        }
-                    } catch (AuthenticationException ae) {
-                        final ResponseMessage error = 
ResponseMessage.build(request.get())
-                                .statusMessage(ae.getMessage())
-                                
.code(ResponseStatusCode.UNAUTHORIZED).create();
-                        ctx.writeAndFlush(error);
-                    }
-                } else {
-                    final ResponseMessage error = 
ResponseMessage.build(requestMessage)
-                            .statusMessage("Failed to authenticate")
-                            .code(ResponseStatusCode.UNAUTHORIZED).create();
-                    ctx.writeAndFlush(error);
-                }
-            }
-        } else {
+        if (!(msg instanceof RequestMessage)) {
             logger.warn("{} only processes RequestMessage instances - received 
{} - channel closing",
                     this.getClass().getSimpleName(), msg.getClass());
             ctx.close();
+            return;
+        }
+
+        final RequestMessage requestMessage = (RequestMessage) msg;
+
+        final Attribute<Authenticator.SaslNegotiator> negotiator = 
ctx.channel().attr(StateKey.NEGOTIATOR);
+        final Attribute<RequestMessage> request = 
ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+        final Attribute<Pair<LocalDateTime, List<RequestMessage>>> 
deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+        if (negotiator.get() == null) {
+            try {
+                // First time through so save the request and send an 
AUTHENTICATE challenge with no data
+                
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+                request.set(requestMessage);
+                final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
+                        .code(ResponseStatusCode.AUTHENTICATE).create();
+                ctx.writeAndFlush(authenticate);
+            } catch (Exception ex) {
+                // newSaslNegotiator can cause troubles - if we don't catch 
and respond nicely the driver seems
+                // to hang until timeout which isn't so nice. treating this 
like a server error as it means that
+                // the Authenticator isn't really ready to deal with requests 
for some reason.
+                logger.error(String.format("%s is not ready to handle requests 
- check its configuration or related services",
+                        authenticator.getClass().getSimpleName()), ex);
+
+                respondWithError(
+                    requestMessage,
+                    builder -> builder.statusMessage("Authenticator is not 
ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+                    ctx);
+            }
+
+            return;
         }
+
+        // If authentication negotiation is pending, store subsequent 
non-authentication requests for later processing
+        if (negotiator.get() != null && 
!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {

Review Comment:
   (nitpick) Isn't `negotiator.get() != null` redundant here? The preceding 
`if (negotiator.get() == null)` block always returns.
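
   A minimal sketch of why the second check adds nothing, using hypothetical 
names (`GuardClauseSketch`, `route`) and an assumed op string rather than the 
actual handler code: once the `null` branch has returned, every later statement 
can rely on a non-null negotiator.

```java
// Hypothetical stand-in for the branching in channelRead; not the TinkerPop code.
class GuardClauseSketch {

    // Assumed value, used only for this illustration.
    static final String OP_AUTHENTICATION = "authentication";

    static String route(final Object negotiator, final String op) {
        if (negotiator == null) {
            // Early return, like the first branch of the handler.
            return "start-handshake";
        }

        // negotiator is guaranteed to be non-null from here on,
        // so the condition only needs to look at the op.
        if (!OP_AUTHENTICATION.equals(op)) {
            return "defer";
        }

        return "continue-handshake";
    }

    public static void main(final String[] args) {
        System.out.println(route(null, "eval"));
        System.out.println(route(new Object(), "eval"));
        System.out.println(route(new Object(), OP_AUTHENTICATION));
    }
}
```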



##########
gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java:
##########
@@ -163,6 +171,46 @@ public void 
shouldFailAuthenticateWithPlainTextBadUsername() throws Exception {
         }
     }
 
+    @Test
+    public void 
shouldFailAuthenticateWithUnAuthenticatedRequestAfterMaxDeferrableDuration() 
throws Exception {
+        try (WebSocketClient client = 
TestClientFactory.createWebSocketClient()) {
+            // First request will initiate the authentication handshake
+            // Subsequent requests will be deferred
+            CompletableFuture<List<ResponseMessage>> future1 = 
client.submitAsync("");
+            CompletableFuture<List<ResponseMessage>> future2 = 
client.submitAsync("");
+            CompletableFuture<List<ResponseMessage>> future3 = 
client.submitAsync("");
+
+            // After the maximum allowed deferred request duration,
+            // any non-authenticated request will invalidate all requests with 
429 error
+            CompletableFuture<List<ResponseMessage>> future4 = 
CompletableFuture.runAsync(() -> {
+                try {
+                    
Thread.sleep(SaslAuthenticationHandler.MAX_REQUEST_DEFERRABLE_DURATION.plus(Duration.ofSeconds(1)).toMillis());
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }).thenCompose((__) -> {
+                try {
+                    return client.submitAsync("");
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            List<ResponseMessage> responses = new ArrayList<>();
+
+            responses.addAll(future1.join());
+            responses.addAll(future2.join());
+            responses.addAll(future3.join());
+            responses.addAll(future4.join());
+
+            for (ResponseMessage response : responses) {
+                if (response.getStatus().getCode() != 
ResponseStatusCode.AUTHENTICATE) {

Review Comment:
   I think it's hard to tell what the expected outcome is here and what this 
is really asserting: the loop iterates over all received responses and accepts 
each one whose status code is either `AUTHENTICATE` or `TOO_MANY_REQUESTS`.
   Can't we explicitly assert which request should get which response status 
code? I guess `future4` should get `TOO_MANY_REQUESTS` and the other 3 should 
get `AUTHENTICATE` (?).
   
   Not that important, but I think the test would also be more readable if 
these weren't named `future1` to `future4`, but something like 
`futureOfRequestWithinAuthDuration` vs. `futureOfRequestSubmittedTooLate`.
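
   A rough sketch of what per-request assertions could look like. The `Status` 
enum and the `assertStatus` helper are hypothetical stand-ins for 
`ResponseStatusCode` and the test's assertions, and the already-completed 
futures only simulate the outcome guessed above; which status each request 
really receives would have to be confirmed against the server.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; it simulates responses instead of talking to a server.
class ExplicitAssertionSketch {

    // Stand-in for ResponseStatusCode, reduced to the two codes discussed here.
    enum Status { AUTHENTICATE, TOO_MANY_REQUESTS }

    // Fails with a message that names the expected and the actual status.
    static void assertStatus(final Status expected, final CompletableFuture<Status> future) {
        final Status actual = future.join();
        if (actual != expected) {
            throw new AssertionError("expected " + expected + " but got " + actual);
        }
    }

    public static void main(final String[] args) {
        // Assumed outcomes: in-time requests are challenged, the late one is rejected.
        final CompletableFuture<Status> futureOfRequestWithinAuthDuration =
                CompletableFuture.completedFuture(Status.AUTHENTICATE);
        final CompletableFuture<Status> futureOfRequestSubmittedTooLate =
                CompletableFuture.completedFuture(Status.TOO_MANY_REQUESTS);

        // One assertion per request makes the expected outcome visible at a glance.
        assertStatus(Status.AUTHENTICATE, futureOfRequestWithinAuthDuration);
        assertStatus(Status.TOO_MANY_REQUESTS, futureOfRequestSubmittedTooLate);
    }
}
```

   With one assertion per future, a failure points at the specific request 
whose status was unexpected instead of at an anonymous element of a list.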



##########
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java:
##########
@@ -75,19 +74,19 @@ public CompletableFuture<List<ResponseMessage>> 
submitAsync(final RequestMessage
             if (response.getStatus().getCode().isFinalResponse()) {
                 f.complete(results);
             }
-        };
+        });
 
         writeAndFlush(requestMessage);
 
         return f;
     }
 
     static class CallbackResponseHandler extends 
SimpleChannelInboundHandler<ResponseMessage> {
-        public Consumer<ResponseMessage> callback;
+        public Map<UUID, Consumer<ResponseMessage>> callback = new HashMap<>();

Review Comment:
   (nitpick) I think we should rename this now that it's no longer just a 
`callback`. Maybe something like `callbackByRequestId`?
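
   To illustrate the naming, here is a small self-contained sketch of a 
registry keyed by request id. `CallbackRegistrySketch`, `Response`, `register` 
and `dispatch` are hypothetical names, and `ConcurrentHashMap` is this sketch's 
own choice (the PR uses `HashMap`); it is not the driver's actual handler.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of routing responses to per-request callbacks.
class CallbackRegistrySketch {

    // Minimal stand-in for ResponseMessage: just the request id and a payload.
    static final class Response {
        final UUID requestId;
        final String body;

        Response(final UUID requestId, final String body) {
            this.requestId = requestId;
            this.body = body;
        }
    }

    // The name states what the map holds: one callback per request id.
    final Map<UUID, Consumer<Response>> callbackByRequestId = new ConcurrentHashMap<>();

    void register(final UUID requestId, final Consumer<Response> callback) {
        callbackByRequestId.put(requestId, callback);
    }

    // Returns true if a callback was registered for the response's request id.
    boolean dispatch(final Response response) {
        final Consumer<Response> callback = callbackByRequestId.get(response.requestId);
        if (callback == null) {
            return false;
        }
        callback.accept(response);
        return true;
    }

    public static void main(final String[] args) {
        final CallbackRegistrySketch registry = new CallbackRegistrySketch();
        final UUID requestId = UUID.randomUUID();
        registry.register(requestId, r -> System.out.println("received: " + r.body));
        registry.dispatch(new Response(requestId, "result"));
    }
}
```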



##########
gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java:
##########
@@ -75,106 +79,159 @@ public SaslAuthenticationHandler(final Authenticator 
authenticator, final Author
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
throws Exception {
-        if (msg instanceof RequestMessage){
-            final RequestMessage requestMessage = (RequestMessage) msg;
-
-            final Attribute<Authenticator.SaslNegotiator> negotiator = 
((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
-            final Attribute<RequestMessage> request = ((AttributeMap) 
ctx).attr(StateKey.REQUEST_MESSAGE);
-            if (negotiator.get() == null) {
-                try {
-                    // First time through so save the request and send an 
AUTHENTICATE challenge with no data
-                    
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
-                    request.set(requestMessage);
-                    final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
-                            .code(ResponseStatusCode.AUTHENTICATE).create();
-                    ctx.writeAndFlush(authenticate);
-                } catch (Exception ex) {
-                    // newSaslNegotiator can cause troubles - if we don't 
catch and respond nicely the driver seems
-                    // to hang until timeout which isn't so nice. treating 
this like a server error as it means that
-                    // the Authenticator isn't really ready to deal with 
requests for some reason.
-                    logger.error(String.format("%s is not ready to handle 
requests - check its configuration or related services",
-                            authenticator.getClass().getSimpleName()), ex);
-
-                    final ResponseMessage error = 
ResponseMessage.build(requestMessage)
-                            .statusMessage("Authenticator is not ready to 
handle requests")
-                            .code(ResponseStatusCode.SERVER_ERROR).create();
-                    ctx.writeAndFlush(error);
-                }
-            } else {
-                if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION) 
&& requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-                    
-                    final Object saslObject = 
requestMessage.getArgs().get(Tokens.ARGS_SASL);
-                    final byte[] saslResponse;
-                    
-                    if(saslObject instanceof String) {
-                        saslResponse = BASE64_DECODER.decode((String) 
saslObject);
-                    } else {
-                        final ResponseMessage error = 
ResponseMessage.build(request.get())
-                                .statusMessage("Incorrect type for : " + 
Tokens.ARGS_SASL + " - base64 encoded String is expected")
-                                
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
-                        ctx.writeAndFlush(error);
-                        return;
-                    }
-
-                    try {
-                        final byte[] saslMessage = 
negotiator.get().evaluateResponse(saslResponse);
-                        if (negotiator.get().isComplete()) {
-                            final AuthenticatedUser user = 
negotiator.get().getAuthenticatedUser();
-                            
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
-                            // User name logged with the remote socket address 
and authenticator classname for audit logging
-                            if (settings.enableAuditLog) {
-                                String address = 
ctx.channel().remoteAddress().toString();
-                                if (address.startsWith("/") && 
address.length() > 1) address = address.substring(1);
-                                final String[] authClassParts = 
authenticator.getClass().toString().split("[.]");
-                                auditLogger.info("User {} with address {} 
authenticated by {}",
-                                        user.getName(), address, 
authClassParts[authClassParts.length - 1]);
-                            }
-                            // If we have got here we are authenticated so 
remove the handler and pass
-                            // the original message down the pipeline for 
processing
-                            ctx.pipeline().remove(this);
-                            final RequestMessage original = request.get();
-                            ctx.fireChannelRead(original);
-                        } else {
-                            // not done here - send back the sasl message for 
next challenge.
-                            final Map<String,Object> metadata = new 
HashMap<>();
-                            metadata.put(Tokens.ARGS_SASL, 
BASE64_ENCODER.encodeToString(saslMessage));
-                            final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
-                                    .statusAttributes(metadata)
-                                    
.code(ResponseStatusCode.AUTHENTICATE).create();
-                            ctx.writeAndFlush(authenticate);
-                        }
-                    } catch (AuthenticationException ae) {
-                        final ResponseMessage error = 
ResponseMessage.build(request.get())
-                                .statusMessage(ae.getMessage())
-                                
.code(ResponseStatusCode.UNAUTHORIZED).create();
-                        ctx.writeAndFlush(error);
-                    }
-                } else {
-                    final ResponseMessage error = 
ResponseMessage.build(requestMessage)
-                            .statusMessage("Failed to authenticate")
-                            .code(ResponseStatusCode.UNAUTHORIZED).create();
-                    ctx.writeAndFlush(error);
-                }
-            }
-        } else {
+        if (!(msg instanceof RequestMessage)) {
             logger.warn("{} only processes RequestMessage instances - received 
{} - channel closing",
                     this.getClass().getSimpleName(), msg.getClass());
             ctx.close();
+            return;
+        }
+
+        final RequestMessage requestMessage = (RequestMessage) msg;
+
+        final Attribute<Authenticator.SaslNegotiator> negotiator = 
ctx.channel().attr(StateKey.NEGOTIATOR);
+        final Attribute<RequestMessage> request = 
ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+        final Attribute<Pair<LocalDateTime, List<RequestMessage>>> 
deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+        if (negotiator.get() == null) {
+            try {
+                // First time through so save the request and send an 
AUTHENTICATE challenge with no data
+                
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+                request.set(requestMessage);
+                final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
+                        .code(ResponseStatusCode.AUTHENTICATE).create();
+                ctx.writeAndFlush(authenticate);
+            } catch (Exception ex) {
+                // newSaslNegotiator can cause troubles - if we don't catch 
and respond nicely the driver seems
+                // to hang until timeout which isn't so nice. treating this 
like a server error as it means that
+                // the Authenticator isn't really ready to deal with requests 
for some reason.
+                logger.error(String.format("%s is not ready to handle requests 
- check its configuration or related services",
+                        authenticator.getClass().getSimpleName()), ex);
+
+                respondWithError(
+                    requestMessage,
+                    builder -> builder.statusMessage("Authenticator is not 
ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+                    ctx);
+            }
+
+            return;
         }
+
+        // If authentication negotiation is pending, store subsequent 
non-authentication requests for later processing
+        if (negotiator.get() != null && 
!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
+            deferredRequests.setIfAbsent(new 
ImmutablePair<>(LocalDateTime.now(), new ArrayList<>()));
+            deferredRequests.get().getValue().add(requestMessage);
+
+            final Duration deferredDuration = 
Duration.between(deferredRequests.get().getKey(), LocalDateTime.now());
+
+            if (deferredDuration.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) > 
0) {
+                respondWithError(
+                    requestMessage,
+                    builder -> builder.statusMessage("Too many unauthenticated 
requests").code(ResponseStatusCode.TOO_MANY_REQUESTS),

Review Comment:
   Is the problem here really that there are _too many unauthenticated 
requests_? Isn't the problem that authentication took longer than 
`MAX_REQUEST_DEFERRABLE_DURATION`?
   I think as a user it might be good to know whether I simply submitted too 
many requests or whether authentication is just too slow.
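
   One way the two causes could be kept apart, sketched with assumptions: the 
5 second value for `MAX_REQUEST_DEFERRABLE_DURATION` is made up for the 
example, `MAX_DEFERRED_REQUESTS` is a hypothetical limit that may not exist in 
the PR, and `describe` is an illustrative helper, not part of the handler.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical sketch of choosing an error message that matches the cause.
class DeferralErrorSketch {

    // Assumed value; the real constant is defined in SaslAuthenticationHandler.
    static final Duration MAX_REQUEST_DEFERRABLE_DURATION = Duration.ofSeconds(5);

    // Hypothetical count limit, shown only to contrast the two causes.
    static final int MAX_DEFERRED_REQUESTS = 100;

    // Returns an error message if a limit is exceeded, otherwise an empty Optional.
    static Optional<String> describe(final Duration deferredFor, final int deferredCount) {
        if (deferredFor.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) > 0) {
            return Optional.of("Authentication did not complete within "
                    + MAX_REQUEST_DEFERRABLE_DURATION.toMillis() + " ms");
        }
        if (deferredCount > MAX_DEFERRED_REQUESTS) {
            return Optional.of("Too many unauthenticated requests");
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        System.out.println(describe(Duration.ofSeconds(6), 3));
        System.out.println(describe(Duration.ofSeconds(1), 101));
        System.out.println(describe(Duration.ofSeconds(1), 3));
    }
}
```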





> Concurrent queries will break authentication on javascript driver
> -----------------------------------------------------------------
>
>                 Key: TINKERPOP-3061
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-3061
>             Project: TinkerPop
>          Issue Type: Bug
>          Components: javascript
>    Affects Versions: 3.6.6, 3.7.1
>            Reporter: Yang Xia
>            Priority: Major
>
> Reported by tien on Discord:
> {code:java}
> import gremlin from "gremlin";
> const g = gremlin.process.AnonymousTraversalSource.traversal().withRemote(
>   new gremlin.driver.DriverRemoteConnection("ws://localhost:8182/gremlin", {
>     authenticator: new gremlin.driver.auth.PlainTextSaslAuthenticator(
>       "admin",
>       "administrator"
>     ),
>   })
> );
> // This throws: Failed to authenticate (401)
> await Promise.all([g.V().toList(), g.V().toList()]);
> // This works as expected
> await g.V().toList();
> await g.V().toList(); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
