pitrou commented on code in PR #36986:
URL: https://github.com/apache/arrow/pull/36986#discussion_r1288718361


##########
python/pyarrow/_flight.pyx:
##########
@@ -1219,6 +1224,80 @@ cdef class FlightMetadataWriter(_Weakrefable):
             check_flight_status(self.writer.get().WriteMetadata(deref(buf)))
 
 
+class AsyncCall:
+    # General strategy: create an anyio Event, tell the future to wake it on 
success/error
+    def __init__(self):
+        self.event = anyio.Event()
+        self.result = None
+        self.exception = None
+
+    async def wait(self) -> object:
+        print("Wait event")
+        import asyncio
+        self._loop = asyncio.get_running_loop()
+        await self.event.wait()
+        print("Waiting event")
+        if self.exception:
+            raise self.exception
+        return self.result
+
+
+cdef class AsyncFlightClient:
+    """
+    The async interface of a FlightClient.
+
+    This interface is EXPERIMENTAL.
+    """
+
+    cdef:
+        FlightClient client
+
+    def __init__(self, FlightClient client) -> None:
+        self.client = client
+
+    # TODO: return type that allows you to optionally get response
+    # headers/trailers - maybe a runtime option? since we'd have to copy all
+    # of them. or just always copy them? moot point for now since not yet
+    # exposed.
+    async def get_flight_info(
+        self,
+        descriptor: FlightDescriptor,
+        *,
+        options: FlightCallOptions = None,
+    ):
+        call = AsyncCall()

Review Comment:
   Could pass the asyncio loop to `AsyncCall` here.



##########
python/pyarrow/_flight.pyx:
##########
@@ -1219,6 +1224,80 @@ cdef class FlightMetadataWriter(_Weakrefable):
             check_flight_status(self.writer.get().WriteMetadata(deref(buf)))
 
 
+class AsyncCall:
+    # General strategy: create an anyio Event, tell the future to wake it on 
success/error
+    def __init__(self):
+        self.event = anyio.Event()
+        self.result = None
+        self.exception = None
+
+    async def wait(self) -> object:
+        print("Wait event")
+        import asyncio
+        self._loop = asyncio.get_running_loop()
+        await self.event.wait()
+        print("Waiting event")
+        if self.exception:
+            raise self.exception
+        return self.result
+
+
+cdef class AsyncFlightClient:
+    """
+    The async interface of a FlightClient.
+
+    This interface is EXPERIMENTAL.
+    """
+
+    cdef:
+        FlightClient client
+
+    def __init__(self, FlightClient client) -> None:
+        self.client = client
+
+    # TODO: return type that allows you to optionally get response
+    # headers/trailers - maybe a runtime option? since we'd have to copy all
+    # of them. or just always copy them? moot point for now since not yet
+    # exposed.

Review Comment:
   Why would we need to copy them? Are they not available anymore?



##########
python/pyarrow/_flight.pyx:
##########
@@ -1219,6 +1224,80 @@ cdef class FlightMetadataWriter(_Weakrefable):
             check_flight_status(self.writer.get().WriteMetadata(deref(buf)))
 
 
+class AsyncCall:
+    # General strategy: create an anyio Event, tell the future to wake it on 
success/error
+    def __init__(self):
+        self.event = anyio.Event()
+        self.result = None
+        self.exception = None
+
+    async def wait(self) -> object:
+        print("Wait event")
+        import asyncio
+        self._loop = asyncio.get_running_loop()
+        await self.event.wait()
+        print("Waiting event")
+        if self.exception:
+            raise self.exception
+        return self.result
+
+
+cdef class AsyncFlightClient:
+    """
+    The async interface of a FlightClient.
+
+    This interface is EXPERIMENTAL.
+    """
+
+    cdef:
+        FlightClient client
+
+    def __init__(self, FlightClient client) -> None:
+        self.client = client
+
+    # TODO: return type that allows you to optionally get response
+    # headers/trailers - maybe a runtime option? since we'd have to copy all
+    # of them. or just always copy them? moot point for now since not yet
+    # exposed.
+    async def get_flight_info(
+        self,
+        descriptor: FlightDescriptor,
+        *,
+        options: FlightCallOptions = None,
+    ):
+        call = AsyncCall()
+        self._get_flight_info(call, descriptor, options)
+        return await call.wait()
+
+    cdef _get_flight_info(self, call, descriptor, options):
+        cdef:
+            CFlightCallOptions* c_options = \
+                FlightCallOptions.unwrap(options)
+            CFlightDescriptor c_descriptor = \
+                FlightDescriptor.unwrap(descriptor)
+            function[cb_client_async_get_flight_info] callback = \
+                &_client_async_get_flight_info
+
+        with nogil:
+            CAsyncGetFlightInfo(self.client.client.get(), deref(c_options), 
c_descriptor, call, callback)
+
+
+cdef void _client_async_get_flight_info(void* self, CFlightInfo* info, const 
CStatus& status):
+    cdef FlightInfo result = FlightInfo.__new__(FlightInfo)
+    call = <object> self
+    if not status.ok():
+        print("Failed")
+        call.exception = RuntimeError(status.ToString())

Review Comment:
   Should use our regular Status conversion routines.



##########
python/pyarrow/src/arrow/python/flight.cc:
##########
@@ -383,6 +383,31 @@ Status CreateSchemaResult(const 
std::shared_ptr<arrow::Schema>& schema,
   return arrow::flight::SchemaResult::Make(*schema).Value(out);
 }
 
+void AsyncGetFlightInfo(
+    arrow::flight::FlightClient* client,
+    const arrow::flight::FlightCallOptions& options,
+    const arrow::flight::FlightDescriptor& descriptor,
+    PyObject* context, AsyncGetFlightInfoCallback callback) {
+  // TODO: OwnedRefNoGil the context?

Review Comment:
   Should certainly do so indeed.



##########
python/pyarrow/_flight.pyx:
##########
@@ -1320,6 +1399,11 @@ cdef class FlightClient(_Weakrefable):
             check_flight_status(CFlightClient.Connect(c_location, c_options
                                                       ).Value(&self.client))
 
+    def as_async(self) -> None:
+        if anyio is None:
+            raise RuntimeError("anyio is required for the async interface")

Review Comment:
   I'm not sure why we would want `anyio`. Just start with plain `asyncio`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to