westonpace commented on PR #43632:
URL: https://github.com/apache/arrow/pull/43632#issuecomment-2281940063

   I've been coding Rust too long now and as a result I must at least offer,
for your consideration, a pull-based asynchronous stream. It is convenient in
that it is nearly identical to the synchronous version and doesn't require
special backpressure signals. Cancellation is handled by releasing the
`ArrowAsyncDeviceArrayStream` before it is finished. If it is canceled while a
call is pending, the `Waker` needs to stay alive until `wake` is called, and
the producer must still call `wake` even though the stream has been released.
   
   If the consumer wants parallelism, it should launch a new thread or task to
call `get_next` as soon as the previous call completes, while the old thread
processes the data returned by that previous call to `get_next`.
   
   ```
   struct Waker {
     // Called by the producer to signal the consumer that more data is
     // available. The consumer shall release any resources associated with
     // the waker after this method is called. The producer shall not call
     // any other methods after calling this method.
     //
     // The producer must always call this method even if an error is
     // encountered (in which case the error will be reported on the
     // following call to `get_next`).
     void (*wake)(struct Waker* waker);
     void* private_data;
   };
   
   struct ArrowAsyncDeviceArrayStream {
     // Callback to get the next array
     // (if no error and the array is released, the stream has ended)
     //
     // Return value: 0 if successful, an `errno`-compatible error code
     // otherwise.
     //
     // If EWOULDBLOCK is returned then the producer is not ready to generate
     // more data. In that case the producer takes ownership of `waker` and
     // is responsible for calling `wake` when more data is available. If any
     // other value is returned then the producer shall ignore `waker` and
     // never call any methods on it.
     //
     // If successful, the ArrowDeviceArray must be released independently
     // from the stream.
     int (*get_next)(struct ArrowAsyncDeviceArrayStream* self,
                     struct Waker* waker, struct ArrowDeviceArray* out);
   
     // The rest is identical to `ArrowDeviceArrayStream`
     ArrowDeviceType device_type;
     const char* (*get_last_error)(struct ArrowAsyncDeviceArrayStream* self);
     void (*release)(struct ArrowAsyncDeviceArrayStream* self);
     void* private_data;
   };
   ```
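
To make the `EWOULDBLOCK` / `wake` handshake concrete, here is a minimal consumer-side sketch. It is not part of the proposal: `DemoArray`, `DemoAsyncStream`, `DemoProducer` and every `demo_*` function are hypothetical stand-ins, and the stream struct is trimmed to `get_next` and `private_data`. The toy producer reports `EWOULDBLOCK` once before every array, and the "background work" is driven inline where a real consumer would suspend until `wake` fires.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical stand-in for ArrowDeviceArray: only what the sketch needs. */
struct DemoArray {
  int value;
  int released; /* 1 means "released", which signals end of stream */
};

struct Waker {
  void (*wake)(struct Waker* waker);
  void* private_data;
};

/* Trimmed-down stand-in for the proposed stream struct. */
struct DemoAsyncStream {
  int (*get_next)(struct DemoAsyncStream* self, struct Waker* waker,
                  struct DemoArray* out);
  void* private_data;
};

/* Toy producer: is "not ready" once before each array it yields. */
struct DemoProducer {
  int remaining;
  int ready;
  struct Waker* pending; /* waker owned after returning EWOULDBLOCK */
};

static int demo_get_next(struct DemoAsyncStream* self, struct Waker* waker,
                         struct DemoArray* out) {
  struct DemoProducer* p = self->private_data;
  if (!p->ready) {
    p->pending = waker; /* producer takes ownership of the waker */
    return EWOULDBLOCK;
  }
  p->ready = 0; /* any other return value: the waker is ignored */
  if (p->remaining == 0) {
    out->released = 1; /* success + released array: stream has ended */
    return 0;
  }
  out->value = p->remaining--;
  out->released = 0;
  return 0;
}

/* Simulates the producer's background work finishing. */
static void demo_producer_make_ready(struct DemoProducer* p) {
  struct Waker* w = p->pending;
  assert(w != NULL); /* only valid after get_next returned EWOULDBLOCK */
  p->pending = NULL;
  p->ready = 1;
  w->wake(w); /* last call the producer makes on this waker */
}

static void demo_count_wake(struct Waker* waker) {
  ++*(int*)waker->private_data;
}

/* Drains the stream; returns the sum of all values, or -1 on error. */
static int demo_drain(struct DemoAsyncStream* stream, int* wake_count) {
  int sum = 0;
  for (;;) {
    struct Waker waker = {demo_count_wake, wake_count}; /* fresh per call */
    struct DemoArray out = {0, 0};
    int rc = stream->get_next(stream, &waker, &out);
    if (rc == EWOULDBLOCK) {
      /* A real consumer would suspend here and resume from wake(). */
      demo_producer_make_ready(stream->private_data);
      continue;
    }
    if (rc != 0) return -1;
    if (out.released) return sum;
    sum += out.value;
  }
}

static int demo_run(int n, int* wake_count) {
  struct DemoProducer producer = {n, 0, NULL};
  struct DemoAsyncStream stream = {demo_get_next, &producer};
  *wake_count = 0;
  return demo_drain(&stream, wake_count);
}
```

In this toy setup `demo_run(3, &wakes)` yields the values 3, 2, 1 and triggers one `wake` before each array plus one before the end-of-stream marker.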
   
   Or, just for fun, an epoll-style approach, which has the same pull-based
advantages and doesn't require a waker, but is less friendly towards
coroutine-style asynchronicity.
   
   ```
   struct ArrowAsyncDeviceArrayStream {
     // Callback to get the next array
     // (if no error and the array is released, the stream has ended)
     //
     // Return value: 0 if successful, an `errno`-compatible error code
     // otherwise.
     //
     // If EWOULDBLOCK is returned then the producer is not ready to generate
     // more data.  The consumer should call `wait`.  `out` will not be valid.
     //
     // If successful, the ArrowDeviceArray must be released independently
     // from the stream.
     int (*get_next)(struct ArrowAsyncDeviceArrayStream* self,
                     struct ArrowDeviceArray* out);
   
     // Blocks until data is available
     void (*wait)(struct ArrowAsyncDeviceArrayStream* self, int timeout);
   
     // The rest is identical to `ArrowDeviceArrayStream`
     ArrowDeviceType device_type;
     const char* (*get_last_error)(struct ArrowAsyncDeviceArrayStream* self);
     void (*release)(struct ArrowAsyncDeviceArrayStream* self);
     void* private_data;
   };
   ```
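
For comparison, a consumer loop for the epoll-style variant might look roughly like the sketch below. Again, `PollArray`, `PollStream`, `PollProducer` and the `poll_*` functions are hypothetical stand-ins rather than anything from the proposal, and the toy `wait` simply marks the producer ready instead of blocking. The proposal does not say what unit `timeout` is in, so the sketch passes `0` and ignores it.

```c
#include <errno.h>
#include <stddef.h>

/* Hypothetical stand-in for ArrowDeviceArray: only what the sketch needs. */
struct PollArray {
  int value;
  int released; /* 1 means "released", which signals end of stream */
};

/* Trimmed-down stand-in for the epoll-style stream struct. */
struct PollStream {
  int (*get_next)(struct PollStream* self, struct PollArray* out);
  void (*wait)(struct PollStream* self, int timeout);
  void* private_data;
};

/* Toy producer: is "not ready" once before each array it yields. */
struct PollProducer {
  int remaining;
  int ready;
  int waits; /* how many times the consumer had to call wait */
};

static int poll_get_next(struct PollStream* self, struct PollArray* out) {
  struct PollProducer* p = self->private_data;
  if (!p->ready) return EWOULDBLOCK; /* `out` is not valid in this case */
  p->ready = 0;
  if (p->remaining == 0) {
    out->released = 1; /* success + released array: stream has ended */
    return 0;
  }
  out->value = p->remaining--;
  out->released = 0;
  return 0;
}

/* A real implementation would block; the toy one becomes ready at once. */
static void poll_wait(struct PollStream* self, int timeout) {
  struct PollProducer* p = self->private_data;
  (void)timeout; /* unit is unspecified in the proposal; ignored here */
  p->ready = 1;
  p->waits++;
}

/* Drains the stream; returns the sum of all values, or -1 on error. */
static int poll_drain(struct PollStream* stream) {
  int sum = 0;
  for (;;) {
    struct PollArray out = {0, 0};
    int rc = stream->get_next(stream, &out);
    if (rc == EWOULDBLOCK) {
      stream->wait(stream, 0); /* no waker: the consumer thread waits */
      continue;
    }
    if (rc != 0) return -1;
    if (out.released) return sum;
    sum += out.value;
  }
}

static int poll_run(int n, int* waits) {
  struct PollProducer producer = {n, 0, 0};
  struct PollStream stream = {poll_get_next, poll_wait, &producer};
  int sum = poll_drain(&stream);
  *waits = producer.waits;
  return sum;
}
```

The loop is shorter than the waker version because no ownership changes hands, but the calling thread sits in `wait`, which is why this style fits coroutine-based consumers less well.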

