This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch http-initial-error-fix in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit cb1d34caeb7ac2cd8c7335766cdf9229fc02b5de Author: Ken Hu <106191785+kenh...@users.noreply.github.com> AuthorDate: Tue Jun 11 18:42:16 2024 -0700 Update timing of returning connection and server pipelining CTR. The connection was being returned back to the pool too early as sometimes the LastHttpContent had not yet been received. Changing it to only release after the LastHttpContent has been received will prevent accidentally using the same channel for multiple requests. The server will now ignore incoming requests if it is already handling a request which stops pipelining from working altogether. This prevents accidentally using the same channel for multiple requests on the server side which caused errors due to concurrent modification of channel state. --- .../tinkerpop/gremlin/driver/Channelizer.java | 3 ++- .../driver/handler/GremlinResponseHandler.java | 9 ++++---- .../handler/HttpGremlinResponseStreamDecoder.java | 24 +++++++++++++++------ .../server/handler/HttpRequestIdHandler.java | 25 +++++++++++++++++++++- 4 files changed, 49 insertions(+), 12 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index d2e393f100..41f3f5eb13 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -28,6 +28,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; +import io.netty.util.AttributeKey; import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder; @@ -155,7 +156,7 @@ public interface Channelizer extends ChannelHandler { * channelizer. Only sessionless requests are possible. */ final class HttpChannelizer extends AbstractChannelizer { - + public static final AttributeKey<Boolean> LAST_CONTENT_READ = AttributeKey.valueOf("lastContentRead"); private HttpGremlinRequestEncoder gremlinRequestEncoder; private HttpGremlinResponseStreamDecoder gremlinResponseDecoder; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java index 88e8a740e6..8b550c86d2 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ; + /** * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request * as the {@link ResponseMessageV4} objects are deserialized. @@ -60,10 +62,10 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response @Override protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessageV4 response) { - final HttpResponseStatus statusCode = response.getStatus() == null ? HttpResponseStatus.PARTIAL_CONTENT : response.getStatus().getCode(); + final HttpResponseStatus statusCode = (response.getStatus() == null) ? null : response.getStatus().getCode(); final ResultQueue queue = pending.get(); - if (statusCode == HttpResponseStatus.OK || statusCode == HttpResponseStatus.PARTIAL_CONTENT) { + if (null == statusCode || statusCode == HttpResponseStatus.OK) { final List<Object> data = response.getResult().getData(); // unrolls the collection into individual results to be handled by the queue. data.forEach(item -> queue.add(new Result(item))); @@ -75,8 +77,7 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response } } - // as this is a non-PARTIAL_CONTENT code - the stream is done. - if (statusCode != HttpResponseStatus.PARTIAL_CONTENT) { + if (channelHandlerContext.channel().attr(LAST_CONTENT_READ).get() == true) { final ResultQueue current = pending.getAndSet(null); if (current != null) { current.markComplete(response.getStatus().getAttributes()); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java index d271e0dc0d..2478eb6656 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java @@ -41,16 +41,21 @@ import org.apache.tinkerpop.gremlin.util.ser.SerializersV4; import org.apache.tinkerpop.shaded.jackson.databind.JsonNode; import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper; +import java.util.Collections; import java.util.List; import java.util.Objects; -public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<DefaultHttpObject> { +import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ; +public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<DefaultHttpObject> { private static final AttributeKey<Boolean> IS_FIRST_CHUNK = AttributeKey.valueOf("isFirstChunk"); private static final AttributeKey<HttpResponseStatus> RESPONSE_STATUS = AttributeKey.valueOf("responseStatus"); private static final AttributeKey<String> RESPONSE_ENCODING = AttributeKey.valueOf("responseSerializer"); private static final AttributeKey<Integer> BYTES_READ = AttributeKey.valueOf("bytesRead"); + private static final ResponseMessageV4 EMPTY_RESPONSE = + ResponseMessageV4.build().code(HttpResponseStatus.OK).result(Collections.emptyList()).create(); + private final MessageSerializerV4<?> serializer; private final int maxContentLength; private final ObjectMapper mapper = new ObjectMapper(); @@ -68,6 +73,7 @@ public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<De if (msg instanceof HttpResponse) { ctx.channel().attr(BYTES_READ).set(0); + ctx.channel().attr(LAST_CONTENT_READ).set(false); final HttpResponse resp = (HttpResponse) msg; responseStatus.set(resp.status()); @@ -84,16 +90,22 @@ public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<De throw new TooLongFrameException("Response exceeded " + maxContentLength + " bytes."); } - if (msg instanceof LastHttpContent && content.readableBytes() == 0 && bytesRead.get() != 0) { - // If this last content contains no bytes and there were bytes read previously, it means that this is the - // trailing headers. Trailing headers aren't used in the driver and shouldn't be passed on. - content.release(); - return; + if (msg instanceof LastHttpContent) { + ctx.channel().attr(LAST_CONTENT_READ).set(true); + + if (content.readableBytes() == 0 && bytesRead.get() != 0) { + // If this last content contains no bytes and there were bytes read previously, it means that this + // is the trailing headers. Trailing headers aren't used in the driver and shouldn't be passed on. + content.release(); + out.add(EMPTY_RESPONSE); + } } try { // no more chunks expected if (isError(responseStatus.get()) && !SerTokensV4.MIME_GRAPHBINARY_V4.equals(responseEncoding.get())) { + // There are certain errors that can occur on the server before the request is processed so they + // aren't returned in GraphBinary. Assume they are JSON responses instead. final JsonNode node = mapper.readTree(content.toString(CharsetUtil.UTF_8)); final String message = node.get("message").asText(); final ResponseMessageV4 response = ResponseMessageV4.build() diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java index 824fc477c6..02ed6df191 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java @@ -22,7 +22,11 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; import java.util.UUID; @@ -31,11 +35,27 @@ import java.util.UUID; */ @ChannelHandler.Sharable public class HttpRequestIdHandler extends ChannelDuplexHandler { + /** + * The key for whether a {@link io.netty.handler.codec.http.HttpResponse} has been sent for the current response. + */ + private static final AttributeKey<Boolean> IN_USE = AttributeKey.valueOf("inUse"); + public static String REQUEST_ID_HEADER_NAME = "Gremlin-RequestId"; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ctx.channel().attr(StateKey.REQUEST_ID).set(UUID.randomUUID()); + if (msg instanceof HttpRequest) { + final Boolean currentlyInUse = ctx.channel().attr(IN_USE).get(); + if (currentlyInUse != null && currentlyInUse == true) { + // Pipelining not supported so just ignore the request if another request already being handled. + ReferenceCountUtil.release(msg); + return; + } + + ctx.channel().attr(StateKey.REQUEST_ID).set(UUID.randomUUID()); + ctx.channel().attr(IN_USE).set(true); + } + ctx.fireChannelRead(msg); } @@ -44,7 +64,10 @@ public class HttpRequestIdHandler extends ChannelDuplexHandler { if (msg instanceof HttpResponse) { HttpResponse response = (HttpResponse) msg; response.headers().add(REQUEST_ID_HEADER_NAME, ctx.channel().attr(StateKey.REQUEST_ID).get()); + } else if (msg instanceof LastHttpContent) { + ctx.channel().attr(IN_USE).set(false); } + ctx.write(msg, promise); } }