Great! I should really get around to that Jira so we have some better docs about this.
-David

On Tue, Feb 8, 2022, at 17:23, R KB wrote:
> Thanks David,
>
> Got it working :)
>
> On Tue, 8 Feb 2022 at 21:41, David Li wrote:
>> IIRC header_length does not include those first 8 bytes so there's an offset
>> needed in all slices here.
>>
>> Also, the first FlightData is expected to contain the schema.
>>
>> On Tue, Feb 8, 2022, at 16:32, R KB wrote:
>>> Yeah, I think we're getting into the voided warranty domain.
>>>
>>> I've done this:
>>>
>>> class FlightService(flight_grpc.FlightServiceServicer):
>>>     def DoGet(self, request, ctx) -> Iterator[flight_proto.FlightData]:
>>>         tbl = pyarrow.Table.from_pydict({"hello_world": [1,2,3]})
>>>         batches = tbl.to_batches()
>>>         buf = batches[0].serialize()
>>>
>>>         buffer_ = buf.to_pybytes()
>>>         header_length = struct.unpack("<I", buffer_[4:8])[0]
>>>
>>>         yield flight_proto.FlightData(
>>>             flight_descriptor=flight_proto.FlightDescriptor(),
>>>             data_header=buffer_[8:header_length],
>>>             data_body=buffer_[header_length+8:]
>>>         )
>>>
>>> Which looks kinda right... but I'm getting:
>>>
>>> FlightInternalError: Server never sent a data message
>>>
>>> When I try to request with the flight client.
>>>
>>> On Tue, 8 Feb 2022 at 20:58, David Li wrote:
>>>> Ah, right. You'd first serialize the batch to a byte buffer, then read the
>>>> flatbuffer length from the second 4 bytes (since the first 4 bytes are the
>>>> IPC continuation token), slice the buffer, and use that as the message
>>>> header, and the remainder of the buffer as the message body. (But this is
>>>> starting to get into "your warranty is void"/"double-check with the Arrow
>>>> specification" territory.)
>>>>
>>>> -David
>>>>
>>>> On Tue, Feb 8, 2022, at 15:25, R KB wrote:
>>>>> I'm confident I can do that with a RecordBatchWriter and BytesIO object
>>>>> that I can get the value of; however, what do I do about the message header?
>>>>>
>>>>> On Tue, 8 Feb 2022 at 19:17, David Li wrote:
>>>>>> No, you'd have to serialize the buffer, chop off the first 8 bytes
>>>>>> yourself, and generate the Protobuf.
>>>>>>
>>>>>> On Tue, Feb 8, 2022, at 13:59, R KB wrote:
>>>>>>> Thanks for that; hopefully those links should be fruitful.
>>>>>>>
>>>>>>> Question, before I get started, is there an API exposed so that I could
>>>>>>> do the reverse of this:
>>>>>>>
>>>>>>> `import asyncio
>>>>>>> import pathlib
>>>>>>> import struct
>>>>>>>
>>>>>>> import grpc
>>>>>>> import pyarrow as pa
>>>>>>> import pyarrow.flight as pf
>>>>>>>
>>>>>>> import Flight_pb2, Flight_pb2_grpc
>>>>>>>
>>>>>>> async def main():
>>>>>>>     ticket = pf.Ticket("tick")
>>>>>>>     async with grpc.aio.insecure_channel("localhost:1234") as channel:
>>>>>>>         stub = Flight_pb2_grpc.FlightServiceStub(channel)
>>>>>>>         schema = None
>>>>>>>         async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
>>>>>>>             # 4 bytes: Need IPC continuation token
>>>>>>>             token = b'\xff\xff\xff\xff'
>>>>>>>             # 4 bytes: message length (little-endian)
>>>>>>>             length = struct.pack('<I', len(data.data_header))
>>>>>>>             buf = pa.py_buffer(token + length + data.data_header +
>>>>>>>                                data.data_body)
>>>>>>>             message = pa.ipc.read_message(buf)
>>>>>>>             print(message)
>>>>>>>             if schema is None:
>>>>>>>                 # This should work but is unimplemented
>>>>>>>                 # print(pa.ipc.read_schema(message))
>>>>>>>                 schema = pa.ipc.read_schema(buf)
>>>>>>>                 print(schema)
>>>>>>>             else:
>>>>>>>                 batch = pa.ipc.read_record_batch(message, schema)
>>>>>>>                 print(batch)
>>>>>>>                 print(batch.to_pydict())
>>>>>>>
>>>>>>> asyncio.run(main())`
>>>>>>>
>>>>>>> On Tue, 8 Feb 2022 at 18:33, David Li wrote:
>>>>>>>> Unfortunately Flight wraps the C++ Flight implementation, which uses
>>>>>>>> gRPC/C++, which is mostly a separate library entirely from grpcio and
>>>>>>>> does not benefit from any improvements there.
>>>>>>>> (The two do share a common network stack, but that's all; also, grpcio
>>>>>>>> doesn't expose any of the lower level APIs that might make it possible
>>>>>>>> to combine the two somehow.)
>>>>>>>>
>>>>>>>> You might ask why pyarrow.flight didn't use grpcio directly (with
>>>>>>>> bindings to transmute from FlightData to RecordBatch). However, at the
>>>>>>>> time the thought was that we would also have non-gRPC transports (which
>>>>>>>> are finally being worked on) and so a from-scratch grpcio/Python
>>>>>>>> implementation was not desirable.
>>>>>>>>
>>>>>>>> That said there are issues filed about better documenting FlightData.
>>>>>>>> See ARROW-15287[1] which links a StackOverflow answer that
>>>>>>>> demonstrates how to glue together asyncio/grpcio/PyArrow.
>>>>>>>>
>>>>>>>> There's also some previous discussion about adding async to Flight
>>>>>>>> more generally [2].
>>>>>>>>
>>>>>>>> [1]: https://issues.apache.org/jira/browse/ARROW-15287
>>>>>>>> [2]: "[C++] Async Arrow Flight" 2021/06/02
>>>>>>>> https://lists.apache.org/thread/jrj6yx53gyj0tr18pfdghtb8krp4gpfv
>>>>>>>>
>>>>>>>> -David
>>>>>>>>
>>>>>>>> On Tue, Feb 8, 2022, at 13:24, R KB wrote:
>>>>>>>>> GRPC has pretty good AsyncIO support at this point, and since Flight
>>>>>>>>> is essentially a wrapper around some GRPC types: why can't we just
>>>>>>>>> expose something that generates FlightData grpc objects?
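
For anyone finding this thread later: the slicing discussed above (4-byte continuation token, 4-byte little-endian header length, then header and body, with every slice offset by the 8-byte prefix) could be sketched roughly as below. This is only an illustration under the layout described in this thread, not an official API. The helper names split_ipc_message and join_ipc_message are made up for this example, and the code uses plain bytes so it runs without pyarrow. Please double-check against the Arrow IPC specification before relying on it.

```python
import struct
from typing import Tuple

# Layout assumed from the thread: continuation token, then header length.
CONTINUATION = b"\xff\xff\xff\xff"  # first 4 bytes of a serialized message
PREFIX_LEN = 8  # 4-byte token + 4-byte little-endian header length


def split_ipc_message(buf: bytes) -> Tuple[bytes, bytes]:
    """Split one serialized message into (header, body). Hypothetical helper."""
    if len(buf) < PREFIX_LEN or buf[:4] != CONTINUATION:
        raise ValueError("missing IPC continuation token")
    (header_length,) = struct.unpack("<I", buf[4:8])
    # header_length does not count the 8-byte prefix, so offset both slices.
    header_end = PREFIX_LEN + header_length
    if header_end > len(buf):
        raise ValueError("header length exceeds buffer size")
    return buf[PREFIX_LEN:header_end], buf[header_end:]


def join_ipc_message(header: bytes, body: bytes) -> bytes:
    """Inverse of split_ipc_message, as in the client snippet in this thread."""
    return CONTINUATION + struct.pack("<I", len(header)) + header + body
```

In a DoGet handler, the two returned values would go into data_header and data_body of a FlightData message. As noted above, the first FlightData sent is expected to carry the schema, so a serialized schema message would need the same treatment before any record batch.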
