[ https://issues.apache.org/jira/browse/TINKERPOP-3061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831310#comment-17831310 ]
ASF GitHub Bot commented on TINKERPOP-3061:
-------------------------------------------

FlorianHockmann commented on code in PR #2525:
URL: https://github.com/apache/tinkerpop/pull/2525#discussion_r1540816027


##########
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java:
##########
@@ -27,8 +27,7 @@
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;

-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;

Review Comment:
   Please revert this change. [Our dev docs explicitly mention that TinkerPop doesn't use wildcard imports](https://tinkerpop.apache.org/docs/current/dev/developer/#_code_style).


##########
gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java:
##########
@@ -218,9 +266,19 @@ public void shouldAuthenticateAndWorkWithVariablesOverGraphSONV1Serialization()
     private static void assertConnection(final Cluster cluster, final Client client) throws InterruptedException, ExecutionException {

Review Comment:
   This method is used in 4 different tests, such as `shouldAuthenticateWithPlainText`. These 4 tests will now fail if submitting multiple requests initially in parallel isn't working. I think it would be good if we could keep these tests as simple as possible so they don't include parallelization of initial requests. A test like `shouldAuthenticateWithPlainText` should really only fail if _authenticate with plain text_ isn't working, not if submitting multiple requests in parallel isn't working.
   Long story short, I think it would be good if you could revert the changes to this method and instead write a new test specifically for the parallelization issue.


##########
gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java:
##########
@@ -75,106 +79,159 @@ public SaslAuthenticationHandler(final Authenticator authenticator, final Author
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
-        if (msg instanceof RequestMessage){
-            final RequestMessage requestMessage = (RequestMessage) msg;
-
-            final Attribute<Authenticator.SaslNegotiator> negotiator = ((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
-            final Attribute<RequestMessage> request = ((AttributeMap) ctx).attr(StateKey.REQUEST_MESSAGE);
-            if (negotiator.get() == null) {
-                try {
-                    // First time through so save the request and send an AUTHENTICATE challenge with no data
-                    negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
-                    request.set(requestMessage);
-                    final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
-                            .code(ResponseStatusCode.AUTHENTICATE).create();
-                    ctx.writeAndFlush(authenticate);
-                } catch (Exception ex) {
-                    // newSaslNegotiator can cause troubles - if we don't catch and respond nicely the driver seems
-                    // to hang until timeout which isn't so nice. treating this like a server error as it means that
-                    // the Authenticator isn't really ready to deal with requests for some reason.
-                    logger.error(String.format("%s is not ready to handle requests - check its configuration or related services",
-                            authenticator.getClass().getSimpleName()), ex);
-
-                    final ResponseMessage error = ResponseMessage.build(requestMessage)
-                            .statusMessage("Authenticator is not ready to handle requests")
-                            .code(ResponseStatusCode.SERVER_ERROR).create();
-                    ctx.writeAndFlush(error);
-                }
-            } else {
-                if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION) && requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-
-                    final Object saslObject = requestMessage.getArgs().get(Tokens.ARGS_SASL);
-                    final byte[] saslResponse;
-
-                    if(saslObject instanceof String) {
-                        saslResponse = BASE64_DECODER.decode((String) saslObject);
-                    } else {
-                        final ResponseMessage error = ResponseMessage.build(request.get())
-                                .statusMessage("Incorrect type for : " + Tokens.ARGS_SASL + " - base64 encoded String is expected")
-                                .code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
-                        ctx.writeAndFlush(error);
-                        return;
-                    }
-
-                    try {
-                        final byte[] saslMessage = negotiator.get().evaluateResponse(saslResponse);
-                        if (negotiator.get().isComplete()) {
-                            final AuthenticatedUser user = negotiator.get().getAuthenticatedUser();
-                            ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
-                            // User name logged with the remote socket address and authenticator classname for audit logging
-                            if (settings.enableAuditLog) {
-                                String address = ctx.channel().remoteAddress().toString();
-                                if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
-                                final String[] authClassParts = authenticator.getClass().toString().split("[.]");
-                                auditLogger.info("User {} with address {} authenticated by {}",
-                                        user.getName(), address, authClassParts[authClassParts.length - 1]);
-                            }
-                            // If we have got here we are authenticated so remove the handler and pass
-                            // the original message down the pipeline for processing
-                            ctx.pipeline().remove(this);
-                            final RequestMessage original = request.get();
-                            ctx.fireChannelRead(original);
-                        } else {
-                            // not done here - send back the sasl message for next challenge.
-                            final Map<String,Object> metadata = new HashMap<>();
-                            metadata.put(Tokens.ARGS_SASL, BASE64_ENCODER.encodeToString(saslMessage));
-                            final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
-                                    .statusAttributes(metadata)
-                                    .code(ResponseStatusCode.AUTHENTICATE).create();
-                            ctx.writeAndFlush(authenticate);
-                        }
-                    } catch (AuthenticationException ae) {
-                        final ResponseMessage error = ResponseMessage.build(request.get())
-                                .statusMessage(ae.getMessage())
-                                .code(ResponseStatusCode.UNAUTHORIZED).create();
-                        ctx.writeAndFlush(error);
-                    }
-                } else {
-                    final ResponseMessage error = ResponseMessage.build(requestMessage)
-                            .statusMessage("Failed to authenticate")
-                            .code(ResponseStatusCode.UNAUTHORIZED).create();
-                    ctx.writeAndFlush(error);
-                }
-            }
-        } else {
+        if (!(msg instanceof RequestMessage)) {
             logger.warn("{} only processes RequestMessage instances - received {} - channel closing",
                     this.getClass().getSimpleName(), msg.getClass());
             ctx.close();
+            return;
+        }
+
+        final RequestMessage requestMessage = (RequestMessage) msg;
+
+        final Attribute<Authenticator.SaslNegotiator> negotiator = ctx.channel().attr(StateKey.NEGOTIATOR);
+        final Attribute<RequestMessage> request = ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+        final Attribute<Pair<LocalDateTime, List<RequestMessage>>> deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+        if (negotiator.get() == null) {
+            try {
+                // First time through so save the request and send an AUTHENTICATE challenge with no data
+                negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+                request.set(requestMessage);
+                final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
+                        .code(ResponseStatusCode.AUTHENTICATE).create();
+                ctx.writeAndFlush(authenticate);
+            } catch (Exception ex) {
+                // newSaslNegotiator can cause troubles - if we don't catch and respond nicely the driver seems
+                // to hang until timeout which isn't so nice. treating this like a server error as it means that
+                // the Authenticator isn't really ready to deal with requests for some reason.
+                logger.error(String.format("%s is not ready to handle requests - check its configuration or related services",
+                        authenticator.getClass().getSimpleName()), ex);
+
+                respondWithError(
+                        requestMessage,
+                        builder -> builder.statusMessage("Authenticator is not ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+                        ctx);
+            }
+
+            return;
         }
+
+        // If authentication negotiation is pending, store subsequent non-authentication requests for later processing
+        if (negotiator.get() != null && !requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {

Review Comment:
   (nitpick) Isn't `negotiator.get() != null` duplicate code since we have an `if (negotiator.get() == null)` right before that returns?


##########
gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java:
##########
@@ -163,6 +171,46 @@ public void shouldFailAuthenticateWithPlainTextBadUsername() throws Exception {
         }
     }

+    @Test
+    public void shouldFailAuthenticateWithUnAuthenticatedRequestAfterMaxDeferrableDuration() throws Exception {
+        try (WebSocketClient client = TestClientFactory.createWebSocketClient()) {
+            // First request will initiate the authentication handshake
+            // Subsequent requests will be deferred
+            CompletableFuture<List<ResponseMessage>> future1 = client.submitAsync("");
+            CompletableFuture<List<ResponseMessage>> future2 = client.submitAsync("");
+            CompletableFuture<List<ResponseMessage>> future3 = client.submitAsync("");
+
+            // After the maximum allowed deferred request duration,
+            // any non-authenticated request will invalidate all requests with 429 error
+            CompletableFuture<List<ResponseMessage>> future4 = CompletableFuture.runAsync(() -> {
+                try {
+                    Thread.sleep(SaslAuthenticationHandler.MAX_REQUEST_DEFERRABLE_DURATION.plus(Duration.ofSeconds(1)).toMillis());
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }).thenCompose((__) -> {
+                try {
+                    return client.submitAsync("");
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            List<ResponseMessage> responses = new ArrayList<>();
+
+            responses.addAll(future1.join());
+            responses.addAll(future2.join());
+            responses.addAll(future3.join());
+            responses.addAll(future4.join());
+
+            for (ResponseMessage response : responses) {
+                if (response.getStatus().getCode() != ResponseStatusCode.AUTHENTICATE) {

Review Comment:
   I think it's a bit hard to understand what the expected outcome is here / what this is really asserting since it's just iterating over all received responses and then either accepting them if their status code is `AUTHENTICATE` or if it's `TOO_MANY_REQUESTS`. Can't we explicitly assert which request should get which response status code? I guess `future4` should get `TOO_MANY_REQUESTS` and the other 3 should get `AUTHENTICATE` (?).
   Not that important, but I think it would also improve readability of this test if these weren't named `future1` - `future4`, but maybe something like `futureOfRequestWithinAuthDuration` vs `futureOfRequestSubmittedTooLate` or something like that.

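   For illustration, a minimal sketch of what explicit per-request assertions could look like. It assumes the guess above is right (early requests see `AUTHENTICATE`, the late one sees `TOO_MANY_REQUESTS`). The `Status` enum and the `assertAllHaveStatus` helper are hypothetical stand-ins for `ResponseStatusCode` and a test helper, not code from the PR, and completed futures stand in for `client.submitAsync(...)`.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ExplicitStatusAssertions {

    // Hypothetical stand-in for ResponseStatusCode, limited to the two codes discussed above.
    enum Status { AUTHENTICATE, TOO_MANY_REQUESTS }

    // Fails with a message naming the request if any of its responses carries an unexpected status.
    static void assertAllHaveStatus(final String requestName,
                                    final CompletableFuture<List<Status>> future,
                                    final Status expected) {
        for (final Status actual : future.join()) {
            if (actual != expected) {
                throw new AssertionError(requestName + ": expected " + expected + " but was " + actual);
            }
        }
    }

    public static void main(final String[] args) {
        // Completed futures simulate server responses; a real test would submit requests instead.
        final CompletableFuture<List<Status>> futureOfRequestWithinAuthDuration =
                CompletableFuture.completedFuture(Collections.singletonList(Status.AUTHENTICATE));
        final CompletableFuture<List<Status>> futureOfRequestSubmittedTooLate =
                CompletableFuture.completedFuture(Collections.singletonList(Status.TOO_MANY_REQUESTS));

        assertAllHaveStatus("request within auth duration",
                futureOfRequestWithinAuthDuration, Status.AUTHENTICATE);
        assertAllHaveStatus("request submitted too late",
                futureOfRequestSubmittedTooLate, Status.TOO_MANY_REQUESTS);
        System.out.println("all status assertions passed");
    }
}
```

   With one assertion per named future, a failure message points at the specific request whose status was unexpected, which the single loop over all responses cannot do.
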

##########
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java:
##########
@@ -75,19 +74,19 @@ public CompletableFuture<List<ResponseMessage>> submitAsync(final RequestMessage
             if (response.getStatus().getCode().isFinalResponse()) {
                 f.complete(results);
             }
-        };
+        });

         writeAndFlush(requestMessage);

         return f;
     }

     static class CallbackResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
-        public Consumer<ResponseMessage> callback;
+        public Map<UUID, Consumer<ResponseMessage>> callback = new HashMap<>();

Review Comment:
   (nitpick) I think we should rename this now that it's no longer just a `callback`. Maybe something like `callbackByRequestId`?


##########
gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java:
##########
@@ -75,106 +79,159 @@ public SaslAuthenticationHandler(final Authenticator authenticator, final Author
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
-        if (msg instanceof RequestMessage){
-            final RequestMessage requestMessage = (RequestMessage) msg;
-
-            final Attribute<Authenticator.SaslNegotiator> negotiator = ((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
-            final Attribute<RequestMessage> request = ((AttributeMap) ctx).attr(StateKey.REQUEST_MESSAGE);
-            if (negotiator.get() == null) {
-                try {
-                    // First time through so save the request and send an AUTHENTICATE challenge with no data
-                    negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
-                    request.set(requestMessage);
-                    final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
-                            .code(ResponseStatusCode.AUTHENTICATE).create();
-                    ctx.writeAndFlush(authenticate);
-                } catch (Exception ex) {
-                    // newSaslNegotiator can cause troubles - if we don't catch and respond nicely the driver seems
-                    // to hang until timeout which isn't so nice. treating this like a server error as it means that
-                    // the Authenticator isn't really ready to deal with requests for some reason.
-                    logger.error(String.format("%s is not ready to handle requests - check its configuration or related services",
-                            authenticator.getClass().getSimpleName()), ex);
-
-                    final ResponseMessage error = ResponseMessage.build(requestMessage)
-                            .statusMessage("Authenticator is not ready to handle requests")
-                            .code(ResponseStatusCode.SERVER_ERROR).create();
-                    ctx.writeAndFlush(error);
-                }
-            } else {
-                if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION) && requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-
-                    final Object saslObject = requestMessage.getArgs().get(Tokens.ARGS_SASL);
-                    final byte[] saslResponse;
-
-                    if(saslObject instanceof String) {
-                        saslResponse = BASE64_DECODER.decode((String) saslObject);
-                    } else {
-                        final ResponseMessage error = ResponseMessage.build(request.get())
-                                .statusMessage("Incorrect type for : " + Tokens.ARGS_SASL + " - base64 encoded String is expected")
-                                .code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
-                        ctx.writeAndFlush(error);
-                        return;
-                    }
-
-                    try {
-                        final byte[] saslMessage = negotiator.get().evaluateResponse(saslResponse);
-                        if (negotiator.get().isComplete()) {
-                            final AuthenticatedUser user = negotiator.get().getAuthenticatedUser();
-                            ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
-                            // User name logged with the remote socket address and authenticator classname for audit logging
-                            if (settings.enableAuditLog) {
-                                String address = ctx.channel().remoteAddress().toString();
-                                if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
-                                final String[] authClassParts = authenticator.getClass().toString().split("[.]");
-                                auditLogger.info("User {} with address {} authenticated by {}",
-                                        user.getName(), address, authClassParts[authClassParts.length - 1]);
-                            }
-                            // If we have got here we are authenticated so remove the handler and pass
-                            // the original message down the pipeline for processing
-                            ctx.pipeline().remove(this);
-                            final RequestMessage original = request.get();
-                            ctx.fireChannelRead(original);
-                        } else {
-                            // not done here - send back the sasl message for next challenge.
-                            final Map<String,Object> metadata = new HashMap<>();
-                            metadata.put(Tokens.ARGS_SASL, BASE64_ENCODER.encodeToString(saslMessage));
-                            final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
-                                    .statusAttributes(metadata)
-                                    .code(ResponseStatusCode.AUTHENTICATE).create();
-                            ctx.writeAndFlush(authenticate);
-                        }
-                    } catch (AuthenticationException ae) {
-                        final ResponseMessage error = ResponseMessage.build(request.get())
-                                .statusMessage(ae.getMessage())
-                                .code(ResponseStatusCode.UNAUTHORIZED).create();
-                        ctx.writeAndFlush(error);
-                    }
-                } else {
-                    final ResponseMessage error = ResponseMessage.build(requestMessage)
-                            .statusMessage("Failed to authenticate")
-                            .code(ResponseStatusCode.UNAUTHORIZED).create();
-                    ctx.writeAndFlush(error);
-                }
-            }
-        } else {
+        if (!(msg instanceof RequestMessage)) {
             logger.warn("{} only processes RequestMessage instances - received {} - channel closing",
                     this.getClass().getSimpleName(), msg.getClass());
             ctx.close();
+            return;
+        }
+
+        final RequestMessage requestMessage = (RequestMessage) msg;
+
+        final Attribute<Authenticator.SaslNegotiator> negotiator = ctx.channel().attr(StateKey.NEGOTIATOR);
+        final Attribute<RequestMessage> request = ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+        final Attribute<Pair<LocalDateTime, List<RequestMessage>>> deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+        if (negotiator.get() == null) {
+            try {
+                // First time through so save the request and send an AUTHENTICATE challenge with no data
+                negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+                request.set(requestMessage);
+                final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
+                        .code(ResponseStatusCode.AUTHENTICATE).create();
+                ctx.writeAndFlush(authenticate);
+            } catch (Exception ex) {
+                // newSaslNegotiator can cause troubles - if we don't catch and respond nicely the driver seems
+                // to hang until timeout which isn't so nice. treating this like a server error as it means that
+                // the Authenticator isn't really ready to deal with requests for some reason.
+                logger.error(String.format("%s is not ready to handle requests - check its configuration or related services",
+                        authenticator.getClass().getSimpleName()), ex);
+
+                respondWithError(
+                        requestMessage,
+                        builder -> builder.statusMessage("Authenticator is not ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+                        ctx);
+            }
+
+            return;
         }
+
+        // If authentication negotiation is pending, store subsequent non-authentication requests for later processing
+        if (negotiator.get() != null && !requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
+            deferredRequests.setIfAbsent(new ImmutablePair<>(LocalDateTime.now(), new ArrayList<>()));
+            deferredRequests.get().getValue().add(requestMessage);
+
+            final Duration deferredDuration = Duration.between(deferredRequests.get().getKey(), LocalDateTime.now());
+
+            if (deferredDuration.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) > 0) {
+                respondWithError(
+                        requestMessage,
+                        builder -> builder.statusMessage("Too many unauthenticated requests").code(ResponseStatusCode.TOO_MANY_REQUESTS),

Review Comment:
   Is the problem here really that there are _too many unauthenticated requests_? Isn't the problem that authentication took longer than `MAX_REQUEST_DEFERRABLE_DURATION`? I think as a user it might be good to know whether I simply submitted too many requests or whether authentication is just too slow.

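   To make the distinction concrete, a minimal sketch that derives the status message from the actual cause. This is only an illustration under assumptions: the quoted diff checks the deferral duration only, so `MAX_DEFERRED_REQUESTS` and the `rejectionMessage` helper are hypothetical, and the 10-second and 100-request limits are placeholder values rather than anything taken from the PR.

```java
import java.time.Duration;
import java.util.Optional;

public class DeferralRejection {

    // Placeholder limits for the sketch; neither value comes from the PR.
    static final Duration MAX_REQUEST_DEFERRABLE_DURATION = Duration.ofSeconds(10);
    static final int MAX_DEFERRED_REQUESTS = 100; // hypothetical count limit

    // Returns a cause-specific message, or empty if the request may still be deferred.
    static Optional<String> rejectionMessage(final Duration deferredFor, final int deferredCount) {
        if (deferredFor.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) > 0) {
            return Optional.of("Authentication did not complete within "
                    + MAX_REQUEST_DEFERRABLE_DURATION.getSeconds() + "s - deferred request rejected");
        }
        if (deferredCount > MAX_DEFERRED_REQUESTS) {
            return Optional.of("Too many unauthenticated requests (limit " + MAX_DEFERRED_REQUESTS + ")");
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        // Slow authentication: 11s of deferral with only three queued requests.
        System.out.println(rejectionMessage(Duration.ofSeconds(11), 3).orElse("deferred"));
        // Request flood: well within the time limit but over the count limit.
        System.out.println(rejectionMessage(Duration.ofSeconds(1), 101).orElse("deferred"));
        // Neither limit exceeded: the request is simply deferred.
        System.out.println(rejectionMessage(Duration.ofSeconds(1), 3).orElse("deferred"));
    }
}
```

   Whether both cases should share the `TOO_MANY_REQUESTS` status code or use different codes is a separate question that this sketch does not try to answer.
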

> Concurrent queries will break authentication on javascript driver
> -----------------------------------------------------------------
>
>                 Key: TINKERPOP-3061
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-3061
>             Project: TinkerPop
>          Issue Type: Bug
>          Components: javascript
>    Affects Versions: 3.6.6, 3.7.1
>            Reporter: Yang Xia
>            Priority: Major
>
> Reported by tien on Discord:
> {code:java}
> import gremlin from "gremlin";
> const g = gremlin.process.AnonymousTraversalSource.traversal().withRemote(
>   new gremlin.driver.DriverRemoteConnection("ws://localhost:8182/gremlin", {
>     authenticator: new gremlin.driver.auth.PlainTextSaslAuthenticator(
>       "admin",
>       "administrator"
>     ),
>   })
> );
> // This will throw: Failed to authenticate (401)
> await Promise.all([g.V().toList(), g.V().toList()]);
> // This works as expected
> await g.V().toList();
> await g.V().toList(); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)