This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch master-http in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit e8c4cab23009ff76d5ed30704bd4d6144a0cac1a Author: Ken Hu <[email protected]> AuthorDate: Tue Jun 11 23:01:02 2024 -0700 Prevent Java driver channel from being reused too early CTR. The connection was being returned too quickly before all HttpContent were read. This modifies the driver's handlers by ensuring that the LastHttpContent has been handled before returning the connection. The same was done for the SimpleHttpClient. As a consequence, some tests needed to be updated as they will now have up to two more empty ResponseMessageV4s added to the end. --- .../tinkerpop/gremlin/driver/Channelizer.java | 8 ++++++ .../driver/handler/GremlinResponseHandler.java | 30 +++++++++++++++------- .../handler/HttpGremlinResponseStreamDecoder.java | 22 +++++++--------- .../gremlin/driver/simple/AbstractClient.java | 23 ----------------- .../gremlin/driver/simple/SimpleHttpClient.java | 28 +++++++++++++++++++- .../gremlin/server/GremlinServerIntegrateTest.java | 14 +++++++--- 6 files changed, 75 insertions(+), 50 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index d2e393f100..5d8af18a3f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -26,6 +26,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler; @@ -33,7 +34,9 @@ import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder; import org.apache.tinkerpop.gremlin.driver.handler.SslCheckHandler; import org.apache.tinkerpop.gremlin.util.MessageSerializerV4; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessageV4; +import java.util.Collections; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; @@ -155,6 +158,11 @@ public interface Channelizer extends ChannelHandler { * channelizer. Only sessionless requests are possible. */ final class HttpChannelizer extends AbstractChannelizer { + /** + * This response is used as a signal for determining if all content of the response has been read. + */ + public static final ResponseMessageV4 LAST_CONTENT_READ_RESPONSE = + ResponseMessageV4.build().code(HttpResponseStatus.NO_CONTENT).result(Collections.emptyList()).create(); private HttpGremlinRequestEncoder gremlinRequestEncoder; private HttpGremlinResponseStreamDecoder gremlinResponseDecoder; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java index 88e8a740e6..83203f964c 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.driver.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.util.AttributeKey; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.tinkerpop.gremlin.driver.Result; import org.apache.tinkerpop.gremlin.driver.ResultQueue; @@ -34,12 +35,15 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; + /** * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request * as the {@link ResponseMessageV4} objects are deserialized. */ public class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessageV4> { private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class); + private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = AttributeKey.valueOf("caughtException"); private final AtomicReference<ResultQueue> pending; public GremlinResponseHandler(final AtomicReference<ResultQueue> pending) { @@ -60,26 +64,34 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response @Override protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessageV4 response) { - final HttpResponseStatus statusCode = response.getStatus() == null ? HttpResponseStatus.PARTIAL_CONTENT : response.getStatus().getCode(); + final HttpResponseStatus statusCode = response.getStatus() == null ? null : response.getStatus().getCode(); final ResultQueue queue = pending.get(); - if (statusCode == HttpResponseStatus.OK || statusCode == HttpResponseStatus.PARTIAL_CONTENT) { + if ((null == statusCode) || (statusCode == HttpResponseStatus.OK)) { final List<Object> data = response.getResult().getData(); // unrolls the collection into individual results to be handled by the queue. data.forEach(item -> queue.add(new Result(item))); } else { // this is a "success" but represents no results otherwise it is an error if (statusCode != HttpResponseStatus.NO_CONTENT) { - queue.markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(), - response.getStatus().getException())); + // Save the error because there could be a subsequent HttpContent coming (probably just trailers). All + // content should be read first before marking the queue or else this channel might get reused too early. + channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).set( + new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(), + response.getStatus().getException()) + ); } } - // as this is a non-PARTIAL_CONTENT code - the stream is done. - if (statusCode != HttpResponseStatus.PARTIAL_CONTENT) { - final ResultQueue current = pending.getAndSet(null); - if (current != null) { - current.markComplete(response.getStatus().getAttributes()); + // Stream is done when the last content signaling response message is read. + if (LAST_CONTENT_READ_RESPONSE == response) { + final ResultQueue resultQueue = pending.getAndSet(null); + if (resultQueue != null) { + if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { + resultQueue.markComplete(response.getStatus().getAttributes()); + } else { + resultQueue.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); + } } } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java index d271e0dc0d..799dcc8f1a 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java @@ -44,6 +44,8 @@ import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper; import java.util.List; import java.util.Objects; +import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; + public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<DefaultHttpObject> { private static final AttributeKey<Boolean> IS_FIRST_CHUNK = AttributeKey.valueOf("isFirstChunk"); @@ -84,13 +86,6 @@ public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<De throw new TooLongFrameException("Response exceeded " + maxContentLength + " bytes."); } - if (msg instanceof LastHttpContent && content.readableBytes() == 0 && bytesRead.get() != 0) { - // If this last content contains no bytes and there were bytes read previously, it means that this is the - // trailing headers. Trailing headers aren't used in the driver and shouldn't be passed on. - content.release(); - return; - } - try { // no more chunks expected if (isError(responseStatus.get()) && !SerTokensV4.MIME_GRAPHBINARY_V4.equals(responseEncoding.get())) { @@ -101,14 +96,15 @@ public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<De .create(); out.add(response); - return; + } else { + final ResponseMessageV4 chunk = serializer.readChunk(content, isFirstChunk.get()); + isFirstChunk.set(false); + out.add(chunk); } - final ResponseMessageV4 chunk = serializer.readChunk(content, isFirstChunk.get()); - - isFirstChunk.set(false); - - out.add(chunk); + if (msg instanceof LastHttpContent) { + out.add(LAST_CONTENT_READ_RESPONSE); + } } catch (SerializationException e) { throw new RuntimeException(e); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java index a95bcbc8f5..27dc8a0f96 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java @@ -61,29 +61,6 @@ public abstract class AbstractClient implements SimpleClient { return submitAsync(requestMessage).get(180, TimeUnit.SECONDS); } - @Override - public CompletableFuture<List<ResponseMessageV4>> submitAsync(final RequestMessageV4 requestMessage) throws Exception { - final List<ResponseMessageV4> results = new ArrayList<>(); - final CompletableFuture<List<ResponseMessageV4>> f = new CompletableFuture<>(); - callbackResponseHandler.callback = response -> { - // message with trailers - if (f.isDone()) - throw new RuntimeException("A terminating message was already encountered - no more messages should have been received"); - - results.add(response); - - // check if the current message is terminating - if it is then we can mark complete - if (response.getStatus() != null && response.getStatus().getCode() != HttpResponseStatus.PARTIAL_CONTENT - && response.getStatus().getCode() != HttpResponseStatus.PROXY_AUTHENTICATION_REQUIRED) { - f.complete(results); - } - }; - - writeAndFlush(requestMessage); - - return f; - } - static class CallbackResponseHandler extends SimpleChannelInboundHandler<ResponseMessageV4> { public Consumer<ResponseMessageV4> callback; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java index 7d4b31213f..328535d4d1 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java @@ -20,9 +20,11 @@ package org.apache.tinkerpop.gremlin.driver.simple; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelOption; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.apache.tinkerpop.gremlin.driver.Channelizer; import org.apache.tinkerpop.gremlin.driver.RequestInterceptor; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder; @@ -35,6 +37,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.HttpClientCodec; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessageV4; import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryMapper; import org.slf4j.Logger; @@ -43,6 +46,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -101,7 +106,6 @@ public class SimpleHttpClient extends AbstractClient { new HttpClientCodec(), new HttpGremlinResponseStreamDecoder(serializer, Integer.MAX_VALUE), new HttpGremlinRequestEncoder(serializer, new ArrayList<>(), false), - callbackResponseHandler); } }); @@ -112,6 +116,28 @@ public class SimpleHttpClient extends AbstractClient { } } + @Override + public CompletableFuture<List<ResponseMessageV4>> submitAsync(final RequestMessageV4 requestMessage) throws Exception { + final List<ResponseMessageV4> results = new ArrayList<>(); + final CompletableFuture<List<ResponseMessageV4>> f = new CompletableFuture<>(); + callbackResponseHandler.callback = response -> { + // message with trailers + if (f.isDone()) + throw new RuntimeException("A terminating message was already encountered - no more messages should have been received"); + + results.add(response); + + // check if the current message is terminating - if it is then we can mark complete + if (Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE == response) { + f.complete(results); + } + }; + + writeAndFlush(requestMessage); + + return f; + } + @Override public void writeAndFlush(final RequestMessageV4 requestMessage) throws Exception { channel.writeAndFlush(requestMessage); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java index ac153b4dc0..04b267c443 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java @@ -611,7 +611,6 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration final RequestMessageV4 request = RequestMessageV4.build("[0,1,2,3,4,5,6,7,8,9]").create(); final List<ResponseMessageV4> msgs = client.submit(request); - assertEquals(5, client.submit(request).size()); assertEquals(0, (int) msgs.get(0).getResult().getData().get(0)); assertEquals(1, (int) msgs.get(0).getResult().getData().get(1)); assertEquals(2, (int) msgs.get(1).getResult().getData().get(0)); @@ -622,6 +621,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration assertEquals(7, (int) msgs.get(3).getResult().getData().get(1)); assertEquals(8, (int) msgs.get(4).getResult().getData().get(0)); assertEquals(9, (int) msgs.get(4).getResult().getData().get(1)); + for (ResponseMessageV4 resp : msgs.subList(5, msgs.size())) { + assertEquals(0, resp.getResult().getData().size()); + } } } @@ -806,8 +808,10 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration try (SimpleClient client = TestClientFactory.createSimpleHttpClient()) { final RequestMessageV4 request = RequestMessageV4.build("10").create(); final List<ResponseMessageV4> responses = client.submit(request); - assertEquals(1, responses.size()); - assertEquals(HttpResponseStatus.OK, responses.get(0).getStatus().getCode()); + assertEquals(10, responses.get(0).getResult().getData().get(0)); + for (ResponseMessageV4 resp : responses.subList(1, responses.size())) { + assertEquals(0, resp.getResult().getData().size()); + } } } @@ -831,7 +835,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration final RequestMessageV4 request = RequestMessageV4.build("new String().doNothingAtAllBecauseThis is a syntax error").create(); final List<ResponseMessageV4> responses = client.submit(request); assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, responses.get(0).getStatus().getCode()); - assertEquals(1, responses.size()); + for (ResponseMessageV4 resp : responses.subList(1, responses.size())) { + assertEquals(0, resp.getResult().getData().size()); + } } }
