This is an automated email from the ASF dual-hosted git repository.

valentyn pushed a commit to branch valentyn/streaming-serializers
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/valentyn/streaming-serializers 
by this push:
     new 090ef4c5ea GraphBinary streaming draft
090ef4c5ea is described below

commit 090ef4c5ea2cd59c8ec8765491d53c15a121c120
Author: Valentyn Kahamlyk <valentyn.kaham...@improving.com>
AuthorDate: Wed Mar 27 12:27:33 2024 -0700

    GraphBinary streaming draft
---
 .../gremlin/structure/io/binary/DataType.java      |   1 +
 .../gremlin/structure/io/binary/Marker.java        |  41 +++++
 .../io/binary/TypeSerializerRegistry.java          |   1 +
 .../io/binary/types/SingleTypeSerializer.java      |   3 +
 .../server/handler/HttpGremlinEndpointHandler.java |   8 +-
 .../gremlin/server/handler/HttpHandlerUtil.java    |   2 +-
 .../server/GremlinServerHttpIntegrateTest.java     |  55 +++++-
 .../gremlin/util/message/ResponseMessage.java      |  15 ++
 .../util/ser/GraphBinaryMessageSerializerV1.java   |  12 +-
 .../util/ser/GraphBinaryMessageSerializerV4.java   | 188 +++++++++++++++++++++
 .../util/ser/GraphSONMessageSerializerV4.java      |  14 +-
 .../gremlin/util/ser/MessageChunkSerializer.java   |  10 +-
 .../tinkerpop/gremlin/util/ser/SerTokens.java      |   1 +
 .../tinkerpop/gremlin/util/ser/Serializers.java    |   7 +-
 .../util/ser/GraphSONMessageSerializerV4Test.java  |  43 +----
 .../binary/GraphBinaryMessageSerializerV4Test.java | 164 ++++++++++++++++++
 16 files changed, 506 insertions(+), 59 deletions(-)

diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/DataType.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/DataType.java
index e3fd09f16a..246a266fbe 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/DataType.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/DataType.java
@@ -90,6 +90,7 @@ public enum DataType {
     ZONEOFFSET(0X8E),
 
     CUSTOM(0),
+    MARKER(0XFD),
     UNSPECIFIED_NULL(0XFE);
 
     private final int code;
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/Marker.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/Marker.java
new file mode 100644
index 0000000000..8f7beaab63
--- /dev/null
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/Marker.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.structure.io.binary;
+
+/**
+ * Marker value identified by a single byte. The only marker defined here is
+ * {@link #END_OF_STREAM}, whose byte value is {@code 0}.
+ */
+public final class Marker {
+    private final byte value;
+
+    /** Shared marker instance with byte value {@code 0}; {@code final} so it cannot be reassigned. */
+    public static final Marker END_OF_STREAM = new Marker((byte) 0);
+
+    private Marker(final byte value) {
+        this.value = value;
+    }
+
+    /**
+     * @return the byte value that identifies this marker
+     */
+    public byte getValue() {
+        return value;
+    }
+
+    /**
+     * Resolves a marker from its byte value.
+     *
+     * @param value marker byte; only {@code 0} is recognized
+     * @return {@link #END_OF_STREAM} when {@code value} is {@code 0}
+     * @throws IllegalArgumentException for any other value
+     */
+    public static Marker of(final byte value) {
+        if (value != 0) {
+            throw new IllegalArgumentException("Unknown marker value: " + value);
+        }
+        return END_OF_STREAM;
+    }
+}
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/TypeSerializerRegistry.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/TypeSerializerRegistry.java
index 9958179318..428bfb4b24 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/TypeSerializerRegistry.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/TypeSerializerRegistry.java
@@ -189,6 +189,7 @@ public class TypeSerializerRegistry {
             new RegistryEntry<>(Tree.class, new TreeSerializer()),
             new RegistryEntry<>(Metrics.class, new MetricsSerializer()),
             new RegistryEntry<>(TraversalMetrics.class, new 
TraversalMetricsSerializer()),
+            new RegistryEntry<>(Marker.class, 
SingleTypeSerializer.MarkerSerializer),
 
             // TransformSerializer implementations
             new RegistryEntry<>(Map.Entry.class, new MapEntrySerializer()),
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/types/SingleTypeSerializer.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/types/SingleTypeSerializer.java
index 4abaddab28..7d163209cf 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/types/SingleTypeSerializer.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/types/SingleTypeSerializer.java
@@ -22,6 +22,7 @@ import 
org.apache.tinkerpop.gremlin.structure.io.binary.DataType;
 import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
 import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryWriter;
 import org.apache.tinkerpop.gremlin.structure.io.Buffer;
+import org.apache.tinkerpop.gremlin.structure.io.binary.Marker;
 
 import java.time.Year;
 import java.util.function.BiConsumer;
@@ -48,6 +49,8 @@ public class SingleTypeSerializer<T> extends 
SimpleTypeSerializer<T> {
             new SingleTypeSerializer<>(DataType.BYTE, Buffer::readByte, (v, b) 
-> b.writeByte(v));
     public static final SingleTypeSerializer<Year> YearSerializer =
             new SingleTypeSerializer<>(DataType.YEAR, bb -> 
Year.of(bb.readInt()), (v, b) -> b.writeInt(v.getValue()));
+    public static final SingleTypeSerializer<Marker> MarkerSerializer =
+            new SingleTypeSerializer<>(DataType.MARKER, bb -> 
Marker.of(bb.readByte()), (v, b) -> b.writeByte(v.getValue()));
 
     private final Function<Buffer, T> readFunc;
     private final BiConsumer<T, Buffer> writeFunc;
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index 00e88c72ab..2a84db2ffb 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -96,7 +96,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -115,7 +114,6 @@ import static 
io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
 import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT;
 import static 
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendTrailingHeaders;
 import static 
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.writeErrorFrame;
 
@@ -824,12 +822,12 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
                         }
 
                         ctx.setRequestState(RequestState.STREAMING);
-                        return new 
Frame(chunkSerializer.writeResponseHeader(responseMessage, 
nettyContext.alloc()));
+                        return new 
Frame(chunkSerializer.writeHeader(responseMessage, nettyContext.alloc()));
                     case STREAMING:
-                        return new 
Frame(chunkSerializer.writeResponseChunk(aggregate, nettyContext.alloc()));
+                        return new Frame(chunkSerializer.writeChunk(aggregate, 
nettyContext.alloc()));
                     case FINISHING:
                         ctx.setRequestState(RequestState.FINISHED);
-                        return new 
Frame(chunkSerializer.writeResponseFooter(responseMessage, 
nettyContext.alloc()));
+                        return new 
Frame(chunkSerializer.writeFooter(responseMessage, nettyContext.alloc()));
                 }
 
                 // todo: just throw?
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
index 3ce6cb3f3a..5be5dd2b90 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
@@ -306,7 +306,7 @@ public class HttpHandlerUtil {
     static void writeErrorFrame(final ChannelHandlerContext ctx, final Context 
context, ResponseMessage responseMessage, final MessageSerializer<?> 
serializer) {
         try {
             final ByteBuf ByteBuf = context.getRequestState() == 
HttpGremlinEndpointHandler.RequestState.STREAMING
-                    ? ((MessageChunkSerializer) 
serializer).writeError(responseMessage, ctx.alloc())
+                    ? ((MessageChunkSerializer) 
serializer).writeErrorFooter(responseMessage, ctx.alloc())
                     : serializer.serializeResponseAsBinary(responseMessage, 
ctx.alloc());
 
             
context.setRequestState(HttpGremlinEndpointHandler.RequestState.ERROR);
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
index 2321bed855..1ab5874d09 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpIntegrateTest.java
@@ -38,9 +38,11 @@ import 
org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1;
 import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
 import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV1;
 import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV2;
 import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV3;
+import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV4;
 import 
org.apache.tinkerpop.gremlin.util.ser.GraphSONUntypedMessageSerializerV1;
 import 
org.apache.tinkerpop.gremlin.util.ser.GraphSONUntypedMessageSerializerV2;
 import 
org.apache.tinkerpop.gremlin.util.ser.GraphSONUntypedMessageSerializerV3;
@@ -178,6 +180,20 @@ public class GremlinServerHttpIntegrateTest extends 
AbstractGremlinServerIntegra
                 settings.evaluationTimeout = 5000;
                 settings.gremlinPool = 1;
                 break;
+            case "should200OnPOSTWithChunkedResponse":
+            case "shouldHandleErrorsInFirstChunkPOSTWithChunkedResponse":
+            case "shouldHandleErrorsNotInFirstChunkPOSTWithChunkedResponse":
+                settings.serializers.clear();
+                final Settings.SerializerSettings serializerSettingsV4 = new 
Settings.SerializerSettings();
+                serializerSettingsV4.className = 
GraphSONMessageSerializerV4.class.getName();
+                settings.serializers.add(serializerSettingsV4);
+                break;
+            case "should200OnPOSTWithChunkedResponseGraphBinary":
+                settings.serializers.clear();
+                final Settings.SerializerSettings serializerSettingsV4b = new 
Settings.SerializerSettings();
+                serializerSettingsV4b.className = 
GraphBinaryMessageSerializerV4.class.getName();
+                settings.serializers.add(serializerSettingsV4b);
+                break;
         }
         return settings;
     }
@@ -1145,7 +1161,40 @@ public class GremlinServerHttpIntegrateTest extends 
AbstractGremlinServerIntegra
             assertEquals(200, response.getStatusLine().getStatusCode());
             assertTrue(response.getEntity().isChunked());
 
-            String json = EntityUtils.toString(response.getEntity());
+            final String json = EntityUtils.toString(response.getEntity());
+            final JsonNode node = mapper.readTree(json);
+            assertEquals(8, 
node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(8).get(GraphSONTokens.VALUEPROP).intValue());
+            assertEquals("ten", 
node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(10).textValue());
+            assertEquals("new chunk", 
node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(16).textValue());
+
+            final Header[] footers = getTrailingHeaders(response);
+            assertEquals(2, footers.length);
+            assertEquals("code", footers[0].getName());
+            assertEquals("200", footers[0].getValue());
+            assertEquals("message", footers[1].getName());
+            assertEquals("OK", footers[1].getValue());
+        }
+    }
+
+    @Test
+    public void should200OnPOSTWithChunkedResponseGraphBinary() throws 
Exception {
+        final String gremlin = 
"g.inject(0,1,2,3,4,5,6,7,8,9,'ten',11,12,13,14,15,'new chunk')";
+        final GraphBinaryMessageSerializerV4 serializer = new 
GraphBinaryMessageSerializerV4();
+        final ByteBuf serializedRequest = serializer.serializeRequestAsBinary(
+                
RequestMessage.build(Tokens.OPS_EVAL).addArg(Tokens.ARGS_GREMLIN, 
gremlin).create(),
+                new UnpooledByteBufAllocator(false));
+
+        final CloseableHttpClient httpclient = HttpClients.createDefault();
+        final HttpPost httppost = new 
HttpPost(TestClientFactory.createURLString());
+        httppost.addHeader(HttpHeaders.CONTENT_TYPE, 
Serializers.GRAPHBINARY_V4.getValue());
+        httppost.addHeader(HttpHeaders.ACCEPT, 
Serializers.GRAPHBINARY_V4.getValue());
+        httppost.setEntity(new ByteArrayEntity(serializedRequest.array()));
+
+        try (final CloseableHttpResponse response = 
httpclient.execute(httppost)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+            assertTrue(response.getEntity().isChunked());
+
+            final String json = EntityUtils.toString(response.getEntity());
             final JsonNode node = mapper.readTree(json);
             assertEquals(8, 
node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(8).get(GraphSONTokens.VALUEPROP).intValue());
             assertEquals("ten", 
node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(10).textValue());
@@ -1173,7 +1222,7 @@ public class GremlinServerHttpIntegrateTest extends 
AbstractGremlinServerIntegra
             assertEquals(200, response.getStatusLine().getStatusCode());
             assertTrue(response.getEntity().isChunked());
 
-            String json = EntityUtils.toString(response.getEntity());
+            final String json = EntityUtils.toString(response.getEntity());
             final JsonNode node = mapper.readTree(json);
             assertEquals("some error", 
node.get("status").get("message").textValue());
             assertEquals(500, node.get("status").get("code").intValue());
@@ -1200,7 +1249,7 @@ public class GremlinServerHttpIntegrateTest extends 
AbstractGremlinServerIntegra
             assertEquals(200, response.getStatusLine().getStatusCode());
             assertTrue(response.getEntity().isChunked());
 
-            String json = EntityUtils.toString(response.getEntity());
+            final String json = EntityUtils.toString(response.getEntity());
             final JsonNode node = mapper.readTree(json);
             assertEquals(0, 
node.get("result").get("data").get(GraphSONTokens.VALUEPROP).get(0).get(GraphSONTokens.VALUEPROP).intValue());
             assertEquals("some error", 
node.get("status").get("message").textValue());
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessage.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessage.java
index aa9f2bdf7d..a644a4dd4b 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessage.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessage.java
@@ -99,6 +99,10 @@ public final class ResponseMessage {
         return new Builder(requestId);
     }
 
+    /**
+     * Creates a {@link Builder} for the given request using the two-argument builder constructor,
+     * which leaves the status code and status message {@code null} until they are set explicitly.
+     *
+     * @param requestId identifier of the request this response belongs to
+     * @return a new builder
+     */
+    public static Builder buildV4(final UUID requestId) {
+        return new Builder(requestId, true);
+    }
+
     public final static class Builder {
 
         private final UUID requestId;
@@ -116,6 +120,13 @@ public final class ResponseMessage {
             this.requestId = requestId;
         }
 
+        /**
+         * Builder for TP4: starts with a {@code null} status code and a {@code null} status message.
+         * The {@code v4} argument is never read; it only gives this constructor a signature distinct
+         * from the single-argument one.
+         */
+        private Builder(final UUID requestId, final boolean v4) {
+            this.requestId = requestId;
+            this.code = null;
+            this.statusMessage = null;
+        }
+
         public Builder code(final ResponseStatusCode code) {
             this.code = code;
             return this;
@@ -157,6 +168,10 @@ public final class ResponseMessage {
 
+        /**
+         * Builds the {@link ResponseMessage} from the values collected so far. When neither a status
+         * code nor a status message has been set, the message is created with a {@code null} status.
+         */
         public ResponseMessage create() {
             final ResponseResult responseResult = new ResponseResult(result, 
 metaData);
+            // no code and no status message: create the message without a ResponseStatus
+            if (code == null && statusMessage == null) {
+                return new ResponseMessage(requestId, null, responseResult);
+            }
             final ResponseStatus responseStatus = new ResponseStatus(code, 
 statusMessage, attributes);
             return new ResponseMessage(requestId, responseStatus, 
 responseResult);
         }
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV1.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV1.java
index fe4b051715..17f80518ed 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV1.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV1.java
@@ -58,12 +58,12 @@ public class GraphBinaryMessageSerializerV1 extends 
AbstractMessageSerializer<Gr
     private static final Base64.Decoder base64Decoder = Base64.getDecoder();
 
     private byte[] header = MIME_TYPE.getBytes(UTF_8);
-    private boolean serializeToString = false;
-    private GraphBinaryReader reader;
-    private GraphBinaryWriter writer;
-    private RequestMessageSerializer requestSerializer;
-    private ResponseMessageSerializer responseSerializer;
-    private GraphBinaryMapper mapper;
+    protected boolean serializeToString = false;
+    protected GraphBinaryReader reader;
+    protected GraphBinaryWriter writer;
+    protected RequestMessageSerializer requestSerializer;
+    protected ResponseMessageSerializer responseSerializer;
+    protected GraphBinaryMapper mapper;
 
     /**
      * Creates a new instance of the message serializer using the default type 
serializers.
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java
new file mode 100644
index 0000000000..510e1a9594
--- /dev/null
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.util.ser;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.apache.tinkerpop.gremlin.structure.io.Buffer;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryMapper;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryWriter;
+import org.apache.tinkerpop.gremlin.structure.io.binary.Marker;
+import org.apache.tinkerpop.gremlin.structure.io.binary.TypeSerializerRegistry;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseStatus;
+import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
+import org.javatuples.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * GraphBinary message serializer that can write a response in separate pieces (header, data chunks,
+ * footer) and read such pieces back with {@link #readChunk(ByteBuf, boolean)}.
+ * <p>
+ * Layout produced by the write methods: a version byte and a nullable request id (header), then the
+ * result items written one after another (data), then {@link Marker#END_OF_STREAM} followed by the
+ * status code and a nullable status message (footer).
+ */
+public class GraphBinaryMessageSerializerV4 extends GraphBinaryMessageSerializerV1
+    implements MessageChunkSerializer<GraphBinaryMapper> {
+
+    private static final NettyBufferFactory bufferFactory = new NettyBufferFactory();
+    private static final String MIME_TYPE = SerTokens.MIME_GRAPHBINARY_V4;
+
+    /**
+     * Creates a serializer backed by {@link TypeSerializerRegistry#INSTANCE}.
+     */
+    public GraphBinaryMessageSerializerV4() {
+        this(TypeSerializerRegistry.INSTANCE);
+    }
+
+    /**
+     * Creates a serializer backed by the supplied registry.
+     *
+     * @param registry type serializer registry handed to the parent serializer
+     */
+    public GraphBinaryMessageSerializerV4(final TypeSerializerRegistry registry) {
+        super(registry);
+    }
+
+    @Override
+    public String[] mimeTypesSupported() {
+        return new String[]{MIME_TYPE};
+    }
+
+
+    /**
+     * Writes the header and any result data carried by {@code responseMessage}. When the message
+     * already has a status, the footer is written as well so the buffer holds a complete response.
+     */
+    @Override
+    public ByteBuf writeHeader(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException {
+        // todo: write data or not when error?
+        final EnumSet<MessageParts> parts = responseMessage.getStatus() != null ? MessageParts.ALL : MessageParts.START;
+
+        return write(responseMessage, null, allocator, parts);
+    }
+
+    /**
+     * Writes only the items of {@code aggregate}; nothing is written when it is {@code null}.
+     */
+    @Override
+    public ByteBuf writeChunk(final Object aggregate, final ByteBufAllocator allocator) throws SerializationException {
+        return write(null, aggregate, allocator, MessageParts.CHUNK);
+    }
+
+    /**
+     * Writes the remaining result data of {@code responseMessage} followed by the footer.
+     */
+    @Override
+    public ByteBuf writeFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException {
+        return write(responseMessage, null, allocator, MessageParts.END);
+    }
+
+    /**
+     * Writes only the footer (end marker, status code, status message) of {@code responseMessage}.
+     */
+    @Override
+    public ByteBuf writeErrorFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException {
+        return write(responseMessage, null, allocator, MessageParts.ERROR);
+    }
+
+    /**
+     * Serializes the requested {@code parts} into a freshly allocated buffer.
+     *
+     * @param responseMessage source of request id, result data and status; may be {@code null} when
+     *                        only {@link MessageParts#DATA} is requested
+     * @param aggregate       result items to write; when {@code null} the data of
+     *                        {@code responseMessage} is used instead
+     * @param allocator       allocator for the returned buffer
+     * @param parts           which sections of the message to write
+     * @return the buffer holding the written sections; the caller owns it
+     * @throws SerializationException when the underlying writer fails with an {@link IOException}
+     */
+    private ByteBuf write(final ResponseMessage responseMessage, final Object aggregate,
+                          final ByteBufAllocator allocator, final EnumSet<MessageParts> parts) throws SerializationException {
+        final ByteBuf byteBuf = allocator.buffer();
+        boolean written = false;
+
+        try {
+            final Buffer buffer = bufferFactory.create(byteBuf);
+
+            if (parts.contains(MessageParts.HEADER)) {
+                // Version
+                buffer.writeByte(GraphBinaryWriter.VERSION_BYTE);
+
+                // Nullable request id
+                writer.writeValue(responseMessage.getRequestId(), buffer, true);
+            }
+
+            if (parts.contains(MessageParts.DATA)) {
+                // responseMessage is null on the writeChunk path, so guard before dereferencing it
+                final Object data;
+                if (aggregate != null) {
+                    data = aggregate;
+                } else if (responseMessage != null && responseMessage.getResult() != null) {
+                    data = responseMessage.getResult().getData();
+                } else {
+                    data = null;
+                }
+                if (data != null) {
+                    for (final Object item : (List<?>) data) {
+                        writer.write(item, buffer);
+                    }
+                }
+            }
+
+            if (parts.contains(MessageParts.FOOTER)) {
+                final ResponseStatus status = responseMessage.getStatus();
+
+                // we don't know how much data we have, so need a special object
+                writer.write(Marker.END_OF_STREAM, buffer);
+                // Status code
+                writer.writeValue(status.getCode().getValue(), buffer, false);
+                // Nullable status message
+                writer.writeValue(status.getMessage(), buffer, true);
+            }
+
+            written = true;
+            return byteBuf;
+        } catch (IOException e) {
+            throw new SerializationException(e);
+        } finally {
+            // the buffer never reaches the caller on failure, so release it here to avoid a leak
+            if (!written) {
+                byteBuf.release();
+            }
+        }
+    }
+
+    // ------- read message methods
+
+    /**
+     * Reads objects until the buffer is exhausted or an end-of-stream marker is read; the marker
+     * itself is not part of the returned list.
+     */
+    private List<Object> readPayload(final Buffer buffer) throws IOException {
+        final List<Object> result = new ArrayList<>();
+        while (buffer.readableBytes() != 0) {
+            final Object obj = reader.read(buffer);
+            if (obj instanceof Marker && ((Marker) obj).getValue() == 0) {
+                break;
+            }
+            result.add(obj);
+        }
+        return result;
+    }
+
+    /**
+     * Reads the status code followed by the nullable status message.
+     */
+    private Pair<ResponseStatusCode, String> readFooter(final Buffer buffer) throws IOException {
+        return Pair.with(ResponseStatusCode.getFromValue(reader.readValue(buffer, Integer.class, false)),
+                reader.readValue(buffer, String.class, true));
+    }
+
+    /**
+     * Reads one piece of a chunked response.
+     *
+     * @param byteBuf      bytes of the chunk
+     * @param isFirstChunk {@code true} when the chunk starts with the version byte and request id
+     * @return a message holding the items read, plus code and status message when bytes remain
+     *         after the payload
+     * @throws SerializationException when the version byte is invalid or reading fails
+     */
+    @Override
+    public ResponseMessage readChunk(final ByteBuf byteBuf, final boolean isFirstChunk) throws SerializationException {
+        final Buffer buffer = bufferFactory.create(byteBuf);
+
+        try {
+            if (isFirstChunk) {
+                final int version = buffer.readByte() & 0xff;
+
+                if (version >>> 7 != 1) {
+                    // This is an indication that the response buffer was incorrectly built
+                    // Or the buffer offsets are wrong
+                    throw new SerializationException("The most significant bit should be set according to the format");
+                }
+            }
+
+            final UUID requestId = isFirstChunk ? reader.readValue(buffer, UUID.class, true) : null;
+
+            final List<Object> result = readPayload(buffer);
+
+            // no footer
+            if (buffer.readableBytes() == 0) {
+                return ResponseMessage.build(requestId)
+                        .result(result)
+                        .create();
+            }
+
+            final Pair<ResponseStatusCode, String> footer = readFooter(buffer);
+            return ResponseMessage.build(requestId)
+                    .result(result)
+                    .code(footer.getValue0())
+                    .statusMessage(footer.getValue1())
+                    .create();
+
+        } catch (IOException ex) {
+            throw new SerializationException(ex);
+        }
+    }
+
+    /**
+     * Sections of a response message, with the combinations used by the write methods.
+     */
+    private enum MessageParts {
+        HEADER, DATA, FOOTER;
+
+        public static final EnumSet<MessageParts> ALL = EnumSet.of(HEADER, DATA, FOOTER);
+        public static final EnumSet<MessageParts> START = EnumSet.of(HEADER, DATA);
+        public static final EnumSet<MessageParts> CHUNK = EnumSet.of(DATA);
+        public static final EnumSet<MessageParts> END = EnumSet.of(DATA, FOOTER);
+        public static final EnumSet<MessageParts> ERROR = EnumSet.of(FOOTER);
+    }
+}
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4.java
index ca981986b2..031ac0e1cc 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.util.ser;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.ReferenceCountUtil;
+import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONUtil;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONVersion;
@@ -152,7 +153,7 @@ public final class GraphSONMessageSerializerV4 extends 
AbstractGraphSONMessageSe
 
     // !!!
     @Override
-    public ByteBuf writeResponseHeader(final ResponseMessage responseMessage, 
final ByteBufAllocator allocator) throws SerializationException {
+    public ByteBuf writeHeader(final ResponseMessage responseMessage, final 
ByteBufAllocator allocator) throws SerializationException {
         ByteBuf encodedMessage = null;
         try {
             final byte[] header = mapper.writeValueAsBytes(new 
ResponseMessage.ResponseMessageHeader(responseMessage));
@@ -185,7 +186,7 @@ public final class GraphSONMessageSerializerV4 extends 
AbstractGraphSONMessageSe
     }
 
     @Override
-    public ByteBuf writeResponseChunk(final Object aggregate, final 
ByteBufAllocator allocator) throws SerializationException {
+    public ByteBuf writeChunk(final Object aggregate, final ByteBufAllocator 
allocator) throws SerializationException {
         ByteBuf encodedMessage = null;
         try {
             final byte[] payload = getChunk(false, aggregate);
@@ -202,7 +203,7 @@ public final class GraphSONMessageSerializerV4 extends 
AbstractGraphSONMessageSe
     }
 
     @Override
-    public ByteBuf writeResponseFooter(final ResponseMessage responseMessage, 
final ByteBufAllocator allocator) throws SerializationException {
+    public ByteBuf writeFooter(final ResponseMessage responseMessage, final 
ByteBufAllocator allocator) throws SerializationException {
         ByteBuf encodedMessage = null;
         try {
             final byte[] footer = mapper.writeValueAsBytes(new 
ResponseMessage.ResponseMessageFooter(responseMessage));
@@ -222,7 +223,7 @@ public final class GraphSONMessageSerializerV4 extends 
AbstractGraphSONMessageSe
     }
 
     @Override
-    public ByteBuf writeError(final ResponseMessage responseMessage, final 
ByteBufAllocator allocator) throws SerializationException {
+    public ByteBuf writeErrorFooter(final ResponseMessage responseMessage, 
final ByteBufAllocator allocator) throws SerializationException {
         ByteBuf encodedMessage = null;
         try {
             final byte[] footer = mapper.writeValueAsBytes(new 
ResponseMessage.ResponseMessageFooter(responseMessage));
@@ -239,6 +240,11 @@ public final class GraphSONMessageSerializerV4 extends 
AbstractGraphSONMessageSe
         }
     }
 
+    /**
+     * Reading a streamed response is not implemented for GraphSON; every call throws.
+     *
+     * @throws IllegalStateException always
+     */
+    @Override
+    public ResponseMessage readChunk(final ByteBuf byteBuf, final boolean 
+isFirstChunk) {
+        throw new IllegalStateException("Reading for streaming GraphSON is not 
+supported");
+    }
+
     public final static class ResponseMessageHeaderSerializer extends 
StdSerializer<ResponseMessage.ResponseMessageHeader> {
         public ResponseMessageHeaderSerializer() {
             super(ResponseMessage.ResponseMessageHeader.class);
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/MessageChunkSerializer.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/MessageChunkSerializer.java
index 3167b185cf..cabd9db75c 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/MessageChunkSerializer.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/MessageChunkSerializer.java
@@ -24,11 +24,13 @@ import org.apache.tinkerpop.gremlin.util.MessageSerializer;
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
 
 public interface MessageChunkSerializer<M> extends MessageSerializer<M> {
-    public ByteBuf writeResponseHeader(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException;
+    public ByteBuf writeHeader(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException;
 
-    public ByteBuf writeResponseChunk(final Object aggregate, final ByteBufAllocator allocator) throws SerializationException;
+    public ByteBuf writeChunk(final Object aggregate, final ByteBufAllocator allocator) throws SerializationException;
 
-    public ByteBuf writeResponseFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException;
+    public ByteBuf writeFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException;
 
-    public ByteBuf writeError(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException;
+    public ByteBuf writeErrorFooter(final ResponseMessage responseMessage, final ByteBufAllocator allocator) throws SerializationException;
+
+    public ResponseMessage readChunk(final ByteBuf byteBuf, final boolean isFirstChunk) throws SerializationException;
 }
diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/SerTokens.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/SerTokens.java
index 8e20f531af..20b8fae69c 100644
--- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/SerTokens.java
+++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/SerTokens.java
@@ -45,4 +45,5 @@ public final class SerTokens {
     public static final String MIME_GRAPHSON_V3_UNTYPED = "application/vnd.gremlin-v3.0+json;types=false";
     public static final String MIME_GRAPHSON_V4 = "application/vnd.gremlin-v4.0+json";
     public static final String MIME_GRAPHBINARY_V1 = "application/vnd.graphbinary-v1.0";
+    public static final String MIME_GRAPHBINARY_V4 = "application/vnd.graphbinary-v4.0";
 }
diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/Serializers.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/Serializers.java
index 3aad5e36ee..dd1ce3e3ca 100644
--- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/Serializers.java
+++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/Serializers.java
@@ -65,7 +65,12 @@ public enum Serializers {
     /**
      * GraphBinary 1.0.
      */
-    GRAPHBINARY_V1(SerTokens.MIME_GRAPHBINARY_V1);
+    GRAPHBINARY_V1(SerTokens.MIME_GRAPHBINARY_V1),
+
+    /**
+     * GraphBinary 4.0.
+     */
+    GRAPHBINARY_V4(SerTokens.MIME_GRAPHBINARY_V4);
 
     private String value;
 
diff --git a/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4Test.java b/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4Test.java
index 67db315a20..27cb804a3b 100644
--- a/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4Test.java
+++ b/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/GraphSONMessageSerializerV4Test.java
@@ -19,42 +19,15 @@
 package org.apache.tinkerpop.gremlin.util.ser;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.util.CharsetUtil;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Property;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
-import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONXModuleV3;
-import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.apache.tinkerpop.gremlin.util.MessageSerializer;
-import org.apache.tinkerpop.gremlin.util.Tokens;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
-import org.apache.tinkerpop.shaded.jackson.databind.JsonMappingException;
 import org.junit.Test;
 
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
-import static org.apache.tinkerpop.gremlin.util.MockitoHamcrestMatcherAdapter.reflectionEquals;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasItemInArray;
 import static org.junit.Assert.assertEquals;
@@ -82,10 +55,10 @@ public class GraphSONMessageSerializerV4Test extends GraphSONMessageSerializerV3
                 .statusMessage("OK")
                 .create();
 
-        final ByteBuf bb0 = serializer.writeResponseHeader(response, allocator);
-        final ByteBuf bb1 = serializer.writeResponseChunk(Arrays.asList("chunk", 1), allocator);
-        final ByteBuf bb2 = serializer.writeResponseChunk(Arrays.asList("chunk", 2), allocator);
-        final ByteBuf bb3 = serializer.writeResponseFooter(response, allocator);
+        final ByteBuf bb0 = serializer.writeHeader(response, allocator);
+        final ByteBuf bb1 = serializer.writeChunk(Arrays.asList("chunk", 1), allocator);
+        final ByteBuf bb2 = serializer.writeChunk(Arrays.asList("chunk", 2), allocator);
+        final ByteBuf bb3 = serializer.writeFooter(response, allocator);
 
         final ByteBuf bbCombined = allocator.buffer(bb0.readableBytes() + bb1.readableBytes() +
                 bb2.readableBytes() + bb3.readableBytes());
@@ -111,10 +84,10 @@ public class GraphSONMessageSerializerV4Test extends GraphSONMessageSerializerV3
                 .statusMessage("OK")
                 .create();
 
-        final ByteBuf bb0 = serializer.writeResponseHeader(response, allocator);
-        final ByteBuf bb1 = serializer.writeResponseChunk(Arrays.asList("chunk", 1), allocator);
-        final ByteBuf bb2 = serializer.writeResponseChunk(Arrays.asList("chunk", 2), allocator);
-        final ByteBuf bb3 = serializer.writeError(response, allocator);
+        final ByteBuf bb0 = serializer.writeHeader(response, allocator);
+        final ByteBuf bb1 = serializer.writeChunk(Arrays.asList("chunk", 1), allocator);
+        final ByteBuf bb2 = serializer.writeChunk(Arrays.asList("chunk", 2), allocator);
+        final ByteBuf bb3 = serializer.writeErrorFooter(response, allocator);
 
         final ByteBuf bbCombined = allocator.buffer(bb0.readableBytes() + bb1.readableBytes() +
                 bb2.readableBytes() + bb3.readableBytes());
diff --git a/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/binary/GraphBinaryMessageSerializerV4Test.java b/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/binary/GraphBinaryMessageSerializerV4Test.java
new file mode 100644
index 0000000000..daeb368ab4
--- /dev/null
+++ b/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/ser/binary/GraphBinaryMessageSerializerV4Test.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.util.ser.binary;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.tinkerpop.shaded.jackson.databind.type.LogicalType.Collection;
+import static org.junit.Assert.assertEquals;
+
+public class GraphBinaryMessageSerializerV4Test {
+
+    private final ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
+    private final GraphBinaryMessageSerializerV4 serializer = new GraphBinaryMessageSerializerV4();
+
+    @Test
+    public void shouldSerializeAndDeserializeResponseInSingleChunk() throws SerializationException {
+        final ResponseMessage response = ResponseMessage.build(UUID.randomUUID())
+                .code(ResponseStatusCode.SUCCESS)
+                .statusMessage("OK")
+                .result(Arrays.asList(1, "test"))
+                .create();
+
+        final ByteBuf buffer = serializer.writeHeader(response, allocator);
+        final ResponseMessage deserialized = serializer.readChunk(buffer, true);
+        assertResponseEquals(response, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeResponseInHeaderChunk() throws SerializationException {
+        final ResponseMessage response = ResponseMessage.build(UUID.randomUUID())
+                .result(Arrays.asList(1, "test"))
+                .create();
+
+        final ByteBuf buffer = serializer.writeHeader(response, allocator);
+        final ResponseMessage deserialized = serializer.readChunk(buffer, true);
+        assertResponseEquals(response, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeResponseInDataChunk() throws SerializationException {
+        final List data = Arrays.asList(1, "test");
+        final ByteBuf buffer = serializer.writeChunk(data, allocator);
+        final ResponseMessage deserialized = serializer.readChunk(buffer, false);
+
+        assertEquals(data, deserialized.getResult().getData());
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeResponseInFooterChunk() throws SerializationException {
+        final ResponseMessage response = ResponseMessage.build((UUID)null)
+                .result(Arrays.asList(1, "test"))
+                .code(ResponseStatusCode.SUCCESS)
+                .statusMessage("OK")
+                .create();
+
+        final ByteBuf buffer = serializer.writeFooter(response, allocator);
+        final ResponseMessage deserialized = serializer.readChunk(buffer, false);
+        assertResponseEquals(response, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeErrorResponseWithEmptyData() throws SerializationException {
+        final ResponseMessage response = ResponseMessage.build(UUID.randomUUID())
+                .code(ResponseStatusCode.FORBIDDEN)
+                .statusMessage("FORBIDDEN")
+                .create();
+
+        final ByteBuf buffer = serializer.writeHeader(response, allocator);
+        final ResponseMessage deserialized = serializer.readChunk(buffer, true);
+        assertResponseEquals(response, deserialized);
+    }
+
+    @Test
+    public void shouldSerializeAndDeserializeCompositeResponse() throws SerializationException {
+        final List headerData = Arrays.asList(0, "header");
+        final ResponseMessage header = ResponseMessage.buildV4(UUID.randomUUID())
+                .result(headerData)
+                .create();
+
+        final List chunkData1 = Arrays.asList(1, "data1");
+        final List chunkData2 = Arrays.asList(2, "data2");
+
+        final List footerData = Arrays.asList(0xFF, "footer");
+        final ResponseMessage footer = ResponseMessage.build((UUID)null)
+                .result(footerData)
+                .code(ResponseStatusCode.SUCCESS)
+                .statusMessage("OK")
+                .create();
+
+        final ByteBuf bb0 = serializer.writeHeader(header, allocator);
+        final ByteBuf bb1 = serializer.writeChunk(chunkData1, allocator);
+        final ByteBuf bb2 = serializer.writeChunk(chunkData2, allocator);
+        final ByteBuf bb3 = serializer.writeFooter(footer, allocator);
+
+        final ByteBuf bbCombined = allocator.buffer(bb0.readableBytes() + bb1.readableBytes() +
+                bb2.readableBytes() + bb3.readableBytes());
+
+        bbCombined.writeBytes(bb0);
+        bbCombined.writeBytes(bb1);
+        bbCombined.writeBytes(bb2);
+        bbCombined.writeBytes(bb3);
+
+        final ResponseMessage deserialized = serializer.readChunk(bbCombined, true);
+
+        assertEquals(header.getRequestId(), deserialized.getRequestId());
+        // Status
+        assertEquals(footer.getStatus().getCode(), deserialized.getStatus().getCode());
+        assertEquals(footer.getStatus().getMessage(), deserialized.getStatus().getMessage());
+        // Result
+        List<Integer> combinedData = new ArrayList<>();
+        Stream.of(headerData, chunkData1, chunkData2, footerData).forEach(combinedData::addAll);
+        assertEquals(combinedData, deserialized.getResult().getData());
+    }
+
+
+    // copy-paste because response format will be different
+    private static void assertResponseEquals(final ResponseMessage expected, final ResponseMessage actual) {
+        assertEquals(expected.getRequestId(), actual.getRequestId());
+        // Status
+        assertEquals(expected.getStatus().getCode(), actual.getStatus().getCode());
+        assertEquals(expected.getStatus().getMessage(), actual.getStatus().getMessage());
+        assertEquals(expected.getStatus().getAttributes(), actual.getStatus().getAttributes());
+        // Result
+        // null == empty List
+        if (!isEmptyData(expected) && !isEmptyData(actual)) {
+            assertEquals(expected.getResult().getData(), actual.getResult().getData());
+        }
+        assertEquals(expected.getResult().getMeta(), actual.getResult().getMeta());
+    }
+
+    private static boolean isEmptyData(final ResponseMessage responseMessage) {
+        return responseMessage.getResult() == null || responseMessage.getResult().getData() == null ||
+                ((List) responseMessage.getResult().getData()).isEmpty();
+    }
+}


Reply via email to