This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch http-initial-error-fix
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit cb1d34caeb7ac2cd8c7335766cdf9229fc02b5de
Author: Ken Hu <106191785+kenh...@users.noreply.github.com>
AuthorDate: Tue Jun 11 18:42:16 2024 -0700

    Update timing of returning connection and server pipelining CTR.
    
    The connection was being returned to the pool too early, because
    sometimes the LastHttpContent had not yet been received. Releasing it
    only after the LastHttpContent has been received prevents
    accidentally using the same channel for multiple requests.
    
    The server will now ignore incoming requests if it is already handling a
    request, which stops pipelining from working altogether. This prevents
    accidentally using the same channel for multiple requests on the server
    side, which caused errors due to concurrent modification of channel
    state.
---
 .../tinkerpop/gremlin/driver/Channelizer.java      |  3 ++-
 .../driver/handler/GremlinResponseHandler.java     |  9 ++++----
 .../handler/HttpGremlinResponseStreamDecoder.java  | 24 +++++++++++++++------
 .../server/handler/HttpRequestIdHandler.java       | 25 +++++++++++++++++++++-
 4 files changed, 49 insertions(+), 12 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index d2e393f100..41f3f5eb13 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -28,6 +28,7 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.http.HttpClientCodec;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.util.AttributeKey;
 import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
 import 
org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder;
@@ -155,7 +156,7 @@ public interface Channelizer extends ChannelHandler {
      * channelizer. Only sessionless requests are possible.
      */
     final class HttpChannelizer extends AbstractChannelizer {
-
+        public static final AttributeKey<Boolean> LAST_CONTENT_READ = 
AttributeKey.valueOf("lastContentRead");
         private HttpGremlinRequestEncoder gremlinRequestEncoder;
         private HttpGremlinResponseStreamDecoder gremlinResponseDecoder;
 
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
index 88e8a740e6..8b550c86d2 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
@@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ;
+
 /**
  * Takes a map of requests pending responses and writes responses to the 
{@link ResultQueue} of a request
  * as the {@link ResponseMessageV4} objects are deserialized.
@@ -60,10 +62,10 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
 
     @Override
     protected void channelRead0(final ChannelHandlerContext 
channelHandlerContext, final ResponseMessageV4 response) {
-        final HttpResponseStatus statusCode = response.getStatus() == null ? 
HttpResponseStatus.PARTIAL_CONTENT : response.getStatus().getCode();
+        final HttpResponseStatus statusCode = (response.getStatus() == null) ? 
null : response.getStatus().getCode();
         final ResultQueue queue = pending.get();
 
-        if (statusCode == HttpResponseStatus.OK || statusCode == 
HttpResponseStatus.PARTIAL_CONTENT) {
+        if (null == statusCode || statusCode == HttpResponseStatus.OK) {
             final List<Object> data = response.getResult().getData();
             // unrolls the collection into individual results to be handled by 
the queue.
             data.forEach(item -> queue.add(new Result(item)));
@@ -75,8 +77,7 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
             }
         }
 
-        // as this is a non-PARTIAL_CONTENT code - the stream is done.
-        if (statusCode != HttpResponseStatus.PARTIAL_CONTENT) {
+        if (channelHandlerContext.channel().attr(LAST_CONTENT_READ).get() == 
true) {
             final ResultQueue current = pending.getAndSet(null);
             if (current != null) {
                 current.markComplete(response.getStatus().getAttributes());
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
index d271e0dc0d..2478eb6656 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
@@ -41,16 +41,21 @@ import org.apache.tinkerpop.gremlin.util.ser.SerializersV4;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
-public class HttpGremlinResponseStreamDecoder extends 
MessageToMessageDecoder<DefaultHttpObject> {
+import static 
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ;
 
+public class HttpGremlinResponseStreamDecoder extends 
MessageToMessageDecoder<DefaultHttpObject> {
     private static final AttributeKey<Boolean> IS_FIRST_CHUNK = 
AttributeKey.valueOf("isFirstChunk");
     private static final AttributeKey<HttpResponseStatus> RESPONSE_STATUS = 
AttributeKey.valueOf("responseStatus");
     private static final AttributeKey<String> RESPONSE_ENCODING = 
AttributeKey.valueOf("responseSerializer");
     private static final AttributeKey<Integer> BYTES_READ = 
AttributeKey.valueOf("bytesRead");
 
+    private static final ResponseMessageV4 EMPTY_RESPONSE =
+            
ResponseMessageV4.build().code(HttpResponseStatus.OK).result(Collections.emptyList()).create();
+
     private final MessageSerializerV4<?> serializer;
     private final int maxContentLength;
     private final ObjectMapper mapper = new ObjectMapper();
@@ -68,6 +73,7 @@ public class HttpGremlinResponseStreamDecoder extends 
MessageToMessageDecoder<De
 
         if (msg instanceof HttpResponse) {
             ctx.channel().attr(BYTES_READ).set(0);
+            ctx.channel().attr(LAST_CONTENT_READ).set(false);
 
             final HttpResponse resp = (HttpResponse) msg;
             responseStatus.set(resp.status());
@@ -84,16 +90,22 @@ public class HttpGremlinResponseStreamDecoder extends 
MessageToMessageDecoder<De
                 throw new TooLongFrameException("Response exceeded " + 
maxContentLength + " bytes.");
             }
 
-            if (msg instanceof LastHttpContent && content.readableBytes() == 0 
&& bytesRead.get() != 0) {
-                // If this last content contains no bytes and there were bytes 
read previously, it means that this is the
-                // trailing headers. Trailing headers aren't used in the 
driver and shouldn't be passed on.
-                content.release();
-                return;
+            if (msg instanceof LastHttpContent) {
+                ctx.channel().attr(LAST_CONTENT_READ).set(true);
+
+                if (content.readableBytes() == 0 && bytesRead.get() != 0) {
+                    // If this last content contains no bytes and there were 
bytes read previously, it means that this
+                    // is the trailing headers. Trailing headers aren't used 
in the driver and shouldn't be passed on.
+                    content.release();
+                    out.add(EMPTY_RESPONSE);
+                }
             }
 
             try {
                 // no more chunks expected
                 if (isError(responseStatus.get()) && 
!SerTokensV4.MIME_GRAPHBINARY_V4.equals(responseEncoding.get())) {
+                    // There are certain errors that can occur on the server 
before the request is processed so they
+                    // aren't returned in GraphBinary. Assume they are JSON 
responses instead.
                     final JsonNode node = 
mapper.readTree(content.toString(CharsetUtil.UTF_8));
                     final String message = node.get("message").asText();
                     final ResponseMessageV4 response = 
ResponseMessageV4.build()
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java
index 824fc477c6..02ed6df191 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java
@@ -22,7 +22,11 @@ import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.AttributeKey;
+import io.netty.util.ReferenceCountUtil;
 
 import java.util.UUID;
 
@@ -31,11 +35,27 @@ import java.util.UUID;
  */
 @ChannelHandler.Sharable
 public class HttpRequestIdHandler extends ChannelDuplexHandler {
+    /**
+     * The key for whether a request is currently in progress on the channel, 
i.e. it was received but its last response content is not yet written.
+     */
+    private static final AttributeKey<Boolean> IN_USE = 
AttributeKey.valueOf("inUse");
+
     public static String REQUEST_ID_HEADER_NAME = "Gremlin-RequestId";
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-        ctx.channel().attr(StateKey.REQUEST_ID).set(UUID.randomUUID());
+        if (msg instanceof HttpRequest) {
+            final Boolean currentlyInUse = ctx.channel().attr(IN_USE).get();
+            if (currentlyInUse != null && currentlyInUse == true) {
+                // Pipelining not supported so just ignore the request if 
another request already being handled.
+                ReferenceCountUtil.release(msg);
+                return;
+            }
+
+            ctx.channel().attr(StateKey.REQUEST_ID).set(UUID.randomUUID());
+            ctx.channel().attr(IN_USE).set(true);
+        }
+
         ctx.fireChannelRead(msg);
     }
 
@@ -44,7 +64,10 @@ public class HttpRequestIdHandler extends 
ChannelDuplexHandler {
         if (msg instanceof HttpResponse) {
             HttpResponse response = (HttpResponse) msg;
             response.headers().add(REQUEST_ID_HEADER_NAME, 
ctx.channel().attr(StateKey.REQUEST_ID).get());
+        } else if (msg instanceof LastHttpContent) {
+            ctx.channel().attr(IN_USE).set(false);
         }
+
         ctx.write(msg, promise);
     }
 }

Reply via email to