This is an automated email from the ASF dual-hosted git repository. valentyn pushed a commit to branch valentyn/streaming-serializers in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/valentyn/streaming-serializers by this push: new 090ef4c5ea GraphBinary streaming draft 090ef4c5ea is described below commit 090ef4c5ea2cd59c8ec8765491d53c15a121c120 Author: Valentyn Kahamlyk <valentyn.kaham...@improving.com> AuthorDate: Wed Mar 27 12:27:33 2024 -0700 GraphBinary streaming draft --- .../gremlin/structure/io/binary/DataType.java | 1 + .../gremlin/structure/io/binary/Marker.java | 41 +++++ .../io/binary/TypeSerializerRegistry.java | 1 + .../io/binary/types/SingleTypeSerializer.java | 3 + .../server/handler/HttpGremlinEndpointHandler.java | 8 +- .../gremlin/server/handler/HttpHandlerUtil.java | 2 +- .../server/GremlinServerHttpIntegrateTest.java | 55 +++++- .../gremlin/util/message/ResponseMessage.java | 15 ++ .../util/ser/GraphBinaryMessageSerializerV1.java | 12 +- .../util/ser/GraphBinaryMessageSerializerV4.java | 188 +++++++++++++++++++++ .../util/ser/GraphSONMessageSerializerV4.java | 14 +- .../gremlin/util/ser/MessageChunkSerializer.java | 10 +- .../tinkerpop/gremlin/util/ser/SerTokens.java | 1 + .../tinkerpop/gremlin/util/ser/Serializers.java | 7 +- .../util/ser/GraphSONMessageSerializerV4Test.java | 43 +---- .../binary/GraphBinaryMessageSerializerV4Test.java | 164 ++++++++++++++++++ 16 files changed, 506 insertions(+), 59 deletions(-) diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/DataType.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/DataType.java index e3fd09f16a..246a266fbe 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/DataType.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/DataType.java @@ -90,6 +90,7 @@ public enum DataType { ZONEOFFSET(0X8E), CUSTOM(0), + MARKER(0XFD), UNSPECIFIED_NULL(0XFE); private final int code; diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/Marker.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/Marker.java new file mode 100644 index 0000000000..8f7beaab63 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/Marker.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.structure.io.binary; + +public class Marker { + private final byte value; + + public static Marker END_OF_STREAM = new Marker((byte)0); + + private Marker(final byte value) { + this.value = value; + } + + public byte getValue() { + return value; + } + + public static Marker of(final byte value) { + if (value != 0) { + throw new IllegalArgumentException(); + } + return END_OF_STREAM; + } +} diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/TypeSerializerRegistry.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/TypeSerializerRegistry.java index 9958179318..428bfb4b24 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/TypeSerializerRegistry.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/TypeSerializerRegistry.java @@ -189,6 +189,7 @@ public class TypeSerializerRegistry { new RegistryEntry<>(Tree.class, new TreeSerializer()), new RegistryEntry<>(Metrics.class, new MetricsSerializer()), new RegistryEntry<>(TraversalMetrics.class, new TraversalMetricsSerializer()), + new RegistryEntry<>(Marker.class, SingleTypeSerializer.MarkerSerializer), // TransformSerializer implementations new RegistryEntry<>(Map.Entry.class, new MapEntrySerializer()), diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/types/SingleTypeSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/types/SingleTypeSerializer.java index 4abaddab28..7d163209cf 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/types/SingleTypeSerializer.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/types/SingleTypeSerializer.java @@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.structure.io.binary.DataType; import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryWriter; import org.apache.tinkerpop.gremlin.structure.io.Buffer; +import org.apache.tinkerpop.gremlin.structure.io.binary.Marker; import java.time.Year; import java.util.function.BiConsumer; @@ -48,6 +49,8 @@ public class SingleTypeSerializer<T> extends SimpleTypeSerializer<T> { new SingleTypeSerializer<>(DataType.BYTE, Buffer::readByte, (v, b) -> b.writeByte(v)); public static final SingleTypeSerializer<Year> YearSerializer = new SingleTypeSerializer<>(DataType.YEAR, bb -> Year.of(bb.readInt()), (v, b) -> b.writeInt(v.getValue())); + public static final SingleTypeSerializer<Marker> MarkerSerializer = + new SingleTypeSerializer<>(DataType.MARKER, bb -> Marker.of(bb.readByte()), (v, b) -> b.writeByte(v.getValue())); private final Function<Buffer, T> readFunc; private final BiConsumer<T, Buffer> writeFunc; diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index 00e88c72ab..2a84db2ffb 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -96,7 +96,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -115,7 +114,6 @@ import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; import static io.netty.handler.codec.http.HttpResponseStatus.OK; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT; import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendTrailingHeaders; import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.writeErrorFrame; @@ -824,12 +822,12 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter { } ctx.setRequestState(RequestState.STREAMING); - return new Frame(chunkSerializer.writeResponseHeader(responseMessage, nettyContext.alloc())); + return new Frame(chunkSerializer.writeHeader(responseMessage, nettyContext.alloc())); case STREAMING: - return new Frame(chunkSerializer.writeResponseChunk(aggregate, nettyContext.alloc())); + return new Frame(chunkSerializer.writeChunk(aggregate, nettyContext.alloc())); case FINISHING: ctx.setRequestState(RequestState.FINISHED); - return new Frame(chunkSerializer.writeResponseFooter(responseMessage, nettyContext.alloc())); + return new Frame(chunkSerializer.writeFooter(responseMessage, nettyContext.alloc())); } // todo: just throw? diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java index 3ce6cb3f3a..5be5dd2b90 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java @@ -306,7 +306,7 @@ public class HttpHandlerUtil { static void writeErrorFrame(final ChannelHandlerContext ctx, final Context context, ResponseMessage responseMessage, final MessageSerializer<?> serializer) { try { final ByteBuf ByteBuf = context.getRequestState() == HttpGremlinEndpointHandler.RequestState.STREAMING - ? ((MessageChunkSerializer) serializer).writeError(responseMessage, ctx.alloc()) + ? ((MessageChunkSerializer) serializer).writeErrorFooter(responseMessage, ctx.alloc()) : serializer.serializeResponseAsBinary(responseMessage, ctx.alloc()); context.setRequestState(HttpGremlinEndpointHandler.RequestState.ERROR); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java index 2321bed855..1ab5874d09 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java @@ -38,9 +38,11 @@ import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode; import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1; import org.apache.tinkerpop.gremlin.util.Tokens; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV1; import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV2; import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV3; +import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV4; import org.apache.tinkerpop.gremlin.util.ser.GraphSONUntypedMessageSerializerV1; import org.apache.tinkerpop.gremlin.util.ser.GraphSONUntypedMessageSerializerV2; import org.apache.tinkerpop.gremlin.util.ser.GraphSONUntypedMessageSerializerV3; @@ -178,6 +180,20 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra settings.evaluationTimeout = 5000; settings.gremlinPool = 1; break; + case "should200OnPOSTWithChunkedResponse": + case "shouldHandleErrorsInFirstChunkPOSTWithChunkedResponse": + case "shouldHandleErrorsNotInFirstChunkPOSTWithChunkedResponse": + settings.serializers.clear(); + final Settings.SerializerSettings serializerSettingsV4 = new Settings.SerializerSettings(); + serializerSettingsV4.className = GraphSONMessageSerializerV4.class.getName(); + settings.serializers.add(serializerSettingsV4); + break; + case "should200OnPOSTWithChunkedResponseGraphBinary": + settings.serializers.clear(); + final Settings.SerializerSettings serializerSettingsV4b = new Settings.SerializerSettings(); + serializerSettingsV4b.className = GraphBinaryMessageSerializerV4.class.getName(); + settings.serializers.add(serializerSettingsV4b); + break; } return settings; } @@ -1145,7 +1161,40 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra assertEquals(200, response.getStatusLine().getStatusCode()); assertTrue(response.getEntity().isChunked()); - String json = EntityUtils.toString(response.getEntity()); + final String json = EntityUtils.toString(response.getEntity()); + final JsonNode node = mapper.readTree(json); + assertEquals(8, node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(8).get(GraphSONTokens.VALUEPROP).intValue()); + assertEquals("ten", node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(10).textValue()); + assertEquals("new chunk", node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(16).textValue()); + + final Header[] footers = getTrailingHeaders(response); + assertEquals(2, footers.length); + assertEquals("code", footers[0].getName()); + assertEquals("200", footers[0].getValue()); + assertEquals("message", footers[1].getName()); + assertEquals("OK", footers[1].getValue()); + } + } + + @Test + public void should200OnPOSTWithChunkedResponseGraphBinary() throws Exception { + final String gremlin = "g.inject(0,1,2,3,4,5,6,7,8,9,'ten',11,12,13,14,15,'new chunk')"; + final GraphBinaryMessageSerializerV4 serializer = new GraphBinaryMessageSerializerV4(); + final ByteBuf serializedRequest = serializer.serializeRequestAsBinary( + RequestMessage.build(Tokens.OPS_EVAL).addArg(Tokens.ARGS_GREMLIN, gremlin).create(), + new UnpooledByteBufAllocator(false)); + + final CloseableHttpClient httpclient = HttpClients.createDefault(); + final HttpPost httppost = new HttpPost(TestClientFactory.createURLString()); + httppost.addHeader(HttpHeaders.CONTENT_TYPE, Serializers.GRAPHBINARY_V4.getValue()); + httppost.addHeader(HttpHeaders.ACCEPT, Serializers.GRAPHBINARY_V4.getValue()); + httppost.setEntity(new ByteArrayEntity(serializedRequest.array())); + + try (final CloseableHttpResponse response = httpclient.execute(httppost)) { + assertEquals(200, response.getStatusLine().getStatusCode()); + assertTrue(response.getEntity().isChunked()); + + final String json = EntityUtils.toString(response.getEntity()); final JsonNode node = mapper.readTree(json); assertEquals(8, node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(8).get(GraphSONTokens.VALUEPROP).intValue()); assertEquals("ten", node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(10).textValue()); @@ -1173,7 +1222,7 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra assertEquals(200, response.getStatusLine().getStatusCode()); assertTrue(response.getEntity().isChunked()); - String json = EntityUtils.toString(response.getEntity()); + final String json = EntityUtils.toString(response.getEntity()); final JsonNode node = mapper.readTree(json); assertEquals("some error", node.get("status").get("message").textValue()); assertEquals(500, node.get("status").get("code").intValue()); @@ -1200,7 +1249,7 @@ public class GremlinServerHttpIntegrateTest extends AbstractGremlinServerIntegra assertEquals(200, response.getStatusLine().getStatusCode()); assertTrue(response.getEntity().isChunked()); - String json = EntityUtils.toString(response.getEntity()); + final String json = EntityUtils.toString(response.getEntity()); final JsonNode node = mapper.readTree(json); assertEquals(0, node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(0).get(GraphSONTokens.VALUEPROP).intValue()); assertEquals("some error", node.get("status").get("message").textValue()); diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessage.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessage.java index aa9f2bdf7d..a644a4dd4b 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessage.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessage.java @@ -99,6 +99,10 @@ public final class ResponseMessage { return new Builder(requestId); } + public static Builder buildV4(final UUID requestId) { + return new Builder(requestId, true); + } + public final static class Builder { private final UUID requestId; @@ -116,6 +120,13 @@ public final class ResponseMessage { this.requestId = requestId; } + // builder for TP4 + private Builder(final UUID requestId, final boolean v4) { + this.requestId = requestId; + this.code = null; + this.statusMessage = null; + } + public Builder code(final ResponseStatusCode code) { this.code = code; return this; @@ -157,6 +168,10 @@ public final class ResponseMessage { public ResponseMessage create() { final ResponseResult responseResult = new ResponseResult(result, metaData); + // skip null values + if (code == null && statusMessage == null) { + return new ResponseMessage(requestId, null, responseResult); + } final ResponseStatus responseStatus = new ResponseStatus(code, statusMessage, attributes); return new ResponseMessage(requestId, responseStatus, responseResult); } diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV1.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV1.java index fe4b051715..17f80518ed 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV1.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV1.java @@ -58,12 +58,12 @@ public class GraphBinaryMessageSerializerV1 extends AbstractMessageSerializer<Gr private static final Base64.Decoder base64Decoder = Base64.getDecoder(); private byte[] header = MIME_TYPE.getBytes(UTF_8); - private boolean serializeToString = false; - private GraphBinaryReader reader; - private GraphBinaryWriter writer; - private RequestMessageSerializer requestSerializer; - private ResponseMessageSerializer responseSerializer; - private GraphBinaryMapper mapper; + protected boolean serializeToString = false; + protected GraphBinaryReader reader; + protected GraphBinaryWriter writer; + protected RequestMessageSerializer requestSerializer; + protected ResponseMessageSerializer responseSerializer; + protected GraphBinaryMapper mapper; /** * Creates a new instance of the message serializer using the default type serializers. diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java new file mode 100644 index 0000000000..510e1a9594 --- /dev/null +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.util.ser; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import org.apache.tinkerpop.gremlin.structure.io.Buffer; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryMapper; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryWriter; +import org.apache.tinkerpop.gremlin.structure.io.binary.Marker; +import org.apache.tinkerpop.gremlin.structure.io.binary.TypeSerializerRegistry; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.message.ResponseStatus; +import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode; +import org.javatuples.Pair; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.UUID; + +public class GraphBinaryMessageSerializerV4 extends GraphBinaryMessageSerializerV1 + implements MessageChunkSerializer<GraphBinaryMapper> { + + private static final NettyBufferFactory bufferFactory = new NettyBufferFactory(); + private static final String MIME_TYPE = SerTokens.MIME_GRAPHBINARY_V4; + + public GraphBinaryMessageSerializerV4() { + this(TypeSerializerRegistry.INSTANCE); + } + + public GraphBinaryMessageSerializerV4(final TypeSerializerRegistry registry) { + super(registry); + } + + @Override + public String[] mimeTypesSupported() { + return new String[]{MIME_TYPE}; + } + + + @Override + public ByteBuf writeHeader(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException { + // todo: write data or not when error? + final EnumSet<MessageParts> parts = responseMessage.getStatus() != null ? MessageParts.ALL : MessageParts.START; + + return write(responseMessage, null, allocator, parts); + } + + @Override + public ByteBuf writeChunk(final Object aggregate, final ByteBufAllocator allocator) throws SerializationException { + return write(null, aggregate, allocator, MessageParts.CHUNK); + } + + @Override + public ByteBuf writeFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException { + return write(responseMessage, null, allocator, MessageParts.END); + } + + @Override + public ByteBuf writeErrorFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException { + return write(responseMessage, null, allocator, MessageParts.ERROR); + } + + private ByteBuf write(final ResponseMessage responseMessage, final Object aggregate, + final ByteBufAllocator allocator, final EnumSet<MessageParts> parts) throws SerializationException { + final ByteBuf byteBuf = allocator.buffer(); + final Buffer buffer = bufferFactory.create(byteBuf); + + try { + if (parts.contains(MessageParts.HEADER)) { + // Version + buffer.writeByte(GraphBinaryWriter.VERSION_BYTE); + + // Nullable request id + writer.writeValue(responseMessage.getRequestId(), buffer, true); + } + + if (parts.contains(MessageParts.DATA)) { + final Object data = aggregate == null && responseMessage.getResult() != null + ? responseMessage.getResult().getData() + : aggregate; + if (data != null) { + for (final Object item : (List) data) { + writer.write(item, buffer); + } + } + } + + if (parts.contains(MessageParts.FOOTER)) { + final ResponseStatus status = responseMessage.getStatus(); + + // we don't know how much data we have, so need a special object + writer.write(Marker.END_OF_STREAM, buffer); + // Status code + writer.writeValue(status.getCode().getValue(), buffer, false); + // Nullable status message + writer.writeValue(status.getMessage(), buffer, true); + } + } catch (IOException e) { + throw new SerializationException(e); + } + return byteBuf; + } + + // ------- read message methods + private List<Object> readPayload(final Buffer buffer) throws IOException { + final List<Object> result = new ArrayList<>(); + while (buffer.readableBytes() != 0) { + final Object obj = reader.read(buffer); + if (obj instanceof Marker && ((Marker) obj).getValue() == 0) { + break; + } + result.add(obj); + } + return result; + } + + private Pair<ResponseStatusCode, String> readFooter(final Buffer buffer) throws IOException { + return Pair.with(ResponseStatusCode.getFromValue(reader.readValue(buffer, Integer.class, false)), + reader.readValue(buffer, String.class, true)); + } + + @Override + public ResponseMessage readChunk(final ByteBuf byteBuf, final boolean isFirstChunk) throws SerializationException { + final Buffer buffer = bufferFactory.create(byteBuf); + + try { + if (isFirstChunk) { + final int version = buffer.readByte() & 0xff; + + if (version >>> 7 != 1) { + // This is an indication that the response buffer was incorrectly built + // Or the buffer offsets are wrong + throw new SerializationException("The most significant bit should be set according to the format"); + } + } + + final UUID requestId = isFirstChunk ? reader.readValue(buffer, UUID.class, true) : null; + + final List<Object> result = readPayload(buffer); + + // no footer + if (buffer.readableBytes() == 0) { + return ResponseMessage.build(requestId) + .result(result) + .create(); + } + + final Pair<ResponseStatusCode, String> footer = readFooter(buffer); + return ResponseMessage.build(requestId) + .result(result) + .code(footer.getValue0()) + .statusMessage(footer.getValue1()) + .create(); + + } catch (IOException ex) { + throw new SerializationException(ex); + } + } + + private enum MessageParts { + HEADER, DATA, FOOTER; + + public static final EnumSet<MessageParts> ALL = EnumSet.of(HEADER, DATA, FOOTER); + public static final EnumSet<MessageParts> START = EnumSet.of(HEADER, DATA); + public static final EnumSet<MessageParts> CHUNK = EnumSet.of(DATA); + public static final EnumSet<MessageParts> END = EnumSet.of(DATA, FOOTER); + public static final EnumSet<MessageParts> ERROR = EnumSet.of(FOOTER); + } +} diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4.java index ca981986b2..031ac0e1cc 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.util.ser; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.util.ReferenceCountUtil; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper; import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONUtil; import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONVersion; @@ -152,7 +153,7 @@ public final class GraphSONMessageSerializerV4 extends AbstractGraphSONMessageSe // !!! @Override - public ByteBuf writeResponseHeader(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException { + public ByteBuf writeHeader(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException { ByteBuf encodedMessage = null; try { final byte[] header = mapper.writeValueAsBytes(new ResponseMessage.ResponseMessageHeader(responseMessage)); @@ -185,7 +186,7 @@ public final class GraphSONMessageSerializerV4 extends AbstractGraphSONMessageSe } @Override - public ByteBuf writeResponseChunk(final Object aggregate, final ByteBufAllocator allocator) throws SerializationException { + public ByteBuf writeChunk(final Object aggregate, final ByteBufAllocator allocator) throws SerializationException { ByteBuf encodedMessage = null; try { final byte[] payload = getChunk(false, aggregate); @@ -202,7 +203,7 @@ public final class GraphSONMessageSerializerV4 extends AbstractGraphSONMessageSe } @Override - public ByteBuf writeResponseFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException { + public ByteBuf writeFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException { ByteBuf encodedMessage = null; try { final byte[] footer = mapper.writeValueAsBytes(new ResponseMessage.ResponseMessageFooter(responseMessage)); @@ -222,7 +223,7 @@ public final class GraphSONMessageSerializerV4 extends AbstractGraphSONMessageSe } @Override - public ByteBuf writeError(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException { + public ByteBuf writeErrorFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException { ByteBuf encodedMessage = null; try { final byte[] footer = mapper.writeValueAsBytes(new ResponseMessage.ResponseMessageFooter(responseMessage)); @@ -239,6 +240,11 @@ public final class GraphSONMessageSerializerV4 extends AbstractGraphSONMessageSe } } + @Override + public ResponseMessage readChunk(final ByteBuf byteBuf, final boolean isFirstChunk) { + throw new IllegalStateException("Reading for streaming GraphSON is not supported"); + } + public final static class ResponseMessageHeaderSerializer extends StdSerializer<ResponseMessage.ResponseMessageHeader> { public ResponseMessageHeaderSerializer() { super(ResponseMessage.ResponseMessageHeader.class); diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/MessageChunkSerializer.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/MessageChunkSerializer.java index 3167b185cf..cabd9db75c 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/MessageChunkSerializer.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/MessageChunkSerializer.java @@ -24,11 +24,13 @@ import org.apache.tinkerpop.gremlin.util.MessageSerializer; import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; public interface MessageChunkSerializer<M> extends MessageSerializer<M> { - public ByteBuf writeResponseHeader(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException; + public ByteBuf writeHeader(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException; - public ByteBuf writeResponseChunk(final Object aggregate, final ByteBufAllocator allocator) throws SerializationException; + public ByteBuf writeChunk(final Object aggregate, final ByteBufAllocator allocator) throws SerializationException; - public ByteBuf writeResponseFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException; + public ByteBuf writeFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException; - public ByteBuf writeError(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException; + public ByteBuf writeErrorFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException; + + public ResponseMessage readChunk(final ByteBuf byteBuf, final boolean isFirstChunk) throws SerializationException; } diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/SerTokens.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/SerTokens.java index 8e20f531af..20b8fae69c 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/SerTokens.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/SerTokens.java @@ -45,4 +45,5 @@ public final class SerTokens { public static final String MIME_GRAPHSON_V3_UNTYPED = "application/vnd.gremlin-v3.0+json;types=false"; public static final String MIME_GRAPHSON_V4 = "application/vnd.gremlin-v4.0+json"; public static final String MIME_GRAPHBINARY_V1 = "application/vnd.graphbinary-v1.0"; + public static final String MIME_GRAPHBINARY_V4 = "application/vnd.graphbinary-v4.0"; } diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/Serializers.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/Serializers.java index 3aad5e36ee..dd1ce3e3ca 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/Serializers.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/Serializers.java @@ -65,7 +65,12 @@ public enum Serializers { /** * GraphBinary 1.0. */ - GRAPHBINARY_V1(SerTokens.MIME_GRAPHBINARY_V1); + GRAPHBINARY_V1(SerTokens.MIME_GRAPHBINARY_V1), + + /** + * GraphBinary 4.0. + */ + GRAPHBINARY_V4(SerTokens.MIME_GRAPHBINARY_V4); private String value; diff --git a/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4Test.java b/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4Test.java index 67db315a20..27cb804a3b 100644 --- a/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4Test.java +++ b/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4Test.java @@ -19,42 +19,15 @@ package org.apache.tinkerpop.gremlin.util.ser; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.util.CharsetUtil; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; -import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree; -import org.apache.tinkerpop.gremlin.structure.Edge; -import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.Property; -import org.apache.tinkerpop.gremlin.structure.Vertex; -import org.apache.tinkerpop.gremlin.structure.VertexProperty; -import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper; -import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONXModuleV3; -import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; -import org.apache.tinkerpop.gremlin.util.MessageSerializer; -import org.apache.tinkerpop.gremlin.util.Tokens; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode; -import org.apache.tinkerpop.shaded.jackson.databind.JsonMappingException; import org.junit.Test; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.UUID; -import static org.apache.tinkerpop.gremlin.util.MockitoHamcrestMatcherAdapter.reflectionEquals; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasItemInArray; import static org.junit.Assert.assertEquals; @@ -82,10 +55,10 @@ public class GraphSONMessageSerializerV4Test extends GraphSONMessageSerializerV3 .statusMessage("OK") .create(); - final ByteBuf bb0 = serializer.writeResponseHeader(response, allocator); - final ByteBuf bb1 = serializer.writeResponseChunk(Arrays.asList("chunk", 1), allocator); - final ByteBuf bb2 = serializer.writeResponseChunk(Arrays.asList("chunk", 2), allocator); - final ByteBuf bb3 = serializer.writeResponseFooter(response, allocator); + final ByteBuf bb0 = serializer.writeHeader(response, allocator); + final ByteBuf bb1 = serializer.writeChunk(Arrays.asList("chunk", 1), allocator); + final ByteBuf bb2 = serializer.writeChunk(Arrays.asList("chunk", 2), allocator); + final ByteBuf bb3 = serializer.writeFooter(response, allocator); final ByteBuf bbCombined = allocator.buffer(bb0.readableBytes() + bb1.readableBytes() + bb2.readableBytes() + bb3.readableBytes()); @@ -111,10 +84,10 @@ public class GraphSONMessageSerializerV4Test extends GraphSONMessageSerializerV3 .statusMessage("OK") .create(); - final ByteBuf bb0 = serializer.writeResponseHeader(response, allocator); - final ByteBuf bb1 = serializer.writeResponseChunk(Arrays.asList("chunk", 1), allocator); - final ByteBuf bb2 = serializer.writeResponseChunk(Arrays.asList("chunk", 2), allocator); - final ByteBuf bb3 = serializer.writeError(response, allocator); + final ByteBuf bb0 = serializer.writeHeader(response, allocator); + final ByteBuf bb1 = serializer.writeChunk(Arrays.asList("chunk", 1), allocator); + final ByteBuf bb2 = serializer.writeChunk(Arrays.asList("chunk", 2), allocator); + final ByteBuf bb3 = serializer.writeErrorFooter(response, allocator); final ByteBuf bbCombined = allocator.buffer(bb0.readableBytes() + bb1.readableBytes() + bb2.readableBytes() + bb3.readableBytes()); diff --git a/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/binary/GraphBinaryMessageSerializerV4Test.java b/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/binary/GraphBinaryMessageSerializerV4Test.java new file mode 100644 index 0000000000..daeb368ab4 --- /dev/null +++ b/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/binary/GraphBinaryMessageSerializerV4Test.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.util.ser.binary; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.apache.tinkerpop.gremlin.util.ser.SerializationException; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.tinkerpop.shaded.jackson.databind.type.LogicalType.Collection; +import static org.junit.Assert.assertEquals; + +public class GraphBinaryMessageSerializerV4Test { + + private final ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; + private final GraphBinaryMessageSerializerV4 serializer = new GraphBinaryMessageSerializerV4(); + + @Test + public void shouldSerializeAndDeserializeResponseInSingleChunk() throws SerializationException { + final ResponseMessage response = ResponseMessage.build(UUID.randomUUID()) + .code(ResponseStatusCode.SUCCESS) + .statusMessage("OK") + .result(Arrays.asList(1, "test")) + .create(); + + final ByteBuf buffer = serializer.writeHeader(response, allocator); + final ResponseMessage deserialized = serializer.readChunk(buffer, true); + assertResponseEquals(response, deserialized); + } + + @Test + public void shouldSerializeAndDeserializeResponseInHeaderChunk() throws SerializationException { + final ResponseMessage response = ResponseMessage.build(UUID.randomUUID()) + .result(Arrays.asList(1, "test")) + .create(); + + final ByteBuf buffer = serializer.writeHeader(response, allocator); + final ResponseMessage deserialized = serializer.readChunk(buffer, true); + assertResponseEquals(response, deserialized); + } + + @Test + public void shouldSerializeAndDeserializeResponseInDataChunk() throws SerializationException { + final List data = Arrays.asList(1, "test"); + final ByteBuf buffer = serializer.writeChunk(data, allocator); + final ResponseMessage deserialized = serializer.readChunk(buffer, false); + + assertEquals(data, deserialized.getResult().getData()); + } + + @Test + public void shouldSerializeAndDeserializeResponseInFooterChunk() throws SerializationException { + final ResponseMessage response = ResponseMessage.build((UUID)null) + .result(Arrays.asList(1, "test")) + .code(ResponseStatusCode.SUCCESS) + .statusMessage("OK") + .create(); + + final ByteBuf buffer = serializer.writeFooter(response, allocator); + final ResponseMessage deserialized = serializer.readChunk(buffer, false); + assertResponseEquals(response, deserialized); + } + + @Test + public void shouldSerializeAndDeserializeErrorResponseWithEmptyData() throws SerializationException { + final ResponseMessage response = ResponseMessage.build(UUID.randomUUID()) + .code(ResponseStatusCode.FORBIDDEN) + .statusMessage("FORBIDDEN") + .create(); + + final ByteBuf buffer = serializer.writeHeader(response, allocator); + final ResponseMessage deserialized = serializer.readChunk(buffer, true); + assertResponseEquals(response, deserialized); + } + + @Test + public void shouldSerializeAndDeserializeCompositeResponse() throws SerializationException { + final List headerData = Arrays.asList(0, "header"); + final ResponseMessage header = ResponseMessage.buildV4(UUID.randomUUID()) + .result(headerData) + .create(); + + final List chunkData1 = Arrays.asList(1, "data1"); + final List chunkData2 = Arrays.asList(2, "data2"); + + final List footerData = Arrays.asList(0xFF, "footer"); + final ResponseMessage footer = ResponseMessage.build((UUID)null) + .result(footerData) + .code(ResponseStatusCode.SUCCESS) + .statusMessage("OK") + .create(); + + final ByteBuf bb0 = serializer.writeHeader(header, allocator); + final ByteBuf bb1 = serializer.writeChunk(chunkData1, allocator); + final ByteBuf bb2 = serializer.writeChunk(chunkData2, allocator); + final ByteBuf bb3 = serializer.writeFooter(footer, allocator); + + final ByteBuf bbCombined = allocator.buffer(bb0.readableBytes() + bb1.readableBytes() + + bb2.readableBytes() + bb3.readableBytes()); + + bbCombined.writeBytes(bb0); + bbCombined.writeBytes(bb1); + bbCombined.writeBytes(bb2); + bbCombined.writeBytes(bb3); + + final ResponseMessage deserialized = serializer.readChunk(bbCombined, true); + + assertEquals(header.getRequestId(), deserialized.getRequestId()); + // Status + assertEquals(footer.getStatus().getCode(), deserialized.getStatus().getCode()); + assertEquals(footer.getStatus().getMessage(), deserialized.getStatus().getMessage()); + // Result + List<Integer> combinedData = new ArrayList<>(); + Stream.of(headerData, chunkData1, chunkData2, footerData).forEach(combinedData::addAll); + assertEquals(combinedData, deserialized.getResult().getData()); + } + + + // copy-paste because response format will be different + private static void assertResponseEquals(final ResponseMessage expected, final ResponseMessage actual) { + assertEquals(expected.getRequestId(), actual.getRequestId()); + // Status + assertEquals(expected.getStatus().getCode(), actual.getStatus().getCode()); + assertEquals(expected.getStatus().getMessage(), actual.getStatus().getMessage()); + assertEquals(expected.getStatus().getAttributes(), actual.getStatus().getAttributes()); + // Result + // null == empty List + if (!isEmptyData(expected) && !isEmptyData(actual)) { + assertEquals(expected.getResult().getData(), actual.getResult().getData()); + } + assertEquals(expected.getResult().getMeta(), actual.getResult().getMeta()); + } + + private static boolean isEmptyData(final ResponseMessage responseMessage) { + return responseMessage.getResult() == null || responseMessage.getResult().getData() == null || + ((List) responseMessage.getResult().getData()).isEmpty(); + } +}