[
https://issues.apache.org/jira/browse/TINKERPOP-3061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831310#comment-17831310
]
ASF GitHub Bot commented on TINKERPOP-3061:
-------------------------------------------
FlorianHockmann commented on code in PR #2525:
URL: https://github.com/apache/tinkerpop/pull/2525#discussion_r1540816027
##########
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java:
##########
@@ -27,8 +27,7 @@
import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
Review Comment:
Please revert this change. [Our dev docs explicitly mention that TinkerPop
doesn't use wildcard
imports](https://tinkerpop.apache.org/docs/current/dev/developer/#_code_style).
##########
gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java:
##########
@@ -218,9 +266,19 @@ public void
shouldAuthenticateAndWorkWithVariablesOverGraphSONV1Serialization()
private static void assertConnection(final Cluster cluster, final Client
client) throws InterruptedException, ExecutionException {
Review Comment:
This method is used in 4 different tests, such as
`shouldAuthenticateWithPlainText`. All 4 of these tests will now also fail
whenever submitting multiple initial requests in parallel is broken.
I think it would be good if we could keep these tests as simple as possible
so they don't include parallelization of initial requests. A test like
`shouldAuthenticateWithPlainText` should really only fail if _authenticate with
plain text_ isn't working, not if submitting multiple requests in parallel
isn't working.
Long story short, I think it would be good if you could revert the changes
to this method and instead write a new test specifically for the
parallelization issue.
##########
gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java:
##########
@@ -75,106 +79,159 @@ public SaslAuthenticationHandler(final Authenticator
authenticator, final Author
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg)
throws Exception {
- if (msg instanceof RequestMessage){
- final RequestMessage requestMessage = (RequestMessage) msg;
-
- final Attribute<Authenticator.SaslNegotiator> negotiator =
((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
- final Attribute<RequestMessage> request = ((AttributeMap)
ctx).attr(StateKey.REQUEST_MESSAGE);
- if (negotiator.get() == null) {
- try {
- // First time through so save the request and send an
AUTHENTICATE challenge with no data
-
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
- request.set(requestMessage);
- final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
- .code(ResponseStatusCode.AUTHENTICATE).create();
- ctx.writeAndFlush(authenticate);
- } catch (Exception ex) {
- // newSaslNegotiator can cause troubles - if we don't
catch and respond nicely the driver seems
- // to hang until timeout which isn't so nice. treating
this like a server error as it means that
- // the Authenticator isn't really ready to deal with
requests for some reason.
- logger.error(String.format("%s is not ready to handle
requests - check its configuration or related services",
- authenticator.getClass().getSimpleName()), ex);
-
- final ResponseMessage error =
ResponseMessage.build(requestMessage)
- .statusMessage("Authenticator is not ready to
handle requests")
- .code(ResponseStatusCode.SERVER_ERROR).create();
- ctx.writeAndFlush(error);
- }
- } else {
- if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)
&& requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-
- final Object saslObject =
requestMessage.getArgs().get(Tokens.ARGS_SASL);
- final byte[] saslResponse;
-
- if(saslObject instanceof String) {
- saslResponse = BASE64_DECODER.decode((String)
saslObject);
- } else {
- final ResponseMessage error =
ResponseMessage.build(request.get())
- .statusMessage("Incorrect type for : " +
Tokens.ARGS_SASL + " - base64 encoded String is expected")
-
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
- ctx.writeAndFlush(error);
- return;
- }
-
- try {
- final byte[] saslMessage =
negotiator.get().evaluateResponse(saslResponse);
- if (negotiator.get().isComplete()) {
- final AuthenticatedUser user =
negotiator.get().getAuthenticatedUser();
-
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
- // User name logged with the remote socket address
and authenticator classname for audit logging
- if (settings.enableAuditLog) {
- String address =
ctx.channel().remoteAddress().toString();
- if (address.startsWith("/") &&
address.length() > 1) address = address.substring(1);
- final String[] authClassParts =
authenticator.getClass().toString().split("[.]");
- auditLogger.info("User {} with address {}
authenticated by {}",
- user.getName(), address,
authClassParts[authClassParts.length - 1]);
- }
- // If we have got here we are authenticated so
remove the handler and pass
- // the original message down the pipeline for
processing
- ctx.pipeline().remove(this);
- final RequestMessage original = request.get();
- ctx.fireChannelRead(original);
- } else {
- // not done here - send back the sasl message for
next challenge.
- final Map<String,Object> metadata = new
HashMap<>();
- metadata.put(Tokens.ARGS_SASL,
BASE64_ENCODER.encodeToString(saslMessage));
- final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
- .statusAttributes(metadata)
-
.code(ResponseStatusCode.AUTHENTICATE).create();
- ctx.writeAndFlush(authenticate);
- }
- } catch (AuthenticationException ae) {
- final ResponseMessage error =
ResponseMessage.build(request.get())
- .statusMessage(ae.getMessage())
-
.code(ResponseStatusCode.UNAUTHORIZED).create();
- ctx.writeAndFlush(error);
- }
- } else {
- final ResponseMessage error =
ResponseMessage.build(requestMessage)
- .statusMessage("Failed to authenticate")
- .code(ResponseStatusCode.UNAUTHORIZED).create();
- ctx.writeAndFlush(error);
- }
- }
- } else {
+ if (!(msg instanceof RequestMessage)) {
logger.warn("{} only processes RequestMessage instances - received
{} - channel closing",
this.getClass().getSimpleName(), msg.getClass());
ctx.close();
+ return;
+ }
+
+ final RequestMessage requestMessage = (RequestMessage) msg;
+
+ final Attribute<Authenticator.SaslNegotiator> negotiator =
ctx.channel().attr(StateKey.NEGOTIATOR);
+ final Attribute<RequestMessage> request =
ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+ final Attribute<Pair<LocalDateTime, List<RequestMessage>>>
deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+ if (negotiator.get() == null) {
+ try {
+ // First time through so save the request and send an
AUTHENTICATE challenge with no data
+
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+ request.set(requestMessage);
+ final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
+ .code(ResponseStatusCode.AUTHENTICATE).create();
+ ctx.writeAndFlush(authenticate);
+ } catch (Exception ex) {
+ // newSaslNegotiator can cause troubles - if we don't catch
and respond nicely the driver seems
+ // to hang until timeout which isn't so nice. treating this
like a server error as it means that
+ // the Authenticator isn't really ready to deal with requests
for some reason.
+ logger.error(String.format("%s is not ready to handle requests
- check its configuration or related services",
+ authenticator.getClass().getSimpleName()), ex);
+
+ respondWithError(
+ requestMessage,
+ builder -> builder.statusMessage("Authenticator is not
ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+ ctx);
+ }
+
+ return;
}
+
+ // If authentication negotiation is pending, store subsequent
non-authentication requests for later processing
+ if (negotiator.get() != null &&
!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
Review Comment:
(nitpick) Isn't `negotiator.get() != null` redundant here? The
`if (negotiator.get() == null)` block right before it always returns.
##########
gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java:
##########
@@ -163,6 +171,46 @@ public void
shouldFailAuthenticateWithPlainTextBadUsername() throws Exception {
}
}
+ @Test
+ public void
shouldFailAuthenticateWithUnAuthenticatedRequestAfterMaxDeferrableDuration()
throws Exception {
+ try (WebSocketClient client =
TestClientFactory.createWebSocketClient()) {
+ // First request will initiate the authentication handshake
+ // Subsequent requests will be deferred
+ CompletableFuture<List<ResponseMessage>> future1 =
client.submitAsync("");
+ CompletableFuture<List<ResponseMessage>> future2 =
client.submitAsync("");
+ CompletableFuture<List<ResponseMessage>> future3 =
client.submitAsync("");
+
+ // After the maximum allowed deferred request duration,
+ // any non-authenticated request will invalidate all requests with
429 error
+ CompletableFuture<List<ResponseMessage>> future4 =
CompletableFuture.runAsync(() -> {
+ try {
+
Thread.sleep(SaslAuthenticationHandler.MAX_REQUEST_DEFERRABLE_DURATION.plus(Duration.ofSeconds(1)).toMillis());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }).thenCompose((__) -> {
+ try {
+ return client.submitAsync("");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ List<ResponseMessage> responses = new ArrayList<>();
+
+ responses.addAll(future1.join());
+ responses.addAll(future2.join());
+ responses.addAll(future3.join());
+ responses.addAll(future4.join());
+
+ for (ResponseMessage response : responses) {
+ if (response.getStatus().getCode() !=
ResponseStatusCode.AUTHENTICATE) {
Review Comment:
I think it's a bit hard to understand what the expected outcome is here /
what this is really asserting, since it just iterates over all received
responses and accepts each one if its status code is either `AUTHENTICATE`
or `TOO_MANY_REQUESTS`.
Can't we explicitly assert which request should get which response status
code? I guess `future4` should get `TOO_MANY_REQUESTS` and the other 3 should
get `AUTHENTICATE` (?).
Not that important, but I think it would also improve readability of this
test if these weren't named `future1` - `future4`, but maybe something like
`futureOfRequestWithinAuthDuration` vs `futureOfRequestSubmittedTooLate` or
something like that.
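A minimal sketch of what per-request assertions could look like. `Status`,
`firstCode` and `assertStatus` are hypothetical stand-ins, not the real
`ResponseStatusCode` / `ResponseMessage` API, and pre-completed futures take
the place of real server responses. Only the variable names come from the
suggestion above.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ExplicitStatusAssertionSketch {
    // Hypothetical stand-in for ResponseStatusCode; only the two codes discussed here.
    enum Status { AUTHENTICATE, TOO_MANY_REQUESTS }

    // Returns the status of the first response received for a request.
    static Status firstCode(final CompletableFuture<List<Status>> future) {
        return future.join().get(0);
    }

    // Fails with a message that names the expected and the actual status.
    static void assertStatus(final Status expected, final CompletableFuture<List<Status>> future) {
        final Status actual = firstCode(future);
        if (actual != expected) {
            throw new AssertionError("expected " + expected + " but got " + actual);
        }
    }

    public static void main(final String[] args) {
        // Pre-completed futures stand in for the results of client.submitAsync(...).
        final CompletableFuture<List<Status>> futureOfRequestWithinAuthDuration =
                CompletableFuture.completedFuture(Collections.singletonList(Status.AUTHENTICATE));
        final CompletableFuture<List<Status>> futureOfRequestSubmittedTooLate =
                CompletableFuture.completedFuture(Collections.singletonList(Status.TOO_MANY_REQUESTS));

        // One assertion per request, so a failure points at the request that misbehaved.
        assertStatus(Status.AUTHENTICATE, futureOfRequestWithinAuthDuration);
        assertStatus(Status.TOO_MANY_REQUESTS, futureOfRequestSubmittedTooLate);
    }
}
```

With one assertion per future, a failing test names the request that got the
wrong status instead of failing somewhere inside a loop over all responses.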
##########
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java:
##########
@@ -75,19 +74,19 @@ public CompletableFuture<List<ResponseMessage>>
submitAsync(final RequestMessage
if (response.getStatus().getCode().isFinalResponse()) {
f.complete(results);
}
- };
+ });
writeAndFlush(requestMessage);
return f;
}
static class CallbackResponseHandler extends
SimpleChannelInboundHandler<ResponseMessage> {
- public Consumer<ResponseMessage> callback;
+ public Map<UUID, Consumer<ResponseMessage>> callback = new HashMap<>();
Review Comment:
(nitpick) I think we should rename this now that it's no longer just a
`callback`. Maybe something like `callbackByRequestId`?
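To illustrate why the name matters, here is a rough sketch of a handler that
routes each response to the callback registered for its request id.
`Response`, `register` and `dispatch` are hypothetical names made up for this
sketch, not the actual `CallbackResponseHandler` code. It keeps the plain
`HashMap` from the diff and leaves thread safety out of scope.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

class CallbackRegistrySketch {
    // Hypothetical minimal response type; the real ResponseMessage carries much more.
    static final class Response {
        final UUID requestId;
        final boolean isFinal;

        Response(final UUID requestId, final boolean isFinal) {
            this.requestId = requestId;
            this.isFinal = isFinal;
        }
    }

    // One callback per in-flight request, keyed by the request id.
    final Map<UUID, Consumer<Response>> callbackByRequestId = new HashMap<>();

    void register(final UUID requestId, final Consumer<Response> callback) {
        callbackByRequestId.put(requestId, callback);
    }

    // Invokes the callback registered for the response's request id.
    // Returns false if no callback is registered for that id.
    boolean dispatch(final Response response) {
        final Consumer<Response> callback = callbackByRequestId.get(response.requestId);
        if (callback == null) {
            return false;
        }
        callback.accept(response);
        if (response.isFinal) {
            // The request is done, so its callback is no longer needed.
            callbackByRequestId.remove(response.requestId);
        }
        return true;
    }
}
```

Read this way, `callbackByRequestId.get(id)` says what the lookup returns,
which a field simply named `callback` would not.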
##########
gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java:
##########
@@ -75,106 +79,159 @@ public SaslAuthenticationHandler(final Authenticator
authenticator, final Author
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg)
throws Exception {
- if (msg instanceof RequestMessage){
- final RequestMessage requestMessage = (RequestMessage) msg;
-
- final Attribute<Authenticator.SaslNegotiator> negotiator =
((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
- final Attribute<RequestMessage> request = ((AttributeMap)
ctx).attr(StateKey.REQUEST_MESSAGE);
- if (negotiator.get() == null) {
- try {
- // First time through so save the request and send an
AUTHENTICATE challenge with no data
-
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
- request.set(requestMessage);
- final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
- .code(ResponseStatusCode.AUTHENTICATE).create();
- ctx.writeAndFlush(authenticate);
- } catch (Exception ex) {
- // newSaslNegotiator can cause troubles - if we don't
catch and respond nicely the driver seems
- // to hang until timeout which isn't so nice. treating
this like a server error as it means that
- // the Authenticator isn't really ready to deal with
requests for some reason.
- logger.error(String.format("%s is not ready to handle
requests - check its configuration or related services",
- authenticator.getClass().getSimpleName()), ex);
-
- final ResponseMessage error =
ResponseMessage.build(requestMessage)
- .statusMessage("Authenticator is not ready to
handle requests")
- .code(ResponseStatusCode.SERVER_ERROR).create();
- ctx.writeAndFlush(error);
- }
- } else {
- if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)
&& requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-
- final Object saslObject =
requestMessage.getArgs().get(Tokens.ARGS_SASL);
- final byte[] saslResponse;
-
- if(saslObject instanceof String) {
- saslResponse = BASE64_DECODER.decode((String)
saslObject);
- } else {
- final ResponseMessage error =
ResponseMessage.build(request.get())
- .statusMessage("Incorrect type for : " +
Tokens.ARGS_SASL + " - base64 encoded String is expected")
-
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
- ctx.writeAndFlush(error);
- return;
- }
-
- try {
- final byte[] saslMessage =
negotiator.get().evaluateResponse(saslResponse);
- if (negotiator.get().isComplete()) {
- final AuthenticatedUser user =
negotiator.get().getAuthenticatedUser();
-
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
- // User name logged with the remote socket address
and authenticator classname for audit logging
- if (settings.enableAuditLog) {
- String address =
ctx.channel().remoteAddress().toString();
- if (address.startsWith("/") &&
address.length() > 1) address = address.substring(1);
- final String[] authClassParts =
authenticator.getClass().toString().split("[.]");
- auditLogger.info("User {} with address {}
authenticated by {}",
- user.getName(), address,
authClassParts[authClassParts.length - 1]);
- }
- // If we have got here we are authenticated so
remove the handler and pass
- // the original message down the pipeline for
processing
- ctx.pipeline().remove(this);
- final RequestMessage original = request.get();
- ctx.fireChannelRead(original);
- } else {
- // not done here - send back the sasl message for
next challenge.
- final Map<String,Object> metadata = new
HashMap<>();
- metadata.put(Tokens.ARGS_SASL,
BASE64_ENCODER.encodeToString(saslMessage));
- final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
- .statusAttributes(metadata)
-
.code(ResponseStatusCode.AUTHENTICATE).create();
- ctx.writeAndFlush(authenticate);
- }
- } catch (AuthenticationException ae) {
- final ResponseMessage error =
ResponseMessage.build(request.get())
- .statusMessage(ae.getMessage())
-
.code(ResponseStatusCode.UNAUTHORIZED).create();
- ctx.writeAndFlush(error);
- }
- } else {
- final ResponseMessage error =
ResponseMessage.build(requestMessage)
- .statusMessage("Failed to authenticate")
- .code(ResponseStatusCode.UNAUTHORIZED).create();
- ctx.writeAndFlush(error);
- }
- }
- } else {
+ if (!(msg instanceof RequestMessage)) {
logger.warn("{} only processes RequestMessage instances - received
{} - channel closing",
this.getClass().getSimpleName(), msg.getClass());
ctx.close();
+ return;
+ }
+
+ final RequestMessage requestMessage = (RequestMessage) msg;
+
+ final Attribute<Authenticator.SaslNegotiator> negotiator =
ctx.channel().attr(StateKey.NEGOTIATOR);
+ final Attribute<RequestMessage> request =
ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+ final Attribute<Pair<LocalDateTime, List<RequestMessage>>>
deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+ if (negotiator.get() == null) {
+ try {
+ // First time through so save the request and send an
AUTHENTICATE challenge with no data
+
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+ request.set(requestMessage);
+ final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
+ .code(ResponseStatusCode.AUTHENTICATE).create();
+ ctx.writeAndFlush(authenticate);
+ } catch (Exception ex) {
+ // newSaslNegotiator can cause troubles - if we don't catch
and respond nicely the driver seems
+ // to hang until timeout which isn't so nice. treating this
like a server error as it means that
+ // the Authenticator isn't really ready to deal with requests
for some reason.
+ logger.error(String.format("%s is not ready to handle requests
- check its configuration or related services",
+ authenticator.getClass().getSimpleName()), ex);
+
+ respondWithError(
+ requestMessage,
+ builder -> builder.statusMessage("Authenticator is not
ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+ ctx);
+ }
+
+ return;
}
+
+ // If authentication negotiation is pending, store subsequent
non-authentication requests for later processing
+ if (negotiator.get() != null &&
!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
+ deferredRequests.setIfAbsent(new
ImmutablePair<>(LocalDateTime.now(), new ArrayList<>()));
+ deferredRequests.get().getValue().add(requestMessage);
+
+ final Duration deferredDuration =
Duration.between(deferredRequests.get().getKey(), LocalDateTime.now());
+
+ if (deferredDuration.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) >
0) {
+ respondWithError(
+ requestMessage,
+ builder -> builder.statusMessage("Too many unauthenticated
requests").code(ResponseStatusCode.TOO_MANY_REQUESTS),
Review Comment:
Is the problem here really that there are _too many unauthenticated
requests_? Isn't the problem that authentication took longer than
`MAX_REQUEST_DEFERRABLE_DURATION`?
I think as a user it might be good to know whether I simply submitted too
many requests or whether authentication is just too slow.
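A rough sketch of how the two causes could be reported separately. The
duration value, the `MAX_DEFERRED_REQUESTS` count limit and the
`errorMessageFor` helper are all assumptions made for illustration. The PR as
shown only checks the duration and has no count limit.

```java
import java.time.Duration;
import java.util.Optional;

class DeferralErrorMessageSketch {
    // Assumed value for illustration; the PR's actual constant value is not shown here.
    static final Duration MAX_REQUEST_DEFERRABLE_DURATION = Duration.ofSeconds(5);
    // Hypothetical count limit; not part of the PR.
    static final int MAX_DEFERRED_REQUESTS = 100;

    // Returns an error message that names the limit that was exceeded,
    // or an empty Optional while both limits are still respected.
    static Optional<String> errorMessageFor(final Duration deferredDuration, final int deferredCount) {
        if (deferredDuration.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) > 0) {
            return Optional.of("Authentication did not complete within "
                    + MAX_REQUEST_DEFERRABLE_DURATION.toMillis() + " ms");
        }
        if (deferredCount > MAX_DEFERRED_REQUESTS) {
            return Optional.of("Too many unauthenticated requests");
        }
        return Optional.empty();
    }
}
```

A user who sees the first message knows to look at the authenticator or the
network, while the second one points at the client's request volume.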
> Concurrent queries will break authentication on javascript driver
> -----------------------------------------------------------------
>
> Key: TINKERPOP-3061
> URL: https://issues.apache.org/jira/browse/TINKERPOP-3061
> Project: TinkerPop
> Issue Type: Bug
> Components: javascript
> Affects Versions: 3.6.6, 3.7.1
> Reporter: Yang Xia
> Priority: Major
>
> Reported by tien on Discord:
> {code:java}
> import gremlin from "gremlin";
> const g = gremlin.process.AnonymousTraversalSource.traversal().withRemote(
> new gremlin.driver.DriverRemoteConnection("ws://localhost:8182/gremlin", {
> authenticator: new gremlin.driver.auth.PlainTextSaslAuthenticator(
> "admin",
> "administrator"
> ),
> })
> );
> // This throws: Failed to authenticate (401)
> await Promise.all([g.V().toList(), g.V().toList()]);
> // This works as expected
> await g.V().toList();
> await g.V().toList(); {code}