FlorianHockmann commented on code in PR #2525: URL: https://github.com/apache/tinkerpop/pull/2525#discussion_r1540816027
##########
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java:
##########
@@ -27,8 +27,7 @@
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;

Review Comment:
   Please revert this change. [Our dev docs explicitly mention that TinkerPop doesn't use wildcard imports](https://tinkerpop.apache.org/docs/current/dev/developer/#_code_style).

##########
gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java:
##########
@@ -218,9 +266,19 @@ public void shouldAuthenticateAndWorkWithVariablesOverGraphSONV1Serialization()
     private static void assertConnection(final Cluster cluster, final Client client) throws InterruptedException, ExecutionException {

Review Comment:
   This method is used in 4 different tests, such as `shouldAuthenticateWithPlainText`. These 4 tests will now fail if submitting multiple requests initially in parallel isn't working.
   I think it would be good if we could keep these tests as simple as possible so they don't include parallelization of initial requests. A test like `shouldAuthenticateWithPlainText` should really only fail if _authenticate with plain text_ isn't working, not if submitting multiple requests in parallel isn't working.

   Long story short, I think it would be good if you could revert the changes to this method and instead write a new test specifically for the parallelization issue.

##########
gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java:
##########
@@ -75,106 +79,159 @@ public SaslAuthenticationHandler(final Authenticator authenticator, final Author
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
-        if (msg instanceof RequestMessage){
-            final RequestMessage requestMessage = (RequestMessage) msg;
-
-            final Attribute<Authenticator.SaslNegotiator> negotiator = ((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
-            final Attribute<RequestMessage> request = ((AttributeMap) ctx).attr(StateKey.REQUEST_MESSAGE);
-            if (negotiator.get() == null) {
-                try {
-                    // First time through so save the request and send an AUTHENTICATE challenge with no data
-                    negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
-                    request.set(requestMessage);
-                    final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
-                            .code(ResponseStatusCode.AUTHENTICATE).create();
-                    ctx.writeAndFlush(authenticate);
-                } catch (Exception ex) {
-                    // newSaslNegotiator can cause troubles - if we don't catch and respond nicely the driver seems
-                    // to hang until timeout which isn't so nice. treating this like a server error as it means that
-                    // the Authenticator isn't really ready to deal with requests for some reason.
-                    logger.error(String.format("%s is not ready to handle requests - check its configuration or related services",
-                            authenticator.getClass().getSimpleName()), ex);
-
-                    final ResponseMessage error = ResponseMessage.build(requestMessage)
-                            .statusMessage("Authenticator is not ready to handle requests")
-                            .code(ResponseStatusCode.SERVER_ERROR).create();
-                    ctx.writeAndFlush(error);
-                }
-            } else {
-                if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION) && requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-
-                    final Object saslObject = requestMessage.getArgs().get(Tokens.ARGS_SASL);
-                    final byte[] saslResponse;
-
-                    if(saslObject instanceof String) {
-                        saslResponse = BASE64_DECODER.decode((String) saslObject);
-                    } else {
-                        final ResponseMessage error = ResponseMessage.build(request.get())
-                                .statusMessage("Incorrect type for : " + Tokens.ARGS_SASL + " - base64 encoded String is expected")
-                                .code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
-                        ctx.writeAndFlush(error);
-                        return;
-                    }
-
-                    try {
-                        final byte[] saslMessage = negotiator.get().evaluateResponse(saslResponse);
-                        if (negotiator.get().isComplete()) {
-                            final AuthenticatedUser user = negotiator.get().getAuthenticatedUser();
-                            ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
-                            // User name logged with the remote socket address and authenticator classname for audit logging
-                            if (settings.enableAuditLog) {
-                                String address = ctx.channel().remoteAddress().toString();
-                                if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
-                                final String[] authClassParts = authenticator.getClass().toString().split("[.]");
-                                auditLogger.info("User {} with address {} authenticated by {}",
-                                        user.getName(), address, authClassParts[authClassParts.length - 1]);
-                            }
-                            // If we have got here we are authenticated so remove the handler and pass
-                            // the original message down the pipeline for processing
-                            ctx.pipeline().remove(this);
-                            final RequestMessage original = request.get();
-                            ctx.fireChannelRead(original);
-                        } else {
-                            // not done here - send back the sasl message for next challenge.
-                            final Map<String,Object> metadata = new HashMap<>();
-                            metadata.put(Tokens.ARGS_SASL, BASE64_ENCODER.encodeToString(saslMessage));
-                            final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
-                                    .statusAttributes(metadata)
-                                    .code(ResponseStatusCode.AUTHENTICATE).create();
-                            ctx.writeAndFlush(authenticate);
-                        }
-                    } catch (AuthenticationException ae) {
-                        final ResponseMessage error = ResponseMessage.build(request.get())
-                                .statusMessage(ae.getMessage())
-                                .code(ResponseStatusCode.UNAUTHORIZED).create();
-                        ctx.writeAndFlush(error);
-                    }
-                } else {
-                    final ResponseMessage error = ResponseMessage.build(requestMessage)
-                            .statusMessage("Failed to authenticate")
-                            .code(ResponseStatusCode.UNAUTHORIZED).create();
-                    ctx.writeAndFlush(error);
-                }
-            }
-        } else {
+        if (!(msg instanceof RequestMessage)) {
             logger.warn("{} only processes RequestMessage instances - received {} - channel closing",
                     this.getClass().getSimpleName(), msg.getClass());
             ctx.close();
+            return;
+        }
+
+        final RequestMessage requestMessage = (RequestMessage) msg;
+
+        final Attribute<Authenticator.SaslNegotiator> negotiator = ctx.channel().attr(StateKey.NEGOTIATOR);
+        final Attribute<RequestMessage> request = ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+        final Attribute<Pair<LocalDateTime, List<RequestMessage>>> deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+        if (negotiator.get() == null) {
+            try {
+                // First time through so save the request and send an AUTHENTICATE challenge with no data
+                negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+                request.set(requestMessage);
+                final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
+                        .code(ResponseStatusCode.AUTHENTICATE).create();
+                ctx.writeAndFlush(authenticate);
+            } catch (Exception ex) {
+                // newSaslNegotiator can cause troubles - if we don't catch and respond nicely the driver seems
+                // to hang until timeout which isn't so nice. treating this like a server error as it means that
+                // the Authenticator isn't really ready to deal with requests for some reason.
+                logger.error(String.format("%s is not ready to handle requests - check its configuration or related services",
+                        authenticator.getClass().getSimpleName()), ex);
+
+                respondWithError(
+                        requestMessage,
+                        builder -> builder.statusMessage("Authenticator is not ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+                        ctx);
+            }
+
+            return;
         }
+
+        // If authentication negotiation is pending, store subsequent non-authentication requests for later processing
+        if (negotiator.get() != null && !requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {

Review Comment:
   (nitpick) Isn't `negotiator.get() != null` redundant here, since the `if (negotiator.get() == null)` block right before it always returns?

##########
gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java:
##########
@@ -163,6 +171,46 @@ public void shouldFailAuthenticateWithPlainTextBadUsername() throws Exception {
         }
     }
+    @Test
+    public void shouldFailAuthenticateWithUnAuthenticatedRequestAfterMaxDeferrableDuration() throws Exception {
+        try (WebSocketClient client = TestClientFactory.createWebSocketClient()) {
+            // First request will initiate the authentication handshake
+            // Subsequent requests will be deferred
+            CompletableFuture<List<ResponseMessage>> future1 = client.submitAsync("");
+            CompletableFuture<List<ResponseMessage>> future2 = client.submitAsync("");
+            CompletableFuture<List<ResponseMessage>> future3 = client.submitAsync("");
+
+            // After the maximum allowed deferred request duration,
+            // any non-authenticated request will invalidate all requests with 429 error
+            CompletableFuture<List<ResponseMessage>> future4 = CompletableFuture.runAsync(() -> {
+                try {
+                    Thread.sleep(SaslAuthenticationHandler.MAX_REQUEST_DEFERRABLE_DURATION.plus(Duration.ofSeconds(1)).toMillis());
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }).thenCompose((__) -> {
+                try {
+                    return client.submitAsync("");
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            List<ResponseMessage> responses = new ArrayList<>();
+
+            responses.addAll(future1.join());
+            responses.addAll(future2.join());
+            responses.addAll(future3.join());
+            responses.addAll(future4.join());
+
+            for (ResponseMessage response : responses) {
+                if (response.getStatus().getCode() != ResponseStatusCode.AUTHENTICATE) {

Review Comment:
   I think it's a bit hard to understand what the expected outcome is here / what this is really asserting, since it just iterates over all received responses and accepts each one if its status code is either `AUTHENTICATE` or `TOO_MANY_REQUESTS`. Can't we explicitly assert which request should get which response status code? I guess `future4` should get `TOO_MANY_REQUESTS` and the other 3 should get `AUTHENTICATE` (?).

   Not that important, but I think it would also improve readability of this test if these weren't named `future1` - `future4`, but something like `futureOfRequestWithinAuthDuration` vs `futureOfRequestSubmittedTooLate`.

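To illustrate the per-request assertions suggested above, here is a rough sketch. It uses hypothetical stand-in types (`StatusCode`, `Response`) instead of the real `ResponseStatusCode` / `ResponseMessage` so that it runs on its own, and the helper name `assertAllHaveCode` is made up. The expected codes follow the guess in the comment above and may not match what the server actually returns.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ExplicitStatusAssertions {

    // Hypothetical stand-in for ResponseStatusCode; only the two codes discussed here.
    enum StatusCode { AUTHENTICATE, TOO_MANY_REQUESTS }

    // Hypothetical stand-in for ResponseMessage.
    static final class Response {
        final StatusCode code;

        Response(final StatusCode code) {
            this.code = code;
        }
    }

    // Fails unless the request produced at least one response and every response
    // carries the expected status code. The request name ends up in the failure message.
    static void assertAllHaveCode(final String requestName, final StatusCode expected,
                                  final CompletableFuture<List<Response>> future) {
        final List<Response> responses = future.join();
        if (responses.isEmpty()) {
            throw new AssertionError(requestName + ": expected at least one response");
        }
        for (final Response response : responses) {
            if (response.code != expected) {
                throw new AssertionError(requestName + ": expected " + expected + " but got " + response.code);
            }
        }
    }

    public static void main(final String[] args) {
        // In the real test these futures would come from client.submitAsync(...);
        // here they are pre-completed with canned responses.
        final CompletableFuture<List<Response>> futureOfRequestWithinAuthDuration =
                CompletableFuture.completedFuture(Collections.singletonList(new Response(StatusCode.AUTHENTICATE)));
        final CompletableFuture<List<Response>> futureOfRequestSubmittedTooLate =
                CompletableFuture.completedFuture(Collections.singletonList(new Response(StatusCode.TOO_MANY_REQUESTS)));

        assertAllHaveCode("request within auth duration", StatusCode.AUTHENTICATE, futureOfRequestWithinAuthDuration);
        assertAllHaveCode("request submitted too late", StatusCode.TOO_MANY_REQUESTS, futureOfRequestSubmittedTooLate);
    }
}
```

With one assertion per named future, a failure message says which request received an unexpected code, which the single loop over all responses cannot do.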
##########
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java:
##########
@@ -75,19 +74,19 @@ public CompletableFuture<List<ResponseMessage>> submitAsync(final RequestMessage
             if (response.getStatus().getCode().isFinalResponse()) {
                 f.complete(results);
             }
-        };
+        });
         writeAndFlush(requestMessage);
         return f;
     }
     static class CallbackResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
-        public Consumer<ResponseMessage> callback;
+        public Map<UUID, Consumer<ResponseMessage>> callback = new HashMap<>();

Review Comment:
   (nitpick) I think we should rename this now that it's no longer just a `callback`. Maybe something like `callbackByRequestId`?

##########
gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java:
##########
@@ -75,106 +79,159 @@ public SaslAuthenticationHandler(final Authenticator authenticator, final Author
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
-        if (msg instanceof RequestMessage){
-            final RequestMessage requestMessage = (RequestMessage) msg;
-
-            final Attribute<Authenticator.SaslNegotiator> negotiator = ((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
-            final Attribute<RequestMessage> request = ((AttributeMap) ctx).attr(StateKey.REQUEST_MESSAGE);
-            if (negotiator.get() == null) {
-                try {
-                    // First time through so save the request and send an AUTHENTICATE challenge with no data
-                    negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
-                    request.set(requestMessage);
-                    final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
-                            .code(ResponseStatusCode.AUTHENTICATE).create();
-                    ctx.writeAndFlush(authenticate);
-                } catch (Exception ex) {
-                    // newSaslNegotiator can cause troubles - if we don't catch and respond nicely the driver seems
-                    // to hang until timeout which isn't so nice. treating this like a server error as it means that
-                    // the Authenticator isn't really ready to deal with requests for some reason.
-                    logger.error(String.format("%s is not ready to handle requests - check its configuration or related services",
-                            authenticator.getClass().getSimpleName()), ex);
-
-                    final ResponseMessage error = ResponseMessage.build(requestMessage)
-                            .statusMessage("Authenticator is not ready to handle requests")
-                            .code(ResponseStatusCode.SERVER_ERROR).create();
-                    ctx.writeAndFlush(error);
-                }
-            } else {
-                if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION) && requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-
-                    final Object saslObject = requestMessage.getArgs().get(Tokens.ARGS_SASL);
-                    final byte[] saslResponse;
-
-                    if(saslObject instanceof String) {
-                        saslResponse = BASE64_DECODER.decode((String) saslObject);
-                    } else {
-                        final ResponseMessage error = ResponseMessage.build(request.get())
-                                .statusMessage("Incorrect type for : " + Tokens.ARGS_SASL + " - base64 encoded String is expected")
-                                .code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
-                        ctx.writeAndFlush(error);
-                        return;
-                    }
-
-                    try {
-                        final byte[] saslMessage = negotiator.get().evaluateResponse(saslResponse);
-                        if (negotiator.get().isComplete()) {
-                            final AuthenticatedUser user = negotiator.get().getAuthenticatedUser();
-                            ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
-                            // User name logged with the remote socket address and authenticator classname for audit logging
-                            if (settings.enableAuditLog) {
-                                String address = ctx.channel().remoteAddress().toString();
-                                if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
-                                final String[] authClassParts = authenticator.getClass().toString().split("[.]");
-                                auditLogger.info("User {} with address {} authenticated by {}",
-                                        user.getName(), address, authClassParts[authClassParts.length - 1]);
-                            }
-                            // If we have got here we are authenticated so remove the handler and pass
-                            // the original message down the pipeline for processing
-                            ctx.pipeline().remove(this);
-                            final RequestMessage original = request.get();
-                            ctx.fireChannelRead(original);
-                        } else {
-                            // not done here - send back the sasl message for next challenge.
-                            final Map<String,Object> metadata = new HashMap<>();
-                            metadata.put(Tokens.ARGS_SASL, BASE64_ENCODER.encodeToString(saslMessage));
-                            final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
-                                    .statusAttributes(metadata)
-                                    .code(ResponseStatusCode.AUTHENTICATE).create();
-                            ctx.writeAndFlush(authenticate);
-                        }
-                    } catch (AuthenticationException ae) {
-                        final ResponseMessage error = ResponseMessage.build(request.get())
-                                .statusMessage(ae.getMessage())
-                                .code(ResponseStatusCode.UNAUTHORIZED).create();
-                        ctx.writeAndFlush(error);
-                    }
-                } else {
-                    final ResponseMessage error = ResponseMessage.build(requestMessage)
-                            .statusMessage("Failed to authenticate")
-                            .code(ResponseStatusCode.UNAUTHORIZED).create();
-                    ctx.writeAndFlush(error);
-                }
-            }
-        } else {
+        if (!(msg instanceof RequestMessage)) {
             logger.warn("{} only processes RequestMessage instances - received {} - channel closing",
                     this.getClass().getSimpleName(), msg.getClass());
             ctx.close();
+            return;
+        }
+
+        final RequestMessage requestMessage = (RequestMessage) msg;
+
+        final Attribute<Authenticator.SaslNegotiator> negotiator = ctx.channel().attr(StateKey.NEGOTIATOR);
+        final Attribute<RequestMessage> request = ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+        final Attribute<Pair<LocalDateTime, List<RequestMessage>>> deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+        if (negotiator.get() == null) {
+            try {
+                // First time through so save the request and send an AUTHENTICATE challenge with no data
+                negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+                request.set(requestMessage);
+                final ResponseMessage authenticate = ResponseMessage.build(requestMessage)
+                        .code(ResponseStatusCode.AUTHENTICATE).create();
+                ctx.writeAndFlush(authenticate);
+            } catch (Exception ex) {
+                // newSaslNegotiator can cause troubles - if we don't catch and respond nicely the driver seems
+                // to hang until timeout which isn't so nice. treating this like a server error as it means that
+                // the Authenticator isn't really ready to deal with requests for some reason.
+                logger.error(String.format("%s is not ready to handle requests - check its configuration or related services",
+                        authenticator.getClass().getSimpleName()), ex);
+
+                respondWithError(
+                        requestMessage,
+                        builder -> builder.statusMessage("Authenticator is not ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+                        ctx);
+            }
+
+            return;
         }
+
+        // If authentication negotiation is pending, store subsequent non-authentication requests for later processing
+        if (negotiator.get() != null && !requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
+            deferredRequests.setIfAbsent(new ImmutablePair<>(LocalDateTime.now(), new ArrayList<>()));
+            deferredRequests.get().getValue().add(requestMessage);
+
+            final Duration deferredDuration = Duration.between(deferredRequests.get().getKey(), LocalDateTime.now());
+
+            if (deferredDuration.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) > 0) {
+                respondWithError(
+                        requestMessage,
+                        builder -> builder.statusMessage("Too many unauthenticated requests").code(ResponseStatusCode.TOO_MANY_REQUESTS),

Review Comment:
   Is the problem here really that there are _too many unauthenticated requests_? Isn't the problem that authentication took longer than `MAX_REQUEST_DEFERRABLE_DURATION`? I think as a user it might be good to know whether I simply submitted too many requests or whether authentication is just too slow.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.