This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch http-server-test-updates in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 0776043e175f438c7db5bd44ce2370083ad28d16 Author: Ken Hu <106191785+kenh...@users.noreply.github.com> AuthorDate: Mon May 20 12:20:06 2024 -0700 Fix max content length handling to gremlin-driver. Updated to match old behavior where the incoming size of the data is checked so that it doesn't exceed the maxContentLength setting. --- .../apache/tinkerpop/gremlin/driver/Channelizer.java | 2 +- .../driver/handler/HttpGremlinRequestEncoder.java | 1 + .../handler/HttpGremlinResponseStreamDecoder.java | 18 +++++++++++++++--- .../gremlin/driver/simple/SimpleHttpClient.java | 2 +- .../gremlin/driver/ClientConnectionIntegrateTest.java | 7 ++++--- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 7ff0b7e2e8..4caa029484 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -157,7 +157,7 @@ public interface Channelizer extends ChannelHandler { super.init(connection); gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptor(), cluster.isUserAgentOnConnectEnabled()); - gremlinResponseDecoder = new HttpGremlinResponseStreamDecoder(cluster.getSerializer()); + gremlinResponseDecoder = new HttpGremlinResponseStreamDecoder(cluster.getSerializer(), cluster.getMaxContentLength()); } @Override diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java index 31a564d57e..2d9c51c73e 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaderNames; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java index 825a55ada2..8d8f626017 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.driver.handler; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.DefaultHttpObject; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaders; @@ -44,12 +45,15 @@ public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<De private static final AttributeKey<Boolean> IS_FIRST_CHUNK = AttributeKey.valueOf("isFirstChunk"); private static final AttributeKey<HttpResponseStatus> RESPONSE_STATUS = AttributeKey.valueOf("responseStatus"); + private static final AttributeKey<Integer> BYTES_READ = AttributeKey.valueOf("bytesRead"); private final MessageSerializerV4<?> serializer; + private final int maxContentLength; private final ObjectMapper mapper = new ObjectMapper(); - public HttpGremlinResponseStreamDecoder(MessageSerializerV4<?> serializer) { + public HttpGremlinResponseStreamDecoder(final MessageSerializerV4<?> serializer, final int maxContentLength) { this.serializer = serializer; + this.maxContentLength = maxContentLength; } @Override @@ -65,14 +69,22 @@ public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<De } isFirstChunk.set(true); + ctx.channel().attr(BYTES_READ).set(0); } if (msg instanceof HttpContent) { + ByteBuf content = ((HttpContent) msg).content(); + Attribute<Integer> bytesRead = ctx.channel().attr(BYTES_READ); + bytesRead.set(bytesRead.get() + content.readableBytes()); + if (bytesRead.get() > maxContentLength) { + ctx.fireExceptionCaught(new TooLongFrameException("Response exceeded " + maxContentLength + " bytes.")); + } + try { // with error status we can get json in response // no more chunks expected if (isError(responseStatus.get())) { - final JsonNode node = mapper.readTree(((HttpContent) msg).content().toString(CharsetUtil.UTF_8)); + final JsonNode node = mapper.readTree(content.toString(CharsetUtil.UTF_8)); final String message = node.get("message").asText(); final ResponseMessageV4 response = ResponseMessageV4.build() .code(responseStatus.get()).statusMessage(message) @@ -82,7 +94,7 @@ public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<De return; } - final ResponseMessageV4 chunk = serializer.readChunk(((HttpContent) msg).content(), isFirstChunk.get()); + final ResponseMessageV4 chunk = serializer.readChunk(content, isFirstChunk.get()); if (msg instanceof LastHttpContent) { final HttpHeaders trailingHeaders = ((LastHttpContent) msg).trailingHeaders(); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java index 893ba7e30e..7d4b31213f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java @@ -99,7 +99,7 @@ public class SimpleHttpClient extends AbstractClient { } p.addLast( new HttpClientCodec(), - new HttpGremlinResponseStreamDecoder(serializer), + new HttpGremlinResponseStreamDecoder(serializer, Integer.MAX_VALUE), new HttpGremlinRequestEncoder(serializer, new ArrayList<>(), false), callbackResponseHandler); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java index bc6e5c6546..ef93ff19b5 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.driver; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import io.netty.handler.codec.CorruptedFrameException; +import io.netty.handler.codec.TooLongFrameException; import nl.altindag.log.LogCaptor; import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4; import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; @@ -110,7 +111,7 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat fail("Should throw an exception."); } catch (Exception re) { - assertThat(re.getCause() instanceof CorruptedFrameException, is(true)); + assertThat(re.getCause() instanceof TooLongFrameException, is(true)); } // without this wait this test is failing randomly on docker/travis with ConcurrentModificationException @@ -183,7 +184,7 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat } @Test - public void overLimitOperationsShouldDelegateToSingleNewConnection() throws InterruptedException { + public void overLimitOperationsShouldCreateNewHttpConnectionPerRequestAsNeeded() throws InterruptedException { final int operations = 6; final Cluster cluster = TestClientFactory.build() .minConnectionPoolSize(1) @@ -218,7 +219,7 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat } }); - assertEquals(2, connectionBorrowCount.size()); + assertEquals(operations, connectionBorrowCount.size()); for (int finalBorrowCount : connectionBorrowCount.values()) { assertEquals(1, finalBorrowCount); }