viirya commented on code in PR #3402:
URL: https://github.com/apache/arrow-rs/pull/3402#discussion_r1061812232


##########
arrow-flight/src/client.rs:
##########
@@ -256,13 +258,273 @@ impl FlightClient {
         Ok(response)
     }
 
+    /// Make a `DoPut` call to the server with the provided
+    /// [`Stream`](futures::Stream) of [`FlightData`] and returning a
+    /// stream of [`PutResult`].
+    ///
+    /// # Example:
+    /// ```no_run
+    /// # async fn run() {
+    /// # use futures::{TryStreamExt, StreamExt};
+    /// # use std::sync::Arc;
+    /// # use arrow_array::UInt64Array;
+    /// # use arrow_array::RecordBatch;
+    /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
+    /// # use arrow_flight::encode::FlightDataEncoderBuilder;
+    /// # let batch = RecordBatch::try_from_iter(vec![
+    /// #  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
+    /// # ]).unwrap();
+    /// # let channel: tonic::transport::Channel = unimplemented!();
+    /// let mut client = FlightClient::new(channel);
+    ///
+    /// // encode the batch as a stream of `FlightData`
+    /// let flight_data_stream = FlightDataEncoderBuilder::new()
+    ///   .build(futures::stream::iter(vec![Ok(batch)]))
+    ///   // data encoder return Results, but do_put requires FlightData
+    ///   .map(|batch|batch.unwrap());
+    ///
+    /// // send the stream and get the results as `PutResult`
+    /// let response: Vec<PutResult>= client
+    ///   .do_put(flight_data_stream)
+    ///   .await
+    ///   .unwrap()
+    ///   .try_collect() // use TryStreamExt to collect stream
+    ///   .await
+    ///   .expect("error calling do_put");
+    /// # }
+    /// ```
+    pub async fn do_put<S: Stream<Item = FlightData> + Send + 'static>(
+        &mut self,
+        request: S,
+    ) -> Result<BoxStream<'static, Result<PutResult>>> {
+        let request = self.make_request(request);
+
+        let response = self
+            .inner
+            .do_put(request)
+            .await?
+            .into_inner()
+            .map_err(FlightError::Tonic);
+
+        Ok(response.boxed())
+    }
+
+    /// Make a `DoExchange` call to the server with the provided
+    /// [`Stream`](futures::Stream) of [`FlightData`] and returning a
+    /// stream of [`FlightData`].
+    ///
+    /// # Example:
+    /// ```no_run
+    /// # async fn run() {
+    /// # use futures::{TryStreamExt, StreamExt};
+    /// # use std::sync::Arc;
+    /// # use arrow_array::UInt64Array;
+    /// # use arrow_array::RecordBatch;
+    /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
+    /// # use arrow_flight::encode::FlightDataEncoderBuilder;
+    /// # let batch = RecordBatch::try_from_iter(vec![
+    /// #  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
+    /// # ]).unwrap();
+    /// # let channel: tonic::transport::Channel = unimplemented!();
+    /// let mut client = FlightClient::new(channel);
+    ///
+    /// // encode the batch as a stream of `FlightData`
+    /// let flight_data_stream = FlightDataEncoderBuilder::new()
+    ///   .build(futures::stream::iter(vec![Ok(batch)]))
+    ///   // data encoder return Results, but do_put requires FlightData
+    ///   .map(|batch|batch.unwrap());
+    ///
+    /// // send the stream and get the results as `RecordBatches`
+    /// let response: Vec<RecordBatch> = client
+    ///   .do_exchange(flight_data_stream)
+    ///   .await
+    ///   .unwrap()
+    ///   .try_collect() // use TryStreamExt to collect stream
+    ///   .await
+    ///   .expect("error calling do_put");
+    /// # }
+    /// ```
+    pub async fn do_exchange<S: Stream<Item = FlightData> + Send + 'static>(
+        &mut self,
+        request: S,
+    ) -> Result<FlightRecordBatchStream> {
+        let request = self.make_request(request);
+
+        let response = self
+            .inner
+            .do_exchange(request)
+            .await?
+            .into_inner()
+            .map_err(FlightError::Tonic);
+
+        Ok(FlightRecordBatchStream::new_from_flight_data(response))
+    }
+
+    /// Make a `ListFlights` call to the server with the provided
+    /// critera and returns a [`Stream`](futures::Stream) of [`FlightInfo`].
+    ///
+    /// # Example:
+    /// ```no_run
+    /// # async fn run() {
+    /// # use futures::TryStreamExt;
+    /// # use bytes::Bytes;
+    /// # use arrow_flight::{FlightInfo, FlightClient};
+    /// # let channel: tonic::transport::Channel = unimplemented!();
+    /// let mut client = FlightClient::new(channel);
+    ///
+    /// // Send 'Name=Foo' bytes as the "expression" to the server
+    /// // and gather the returned FlightInfo
+    /// let responses: Vec<FlightInfo> = client
+    ///   .list_flights(Bytes::from("Name=Foo"))
+    ///   .await
+    ///   .expect("error listing flights")
+    ///   .try_collect() // use TryStreamExt to collect stream
+    ///   .await
+    ///   .expect("error gathering flights");
+    /// # }
+    /// ```
+    pub async fn list_flights(
+        &mut self,
+        expression: impl Into<Bytes>,
+    ) -> Result<BoxStream<'static, Result<FlightInfo>>> {
+        let request = Criteria {
+            expression: expression.into(),
+        };
+
+        let request = self.make_request(request);
+
+        let response = self
+            .inner
+            .list_flights(request)
+            .await?
+            .into_inner()
+            .map_err(FlightError::Tonic);
+
+        Ok(response.boxed())
+    }
+
+    /// Make a `GetSchema` call to the server with the provided
+    /// [`FlightDescriptor`] and returns the associated [`Schema`].
+    ///
+    /// # Example:
+    /// ```no_run
+    /// # async fn run() {
+    /// # use bytes::Bytes;
+    /// # use arrow_flight::{FlightDescriptor, FlightClient};
+    /// # use arrow_schema::Schema;
+    /// # let channel: tonic::transport::Channel = unimplemented!();
+    /// let mut client = FlightClient::new(channel);
+    ///
+    /// // Request the schema result of a 'CMD' request to the server
+    /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
+    ///
+    /// let schema: Schema = client
+    ///   .get_schema(request)
+    ///   .await
+    ///   .expect("error making request");
+    /// # }
+    /// ```
+    pub async fn get_schema(
+        &mut self,
+        flight_descriptor: FlightDescriptor,
+    ) -> Result<Schema> {
+        let request = self.make_request(flight_descriptor);
+
+        let schema_result = self.inner.get_schema(request).await?.into_inner();
+
+        // attempt decode from IPC
+        let schema: Schema = schema_result.try_into()?;
+
+        Ok(schema)
+    }
+
+    /// Make a `ListActions` call to the server and returns a

Review Comment:
   ```suggestion
       /// Make a `ListActions` call to the server and returning a
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to