This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 181cb3d66e Minor: Clarify rationale for FlightDataEncoder API, add examples (#4916)
181cb3d66e is described below
commit 181cb3d66e33c689be31292646ae63879cf0c134
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Oct 11 02:57:45 2023 -0400
Minor: Clarify rationale for FlightDataEncoder API, add examples (#4916)
---
arrow-flight/src/encode.rs | 48 ++++++++++++++++++++++++++++++++++++++++++----
1 file changed, 44 insertions(+), 4 deletions(-)
diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index cd2ee7c02b..28c181c0d5 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -30,6 +30,11 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
/// This can be used to implement [`FlightService::do_get`] in an
/// Arrow Flight implementation;
///
+/// This structure encodes a stream of `Result`s rather than `RecordBatch`es to
+/// propagate errors from streaming execution, where the generation of the
+/// `RecordBatch`es is incremental, and an error may occur even after
+/// several have already been successfully produced.
+///
/// # Caveats
/// 1. [`DictionaryArray`](arrow_array::array::DictionaryArray)s
/// are converted to their underlying types prior to transport, due to
@@ -41,14 +46,14 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
/// # use arrow_array::{ArrayRef, RecordBatch, UInt32Array};
/// # async fn f() {
/// # let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
-/// # let record_batch = RecordBatch::try_from_iter(vec![
+/// # let batch = RecordBatch::try_from_iter(vec![
/// # ("a", Arc::new(c1) as ArrayRef)
/// # ])
/// # .expect("cannot create record batch");
/// use arrow_flight::encode::FlightDataEncoderBuilder;
///
/// // Get an input stream of Result<RecordBatch, FlightError>
-/// let input_stream = futures::stream::iter(vec![Ok(record_batch)]);
+/// let input_stream = futures::stream::iter(vec![Ok(batch)]);
///
/// // Build a stream of `Result<FlightData>` (e.g. to return for do_get)
/// let flight_data_stream = FlightDataEncoderBuilder::new()
@@ -59,6 +64,39 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
/// # }
/// ```
///
+/// # Example: Sending `Vec<RecordBatch>`
+///
+/// You can create a [`Stream`] to pass to [`Self::build`] from an existing
+/// `Vec` of `RecordBatch`es like this:
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_array::{ArrayRef, RecordBatch, UInt32Array};
+/// # async fn f() {
+/// # fn make_batches() -> Vec<RecordBatch> {
+/// # let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
+/// # let batch = RecordBatch::try_from_iter(vec![
+/// # ("a", Arc::new(c1) as ArrayRef)
+/// # ])
+/// # .expect("cannot create record batch");
+/// # vec![batch.clone(), batch.clone()]
+/// # }
+/// use arrow_flight::encode::FlightDataEncoderBuilder;
+///
+/// // Get batches that you want to send via Flight
+/// let batches: Vec<RecordBatch> = make_batches();
+///
+/// // Create an input stream of Result<RecordBatch, FlightError>
+/// let input_stream = futures::stream::iter(
+/// batches.into_iter().map(Ok)
+/// );
+///
+/// // Build a stream of `Result<FlightData>` (e.g. to return for do_get)
+/// let flight_data_stream = FlightDataEncoderBuilder::new()
+/// .build(input_stream);
+/// # }
+/// ```
+///
/// [`FlightService::do_get`]: crate::flight_service_server::FlightService::do_get
/// [`FlightError`]: crate::error::FlightError
#[derive(Debug)]
@@ -146,8 +184,10 @@ impl FlightDataEncoderBuilder {
self
}
- /// Return a [`Stream`] of [`FlightData`],
- /// consuming self. More details on [`FlightDataEncoder`]
+ /// Takes a [`Stream`] of [`Result<RecordBatch>`] and returns a [`Stream`]
+ /// of [`FlightData`], consuming self.
+ ///
+ /// See example on [`Self`] and [`FlightDataEncoder`] for more details
pub fn build<S>(self, input: S) -> FlightDataEncoder
where
S: Stream<Item = Result<RecordBatch>> + Send + 'static,