Great! I should really get around to that Jira so we have some better docs 
about this.
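
For the archive, here is a minimal sketch of the slicing discussed below, in
plain Python with no pyarrow dependency. It assumes each input is one
encapsulated IPC message: a 4-byte continuation token, a 4-byte little-endian
header length, the flatbuffer header, then the body. The names
split_ipc_message and flight_payloads are made up for illustration and are
not pyarrow APIs; double-check the layout against the Arrow specification.

```python
import struct
from typing import Iterable, Iterator, Tuple

CONTINUATION = b"\xff\xff\xff\xff"  # IPC continuation token (first 4 bytes)
PREFIX_LEN = 8  # 4-byte token + 4-byte little-endian header length


def split_ipc_message(buf: bytes) -> Tuple[bytes, bytes]:
    """Split one encapsulated IPC message into (header, body).

    Hypothetical helper. The length prefix does not count the 8 prefix
    bytes, so every slice is offset by PREFIX_LEN.
    """
    if len(buf) < PREFIX_LEN or buf[:4] != CONTINUATION:
        raise ValueError("expected an IPC continuation token in bytes 0-3")
    (header_length,) = struct.unpack("<i", buf[4:8])
    header_end = PREFIX_LEN + header_length
    if header_length < 0 or header_end > len(buf):
        raise ValueError("header length does not fit inside the buffer")
    return buf[PREFIX_LEN:header_end], buf[header_end:]


def flight_payloads(
    schema_buf: bytes, batch_bufs: Iterable[bytes]
) -> Iterator[Tuple[bytes, bytes]]:
    """Yield (header, body) pairs, schema message first, then each batch."""
    yield split_ipc_message(schema_buf)
    for batch_buf in batch_bufs:
        yield split_ipc_message(batch_buf)
```

Each (header, body) pair would map onto data_header and data_body of a
FlightData message. With pyarrow, the inputs should be obtainable from
schema.serialize().to_pybytes() and batch.serialize().to_pybytes(), but
treat that as an assumption to check against your pyarrow version.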

-David

On Tue, Feb 8, 2022, at 17:23, R KB wrote:
> Thanks David,
> 
> Got it working :) 
> 
> On Tue, 8 Feb 2022 at 21:41, David Li <[email protected]> wrote:
>> IIRC header_length does not include those first 8 bytes so there's an offset 
>> needed in all slices here.
>> 
>> Also, the first FlightData is expected to contain the schema.
>> 
>> On Tue, Feb 8, 2022, at 16:32, R KB wrote:
>>> Yeah, I think we're getting into the voided warranty domain.
>>> 
>>> I've done this:
>>> 
>>> class FlightService(flight_grpc.FlightServiceServicer):
>>>     def DoGet(self, request, ctx) -> Iterator[flight_proto.FlightData]:
>>>         tbl = pyarrow.Table.from_pydict({"hello_world": [1,2,3]})
>>>         batches = tbl.to_batches()
>>>         buf = batches[0].serialize()
>>> 
>>>         buffer_ = buf.to_pybytes()
>>>         header_length = struct.unpack("<I", buffer_[4:8])[0]
>>> 
>>>         yield flight_proto.FlightData(
>>>             flight_descriptor=flight_proto.FlightDescriptor(),
>>>             data_header=buffer_[8:header_length],
>>>             data_body=buffer_[header_length+8:]
>>>         )
>>> 
>>> 
>>> 
>>> Which looks kinda right... but I'm getting: 
>>> 
>>> FlightInternalError: Server never sent a data message
>>> 
>>> When I try to request with the flight client.
>>> 
>>> 
>>> On Tue, 8 Feb 2022 at 20:58, David Li <[email protected]> wrote:
>>>> Ah, right. You'd first serialize the batch to a byte buffer, then read the 
>>>> flatbuffer length from the second 4 bytes (since the first 4 bytes are the 
>>>> IPC continuation token), slice the buffer, and use that as the message 
>>>> header, and the remainder of the buffer as the message body. (But this is 
>>>> starting to get into "your warranty is void"/"double-check with the Arrow 
>>>> specification" territory.)
>>>> 
>>>> -David
>>>> 
>>>> On Tue, Feb 8, 2022, at 15:25, R KB wrote:
>>>>> I'm confident I can do that with a RecordBatchWriter and a BytesIO object 
>>>>> that I can get the value of; however, what do I do about the message header?
>>>>> 
>>>>> On Tue, 8 Feb 2022 at 19:17, David Li <[email protected]> wrote:
>>>>>> No, you'd have to serialize the buffer, chop off the first 8 bytes 
>>>>>> yourself, and generate the Protobuf.
>>>>>> 
>>>>>> On Tue, Feb 8, 2022, at 13:59, R KB wrote:
>>>>>>> Thanks for that; hopefully those links should be fruitful.
>>>>>>> 
>>>>>>> Question, before I get started, is there an API exposed so that I could 
>>>>>>> do the reverse of this: 
>>>>>>> 
>>>>>>> `import asyncio
>>>>>>> import pathlib
>>>>>>> import struct
>>>>>>> 
>>>>>>> import grpc
>>>>>>> import pyarrow as pa
>>>>>>> import pyarrow.flight as pf
>>>>>>> 
>>>>>>> import Flight_pb2, Flight_pb2_grpc
>>>>>>> 
>>>>>>> async def main():
>>>>>>>     ticket = pf.Ticket("tick")
>>>>>>>     async with grpc.aio.insecure_channel("localhost:1234") as channel:
>>>>>>>         stub = Flight_pb2_grpc.FlightServiceStub(channel)
>>>>>>>         schema = None
>>>>>>>         async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
>>>>>>>             # 4 bytes: Need IPC continuation token
>>>>>>>             token = b'\xff\xff\xff\xff'
>>>>>>>             # 4 bytes: message length (little-endian)
>>>>>>>             length = struct.pack('<I', len(data.data_header))
>>>>>>>             buf = pa.py_buffer(token + length + data.data_header + data.data_body)
>>>>>>>             message = pa.ipc.read_message(buf)
>>>>>>>             print(message)
>>>>>>>             if schema is None:
>>>>>>>                 # This should work but is unimplemented
>>>>>>>                 # print(pa.ipc.read_schema(message))
>>>>>>>                 schema = pa.ipc.read_schema(buf)
>>>>>>>                 print(schema)
>>>>>>>             else:
>>>>>>>                 batch = pa.ipc.read_record_batch(message, schema)
>>>>>>>                 print(batch)
>>>>>>>                 print(batch.to_pydict())
>>>>>>> 
>>>>>>> asyncio.run(main())`
>>>>>>> 
>>>>>>> On Tue, 8 Feb 2022 at 18:33, David Li <[email protected]> wrote:
>>>>>>>> Unfortunately Flight wraps the C++ Flight implementation, which uses 
>>>>>>>> gRPC/C++, which is mostly a separate library entirely from grpcio and 
>>>>>>>> does not benefit from any improvements there. (The two do share a 
>>>>>>>> common network stack, but that's all; also, grpcio doesn't expose any 
>>>>>>>> of the lower level APIs that might make it possible to combine the two 
>>>>>>>> somehow.)
>>>>>>>> 
>>>>>>>> You might ask why pyarrow.flight didn't use grpcio directly (with 
>>>>>>>> bindings to transmute from FlightData to RecordBatch). However, at the 
>>>>>>>> time the thought was that we would also have non-gRPC transports (which 
>>>>>>>> are finally being worked on) and so a from-scratch grpcio/Python 
>>>>>>>> implementation was not desirable. 
>>>>>>>> 
>>>>>>>> That said, there are issues filed about better documenting FlightData. 
>>>>>>>> See ARROW-15287[1] which links a StackOverflow answer that 
>>>>>>>> demonstrates how to glue together asyncio/grpcio/PyArrow.
>>>>>>>> 
>>>>>>>> There's also some previous discussion about adding async to Flight 
>>>>>>>> more generally [2].
>>>>>>>> 
>>>>>>>> [1]: https://issues.apache.org/jira/browse/ARROW-15287
>>>>>>>> [2]: "[C++] Async Arrow Flight" 2021/06/02 
>>>>>>>> https://lists.apache.org/thread/jrj6yx53gyj0tr18pfdghtb8krp4gpfv
>>>>>>>> 
>>>>>>>> -David
>>>>>>>> 
>>>>>>>> On Tue, Feb 8, 2022, at 13:24, R KB wrote:
>>>>>>>>> GRPC has pretty good AsyncIO support at this point, and since Flight 
>>>>>>>>> is essentially a wrapper around some GRPC types: why can't we just 
>>>>>>>>> expose something that generates FlightData grpc objects?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>> 
>> 
