lidavidm commented on a change in pull request #8963: URL: https://github.com/apache/arrow/pull/8963#discussion_r551959223
########## File path: java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java ########## @@ -317,6 +325,71 @@ private void test(BiConsumer<FlightClient, BufferAllocator> consumer) throws Exc } } + /** ARROW-10962: accept FlightData messages generated by Protobuf (which can omit empty fields). */ + @Test + public void testProtobufRecordBatchCompatibility() throws Exception { + final Schema schema = new Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32, true)))); + try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + final VectorUnloader unloader = new VectorUnloader(root); + root.setRowCount(0); + final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), null, new IpcOption())) { + Assert.assertEquals(ArrowMessage.HeaderType.RECORD_BATCH, message.getMessageType()); + try (final InputStream serialized = marshaller.stream(message)) { + final byte[] buf = new byte[1024]; + while (true) { + int read = serialized.read(buf); + if (read < 0) { + break; + } + baos.write(buf, 0, read); + } + } + } + final byte[] serializedMessage = baos.toByteArray(); + final Flight.FlightData protobufData = Flight.FlightData.parseFrom(serializedMessage); + Assert.assertEquals(0, protobufData.getDataBody().size()); + // Should not throw + final ArrowRecordBatch rb = + marshaller.parse(new ByteArrayInputStream(protobufData.toByteArray())).asRecordBatch(); + Assert.assertEquals(rb.computeBodyLength(), 0); + } + } + + /** ARROW-10962: accept FlightData messages generated by Protobuf (which can omit empty fields). */ + @Test + public void testProtobufSchemaCompatibility() throws Exception { + final Schema schema = new Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32, true)))); + try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) { + final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Flight.FlightDescriptor descriptor = FlightDescriptor.command(new byte[0]).toProtocol(); + try (final ArrowMessage message = new ArrowMessage(descriptor, schema, new IpcOption())) { Review comment: I've added assertions here. ########## File path: java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java ########## @@ -317,6 +325,71 @@ private void test(BiConsumer<FlightClient, BufferAllocator> consumer) throws Exc } } + /** ARROW-10962: accept FlightData messages generated by Protobuf (which can omit empty fields). */ + @Test + public void testProtobufRecordBatchCompatibility() throws Exception { + final Schema schema = new Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32, true)))); + try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + final VectorUnloader unloader = new VectorUnloader(root); + root.setRowCount(0); + final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), null, new IpcOption())) { + Assert.assertEquals(ArrowMessage.HeaderType.RECORD_BATCH, message.getMessageType()); + try (final InputStream serialized = marshaller.stream(message)) { + final byte[] buf = new byte[1024]; + while (true) { + int read = serialized.read(buf); + if (read < 0) { + break; + } + baos.write(buf, 0, read); + } + } + } + final byte[] serializedMessage = baos.toByteArray(); + final Flight.FlightData protobufData = Flight.FlightData.parseFrom(serializedMessage); + Assert.assertEquals(0, protobufData.getDataBody().size()); + // Should not throw + final ArrowRecordBatch rb = + marshaller.parse(new ByteArrayInputStream(protobufData.toByteArray())).asRecordBatch(); + Assert.assertEquals(rb.computeBodyLength(), 0); + } + } + + /** ARROW-10962: accept FlightData messages generated by Protobuf (which can omit empty fields). */ + @Test + public void testProtobufSchemaCompatibility() throws Exception { + final Schema schema = new Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32, true)))); + try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) { + final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Flight.FlightDescriptor descriptor = FlightDescriptor.command(new byte[0]).toProtocol(); + try (final ArrowMessage message = new ArrowMessage(descriptor, schema, new IpcOption())) { + Assert.assertEquals(ArrowMessage.HeaderType.SCHEMA, message.getMessageType()); + try (final InputStream serialized = marshaller.stream(message)) { Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org