This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch master-http in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 60596ce8c3160068009ae4a2569f602371625ce6 Author: Ken Hu <[email protected]> AuthorDate: Tue Jun 11 18:42:16 2024 -0700 Check for server pipelining CTR. The server will now ignore incoming requests if it is already handling a request which stops pipelining from working altogether. This prevents accidentally using the same channel for multiple requests on the server side which caused errors due to concurrent modification of channel state. --- .../server/handler/HttpRequestIdHandler.java | 32 +++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java index 824fc477c6..19822a057b 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestIdHandler.java @@ -22,7 +22,11 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; import java.util.UUID; @@ -31,20 +35,46 @@ import java.util.UUID; */ @ChannelHandler.Sharable public class HttpRequestIdHandler extends ChannelDuplexHandler { + /** + * The key for whether a {@link io.netty.handler.codec.http.HttpResponse} has been sent for the current response. + */ + private static final AttributeKey<Boolean> IN_USE = AttributeKey.valueOf("inUse"); + public static String REQUEST_ID_HEADER_NAME = "Gremlin-RequestId"; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ctx.channel().attr(StateKey.REQUEST_ID).set(UUID.randomUUID()); + if (msg instanceof HttpRequest) { + final Boolean currentlyInUse = ctx.channel().attr(IN_USE).get(); + if (currentlyInUse != null && currentlyInUse == true) { + // Pipelining not supported so just ignore the request if another request already being handled. + ReferenceCountUtil.release(msg); + return; + } + + ctx.channel().attr(StateKey.REQUEST_ID).set(UUID.randomUUID()); + ctx.channel().attr(IN_USE).set(true); + } + ctx.fireChannelRead(msg); } + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.channel().attr(IN_USE).set(false); + super.exceptionCaught(ctx, cause); + } + @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof HttpResponse) { HttpResponse response = (HttpResponse) msg; response.headers().add(REQUEST_ID_HEADER_NAME, ctx.channel().attr(StateKey.REQUEST_ID).get()); } + if (msg instanceof LastHttpContent) { // possible for an object to be both HttpResponse and LastHttpContent. + ctx.channel().attr(IN_USE).set(false); + } + ctx.write(msg, promise); } }
