liyafan82 commented on a change in pull request #8963:
URL: https://github.com/apache/arrow/pull/8963#discussion_r551866948
##########
File path:
java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
##########
@@ -317,6 +325,71 @@ private void test(BiConsumer<FlightClient,
BufferAllocator> consumer) throws Exc
}
}
+ /** ARROW-10962: accept FlightData messages generated by Protobuf (which can
omit empty fields). */
+ @Test
+ public void testProtobufRecordBatchCompatibility() throws Exception {
+ final Schema schema = new
Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32,
true))));
+ try (final BufferAllocator allocator = new
RootAllocator(Integer.MAX_VALUE);
+ final VectorSchemaRoot root = VectorSchemaRoot.create(schema,
allocator)) {
+ final VectorUnloader unloader = new VectorUnloader(root);
+ root.setRowCount(0);
+ final MethodDescriptor.Marshaller<ArrowMessage> marshaller =
ArrowMessage.createMarshaller(allocator);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (final ArrowMessage message = new
ArrowMessage(unloader.getRecordBatch(), null, new IpcOption())) {
+ Assert.assertEquals(ArrowMessage.HeaderType.RECORD_BATCH,
message.getMessageType());
+ try (final InputStream serialized = marshaller.stream(message)) {
+ final byte[] buf = new byte[1024];
+ while (true) {
+ int read = serialized.read(buf);
+ if (read < 0) {
+ break;
+ }
+ baos.write(buf, 0, read);
+ }
+ }
+ }
+ final byte[] serializedMessage = baos.toByteArray();
+ final Flight.FlightData protobufData =
Flight.FlightData.parseFrom(serializedMessage);
+ Assert.assertEquals(0, protobufData.getDataBody().size());
+ // Should not throw
+ final ArrowRecordBatch rb =
+ marshaller.parse(new
ByteArrayInputStream(protobufData.toByteArray())).asRecordBatch();
+ Assert.assertEquals(rb.computeBodyLength(), 0);
+ }
+ }
+
+ /** ARROW-10962: accept FlightData messages generated by Protobuf (which can
omit empty fields). */
+ @Test
+ public void testProtobufSchemaCompatibility() throws Exception {
+ final Schema schema = new
Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32,
true))));
+ try (final BufferAllocator allocator = new
RootAllocator(Integer.MAX_VALUE)) {
+ final MethodDescriptor.Marshaller<ArrowMessage> marshaller =
ArrowMessage.createMarshaller(allocator);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Flight.FlightDescriptor descriptor = FlightDescriptor.command(new
byte[0]).toProtocol();
+ try (final ArrowMessage message = new ArrowMessage(descriptor, schema,
new IpcOption())) {
+ Assert.assertEquals(ArrowMessage.HeaderType.SCHEMA,
message.getMessageType());
+ try (final InputStream serialized = marshaller.stream(message)) {
Review comment:
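Could we extract a common helper method for the logic that writes the
serialized message to a ByteArrayOutputStream? The same read loop already
appears in testProtobufRecordBatchCompatibility above.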
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org