westonpace commented on PR #43632:
URL: https://github.com/apache/arrow/pull/43632#issuecomment-2281940063
I've been coding Rust too long now, and as a result I must at least offer, for your consideration, a pull-based asynchronous stream. It is convenient in that it is nearly identical to the synchronous version and doesn't require special backpressure signals.

Cancellation is handled by releasing the `ArrowAsyncDeviceArrayStream` before it is finished. If the stream is canceled while a call is pending, the `Waker` needs to stay alive until `wake` is called, and the producer needs to call `wake` even though the stream has been released.

If the consumer wants parallelism, it should launch a new thread (or task) to call `get_next` as soon as the previous call completes, while the original thread processes the data returned by that previous call.

```
struct Waker {
  // Called by the producer to signal the consumer that more data is available. The consumer
  // shall release any resources associated with the waker after this method is called. The
  // producer shall not call any other methods after calling this method.
  //
  // The producer must always call this method, even if an error is encountered (in which case
  // the error will be reported on the following call to `get_next`).
  void (*wake)(struct Waker* waker);
  void* private_data;
};

struct ArrowAsyncDeviceArrayStream {
  // Callback to get the next array
  // (if no error and the array is released, the stream has ended)
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // If EWOULDBLOCK is returned, then the producer is not ready to generate more data. In that
  // case the producer takes ownership of `waker` and is responsible for calling `wake` when
  // more data is available. If any other value is returned, the producer shall ignore
  // `waker` and never call any methods on it.
  //
  // If successful, the ArrowDeviceArray must be released independently from the stream.
  int (*get_next)(struct ArrowAsyncDeviceArrayStream* self, struct Waker* waker,
                  struct ArrowDeviceArray* out);

  // The rest is identical to `ArrowDeviceArrayStream`
  ArrowDeviceType device_type;
  const char* (*get_last_error)(struct ArrowAsyncDeviceArrayStream* self);
  void (*release)(struct ArrowAsyncDeviceArrayStream* self);
  void* private_data;
};
```

Or, just for fun, an epoll-style approach, which has the same pull-based advantages and doesn't require a waker, but is less friendly toward coroutine-style asynchrony.

```
struct ArrowAsyncDeviceArrayStream {
  // Callback to get the next array
  // (if no error and the array is released, the stream has ended)
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // If EWOULDBLOCK is returned, then the producer is not ready to generate more data. The
  // consumer should call `wait`; `out` will not be valid.
  //
  // If successful, the ArrowDeviceArray must be released independently from the stream.
  int (*get_next)(struct ArrowAsyncDeviceArrayStream* self, struct ArrowDeviceArray* out);

  // Blocks until data is available (or `timeout` elapses)
  void (*wait)(struct ArrowAsyncDeviceArrayStream* self, int timeout);

  // The rest is identical to `ArrowDeviceArrayStream`
  ArrowDeviceType device_type;
  const char* (*get_last_error)(struct ArrowAsyncDeviceArrayStream* self);
  void (*release)(struct ArrowAsyncDeviceArrayStream* self);
  void* private_data;
};
```
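To make the `Waker`-based handshake concrete, here is a minimal single-threaded sketch of one producer and one consumer. It declares `wake` as a function pointer, since a C struct cannot hold a member function. `ToyProducer`, `toy_get_next`, `toy_data_arrived`, `flag_wake` and `drain` are hypothetical names, and the `ArrowArray`/`ArrowDeviceArray` definitions are trimmed stand-ins rather than the real C Device Data Interface layouts. The consumer drives the producer directly where a real one would suspend.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Trimmed stand-ins (assumption): only the fields this sketch touches. The
   real ArrowArray / ArrowDeviceArray layouts have more members. */
struct ArrowArray {
  int64_t length;
  void (*release)(struct ArrowArray*);
  void* private_data;
};
typedef int32_t ArrowDeviceType;
struct ArrowDeviceArray {
  struct ArrowArray array;
  ArrowDeviceType device_type;
};

/* The proposed Waker, with `wake` as a function pointer. */
struct Waker {
  void (*wake)(struct Waker* waker);
  void* private_data;
};

/* The proposed pull-based stream. */
struct ArrowAsyncDeviceArrayStream {
  int (*get_next)(struct ArrowAsyncDeviceArrayStream* self, struct Waker* waker,
                  struct ArrowDeviceArray* out);
  ArrowDeviceType device_type;
  const char* (*get_last_error)(struct ArrowAsyncDeviceArrayStream* self);
  void (*release)(struct ArrowAsyncDeviceArrayStream* self);
  void* private_data;
};

/* Hypothetical toy producer: emits `total` batches of length 1, 2, ...; a call
   made while no batch is ready parks the waker and returns EWOULDBLOCK. */
struct ToyProducer {
  int emitted, total;
  bool ready;
  struct Waker* parked; /* owned by the producer while non-NULL */
};

static void toy_release_array(struct ArrowArray* array) { array->release = NULL; }

static int toy_get_next(struct ArrowAsyncDeviceArrayStream* self,
                        struct Waker* waker, struct ArrowDeviceArray* out) {
  struct ToyProducer* p = self->private_data;
  if (p->emitted < p->total && !p->ready) {
    assert(p->parked == NULL); /* at most one pending call at a time */
    p->parked = waker;         /* producer now owns `waker` */
    return EWOULDBLOCK;
  }
  out->device_type = self->device_type;
  if (p->emitted == p->total) {
    out->array.release = NULL; /* 0 + released array => end of stream */
    return 0;
  }
  p->ready = false;
  out->array.length = ++p->emitted;
  out->array.release = toy_release_array;
  return 0;
}

/* Simulates data arriving (e.g. I/O completing): mark ready, fire the waker. */
static void toy_data_arrived(struct ToyProducer* p) {
  struct Waker* w = p->parked;
  p->ready = true;
  p->parked = NULL;
  if (w != NULL) w->wake(w); /* producer must not touch `w` after this */
}

/* Consumer-side waker that only records that it fired. */
static void flag_wake(struct Waker* waker) { *(bool*)waker->private_data = true; }

/* Drains the stream; returns the sum of batch lengths, or -1 on error. */
static int64_t drain(struct ArrowAsyncDeviceArrayStream* s, struct ToyProducer* p) {
  int64_t sum = 0;
  for (;;) {
    bool woken = false;
    struct Waker waker = {flag_wake, &woken}; /* alive until woken */
    struct ArrowDeviceArray out;
    int rc = s->get_next(s, &waker, &out);
    if (rc == EWOULDBLOCK) {
      /* A real consumer would suspend here until woken; this single-threaded
         demo drives the producer directly instead. */
      toy_data_arrived(p);
      assert(woken);
      continue;
    }
    if (rc != 0) return -1;
    if (out.array.release == NULL) return sum; /* end of stream */
    sum += out.array.length;
    out.array.release(&out.array);
  }
}
```

Here the stack-allocated `waker` stays valid only because `wake` fires before the loop iteration ends. A consumer that genuinely suspends must keep the waker (and whatever `private_data` points to) alive until `wake` runs, which is exactly the cancellation constraint described above.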
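For the epoll-style variant, the consumer loop reduces to "call `get_next`; on `EWOULDBLOCK`, call `wait` and retry". The sketch below assumes a timeout of -1 means "no timeout" (borrowing the `epoll_wait` convention; the proposal does not specify units or sentinel values). `ToyProducer`, `toy_get_next`, `toy_wait` and `drain` are hypothetical, the array structs are trimmed stand-ins, and the toy `wait` makes the next batch ready immediately instead of actually blocking.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Trimmed stand-ins (assumption): only the fields this sketch touches. */
struct ArrowArray {
  int64_t length;
  void (*release)(struct ArrowArray*);
  void* private_data;
};
typedef int32_t ArrowDeviceType;
struct ArrowDeviceArray {
  struct ArrowArray array;
  ArrowDeviceType device_type;
};

/* The proposed epoll-style stream. */
struct ArrowAsyncDeviceArrayStream {
  int (*get_next)(struct ArrowAsyncDeviceArrayStream* self, struct ArrowDeviceArray* out);
  void (*wait)(struct ArrowAsyncDeviceArrayStream* self, int timeout);
  ArrowDeviceType device_type;
  const char* (*get_last_error)(struct ArrowAsyncDeviceArrayStream* self);
  void (*release)(struct ArrowAsyncDeviceArrayStream* self);
  void* private_data;
};

/* Hypothetical toy producer: each of `total` batches becomes ready only after
   one call to wait(). */
struct ToyProducer {
  int emitted, total, waits;
  bool ready;
};

static void toy_release_array(struct ArrowArray* array) { array->release = NULL; }

static int toy_get_next(struct ArrowAsyncDeviceArrayStream* self,
                        struct ArrowDeviceArray* out) {
  struct ToyProducer* p = self->private_data;
  assert(p->emitted <= p->total);
  if (p->emitted < p->total && !p->ready) return EWOULDBLOCK; /* `out` untouched */
  out->device_type = self->device_type;
  if (p->emitted == p->total) {
    out->array.release = NULL; /* 0 + released array => end of stream */
    return 0;
  }
  p->ready = false;
  out->array.length = ++p->emitted;
  out->array.release = toy_release_array;
  return 0;
}

/* A real producer would block on a condition variable or file descriptor for
   up to `timeout`; the toy makes the next batch ready immediately. */
static void toy_wait(struct ArrowAsyncDeviceArrayStream* self, int timeout) {
  (void)timeout;
  struct ToyProducer* p = self->private_data;
  p->waits++;
  p->ready = true;
}

/* Consumer loop: on EWOULDBLOCK, wait() and retry. Returns the sum of batch
   lengths, or -1 on error. Timeout -1 is assumed to mean "no timeout". */
static int64_t drain(struct ArrowAsyncDeviceArrayStream* s) {
  int64_t sum = 0;
  for (;;) {
    struct ArrowDeviceArray out;
    int rc = s->get_next(s, &out);
    if (rc == EWOULDBLOCK) {
      s->wait(s, -1);
      continue;
    }
    if (rc != 0) return -1;
    if (out.array.release == NULL) return sum; /* end of stream */
    sum += out.array.length;
    out.array.release(&out.array);
  }
}
```

The consumer never hands the producer a callback here; it simply parks its own thread inside `wait`. That is why this shape maps naturally onto thread-per-stream consumers but less naturally onto coroutine runtimes.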