Ah, that makes sense. 

Let us know if you still can't get it to work and I can probably rig up a full 
example. I had also filed ARROW-15287 [1] earlier which would hopefully help 
answer these questions.

[1]: https://issues.apache.org/jira/browse/ARROW-15287

-David

On Mon, Jan 10, 2022, at 19:55, Matt Youill wrote:
> Use case is a distributed setting. A Flight server at edge of a cluster 
> receives Arrow data from remote nodes in IPC format. Rather than 
> deserializing and serializing again to send out via Flight, better to leave 
> as is.
> 
> Anyway, thanks. Best, Matt
> 
> On 11/1/22 12:14 am, David Li wrote:
>> Hey Matt,
>> 
>> It's not built out of the box but I think you're on the right track. That 
>> said, I'm curious about your use case here that you have pre-serialized 
>> bytes - is this to avoid using the Arrow reader at some point?
>> 
>> Descriptor can indeed be ignored here. app_metadata is optional.
>> 
>> The first message in a stream should be an IPC schema message. It should 
>> then be followed by DictionaryBatch messages, then RecordBatch messages. All 
>> of these follow the "encapsulated message format" [1] where the metadata 
>> flatbuffer goes in metadata and the message body goes in body_buffers. (The 
>> continuation token is omitted, but not the length, IIRC.)
>> 
>> So for schema, you would have the IPC schema flatbuffer in "metadata" and no 
>> body. For RecordBatch/DictionaryBatch, you would have the IPC record 
>> batch/dictionary batch in "metadata" and the data in "body". This means that 
>> you may need to parse your pre-serialized bytes (to some extent) to separate 
>> the two.
>> 
>> There's multiple body buffers so that we don't require concatenation. For 
>> instance, a RecordBatch in memory might be backed by multiple allocations. 
>> Only taking one body buffer would mean that we would have to concatenate 
>> everything before sending, which defeats the zero-copy goal. But if what you 
>> have is truly pre-serialized, then you can pass just the one buffer.
>> 
>> [1]: 
>> https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
>> 
>> -David
>> 
>> On Mon, Jan 10, 2022, at 02:32, Matt Youill wrote:
>>> Hi,
>>> 
>>> Have been hacking on this for a while, but wanted to make sure I'm on 
>>> the right track.
>>> 
>>> Is it possible to supply a pre-serialized IPC stream of data from a 
>>> Flight server's DoGet function? It looks like a table *object* (or 
>>> schema + record batches) can be supplied to the FlightDataStream 
>>> parameter (using a RecordBatchStream) but not plain bytes.
>>> 
>>> I've had a look at implementing a FlightDataStream for plain bytes. I 
>>> can see the byte stream needs to be split up into FlightPayloads, but 
>>> it's not clear what goes where in each one.
>>> 
>>> Given the following defs for FlightPayloads...
>>> 
>>> struct ARROW_FLIGHT_EXPORT FlightPayload {
>>>   std::shared_ptr<Buffer> descriptor;
>>>   std::shared_ptr<Buffer> app_metadata;
>>>   ipc::IpcPayload ipc_message;
>>> };
>>> 
>>> struct IpcPayload {
>>>   MessageType type = MessageType::NONE;
>>>   std::shared_ptr<Buffer> metadata;
>>>   std::vector<std::shared_ptr<Buffer>> body_buffers;
>>>   int64_t body_length = 0;
>>> };
>>> 
>>> AFAICT it looks like:
>>> 
>>> "descriptor" is ignored for DoGet
>>> 
>>> "app_metadata" can be ignored
>>> 
>>> "type" is set to whatever message it is (e.g. schema, batch, etc)
>>> 
>>> "metadata" buffer should contain a schema IPC bytes?
>>> 
>>> "body_buffers" should contain IPC bytes for each batch? (Why are there 
>>> multiple buffers? Is there any reason not to just use the first slot of 
>>> the buffers vector?)
>>> 
>>> Any advice appreciated.
>>> 
>>> Thanks, Matt
>>> 
>>> 
>> 

Reply via email to