djanderson commented on issue #5116:
URL: https://github.com/apache/arrow-rs/issues/5116#issuecomment-1826248466

   @tustvold, thank you very much for taking the time to respond. After quite a bit more experimentation, I was able to get the `mpsc::channel` method working!
   
   It's a little awkward: the receiver side also needs the `object_store` to fetch the Parquet metadata used to build each `FlightInfo`, and I wasn't able to find a way to pass only the `ObjectMeta` through the channel. There may be a better way to do it than passing a `ParquetObjectReader` along, but I'm including what I came up with in case it gives anyone else a jumping-off point. _Caveat emptor_: I've only verified this with a couple of files and the pyarrow client.
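   On passing only the `ObjectMeta`: an `Arc<dyn ObjectStore>` is cheap to clone, so one option that may work is to give the listing task its own clone of the handle, keep another clone on the consumer side, send only the metadata through the channel, and build the `ParquetObjectReader` on the receiving end. The sketch below models that ownership pattern with the standard library only, so it is an illustration under assumptions rather than drop-in code: `Store` (standing in for `Arc<dyn ObjectStore>`), `Meta` (standing in for `ObjectMeta`) and `describe_all` are hypothetical names, and a thread plus `sync_channel` stand in for `tokio::spawn` and `futures::channel::mpsc`.

   ```rust
   use std::collections::HashMap;
   use std::sync::mpsc::sync_channel;
   use std::sync::Arc;
   use std::thread;

   /// Hypothetical stand-in for `Arc<dyn ObjectStore>`: object path -> object bytes.
   type Store = Arc<HashMap<String, Vec<u8>>>;

   /// Hypothetical stand-in for `ObjectMeta`: only this travels through the channel.
   struct Meta {
       location: String,
   }

   /// Lists every object on a producer thread; the consumer uses its own handle
   /// to the same store to look up per-object details.
   fn describe_all(store: Store) -> Vec<(String, usize)> {
       let (tx, rx) = sync_channel::<Meta>(2);

       // One clone of the shared handle moves into the producer, which only lists.
       let list_store = Arc::clone(&store);
       let producer = thread::spawn(move || {
           let mut keys: Vec<&String> = list_store.keys().collect();
           keys.sort();
           for key in keys {
               let meta = Meta { location: key.clone() };
               // `send` blocks while the buffer is full and only errors once
               // the receiver has been dropped.
               if tx.send(meta).is_err() {
                   break;
               }
           }
           // `tx` is dropped here, which ends the receiver's iterator.
       });

       // The consumer keeps the original handle and fetches extra data itself.
       let described: Vec<(String, usize)> = rx
           .iter()
           .filter_map(|meta| {
               let bytes = store.get(&meta.location)?;
               Some((meta.location, bytes.len()))
           })
           .collect();

       producer.join().expect("producer thread panicked");
       described
   }
   ```

   Translated back to the async code, this would roughly mean the spawned task sending `md` alone and the `filter_map` closure cloning the store into each `async move` block before calling `ParquetObjectReader::new`; that async variant is an untested assumption.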
   
   <details>
   <summary>Imports</summary>
   
   ```rust
   use arrow::ipc::writer::IpcWriteOptions;
   use arrow_flight::IpcMessage;
   use arrow_flight::{
       flight_descriptor::DescriptorType, flight_service_server::FlightService,
       flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty, FlightData,
       FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
       SchemaResult, Ticket,
   };
   use base64::prelude::BASE64_STANDARD;
   use base64::Engine;
   use bytes::Bytes;
   use futures::channel::mpsc;
   use futures::stream::{BoxStream, StreamExt};
   use log::{debug, error, info};
   use object_store::ObjectMeta;
   use object_store::{local::LocalFileSystem, ObjectStore};
   use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
   use parquet::arrow::parquet_to_arrow_schema;
   use rand::distributions::{Alphanumeric, DistString};
   use std::collections::HashMap;
   use std::path::Path;
   use std::sync::{Arc, Mutex};
   use tonic::transport::Server;
   use tonic::{Request, Response, Status, Streaming};
   ```
   
   </details>
   
   ```rust
       async fn list_flights(
           &self,
           request: Request<Criteria>,
       ) -> Result<Response<Self::ListFlightsStream>, Status> {
           let context = self.check_session_token(&request)?;
           let (mut tx, rx) = mpsc::channel::<(ObjectMeta, ParquetObjectReader)>(2);
   
           let store = context.object_store;
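           tokio::spawn(async move {
               // `send` comes from the `Sink` impl on `mpsc::Sender`
               use futures::SinkExt;

               let prefix = None;
               let mut objects = store.list(prefix);
               while let Some(md) = objects.next().await {
                   // Log listing errors instead of panicking inside the task
                   let md = match md {
                       Ok(md) => md,
                       Err(e) => {
                           error!("Failed to list objects: {e}");
                           break;
                       }
                   };
                   let reader = ParquetObjectReader::new(store.clone(), md.clone());
                   // Wait for buffer space rather than calling `try_send`, which
                   // also fails whenever the bounded buffer is merely full
                   if tx.send((md, reader)).await.is_err() {
                       debug!("rx channel dropped");
                       break;
                   }
               }
               tx.close_channel();
           });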
   
           let result = rx.filter_map(|(object_md, mut pqt_reader)| async move {
               let Ok(pqt_md) = pqt_reader.get_metadata().await else {
                   error!("Failed to get parquet metadata from {}", object_md.location);
                   return None;
               };
   
               let file_md = pqt_md.file_metadata();
   
               // Convert the file's schema to Arrow format and serialize it as an IPC message
               let Ok(arrow_schema) = parquet_to_arrow_schema(file_md.schema_descr(), None) else {
                   error!("Failed to convert schema for {}", object_md.location);
                   return None;
               };
               let Ok(IpcMessage(schema)) =
                   SchemaAsIpc::new(&arrow_schema, &IpcWriteOptions::default()).try_into()
               else {
                   error!("Failed to serialize schema for {}", object_md.location);
                   return None;
               };
   
               let flight_descriptor = Some(FlightDescriptor {
                   r#type: DescriptorType::Path.into(),
                   cmd: Bytes::new(),
                   path: vec![object_md.location.to_string()],
               });
   
               return Some(Ok(FlightInfo {
                   flight_descriptor,
                   endpoint: vec![],
                   total_records: file_md.num_rows(),
                   total_bytes: object_md.size as i64,
                   ordered: false,
                   schema: schema.into(),
               }));
           });
   
           Ok(Response::new(Box::pin(result) as Self::ListFlightsStream))
       }
   ```
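   A note on backpressure with a bounded channel: `try_send` reports an error as soon as the buffer is full, not only when the receiver has been dropped, so breaking on any `try_send` error can end the listing early whenever the consumer (here, the Parquet metadata fetches) falls behind, while an awaited `send` waits for room instead. The std-only sketch below illustrates the difference with `std::sync::mpsc::sync_channel`, which has comparable bounded semantics (the exact capacity rules of `futures::channel::mpsc` differ slightly). `count_with_try_send` and `count_with_send` are hypothetical helpers, and the `try_send` case deliberately receives nothing until sending has finished so that its result is deterministic.

   ```rust
   use std::sync::mpsc::{sync_channel, TrySendError};
   use std::thread;

   /// Mirrors a `try_send` loop that stops at the first error of any kind.
   /// Nothing is received until sending has finished, standing in for a
   /// consumer that is still busy with earlier items.
   fn count_with_try_send(items: usize, capacity: usize) -> usize {
       let (tx, rx) = sync_channel::<usize>(capacity);
       for i in 0..items {
           match tx.try_send(i) {
               Ok(()) => {}
               // Buffer full: the receiver is still alive, but the loop stops anyway.
               Err(TrySendError::Full(_)) => break,
               // Receiver dropped: the case the loop was meant to catch.
               Err(TrySendError::Disconnected(_)) => break,
           }
       }
       drop(tx); // close the channel so the iterator below terminates
       rx.iter().count()
   }

   /// A blocking `send` waits for the consumer to make room, so nothing is lost.
   fn count_with_send(items: usize, capacity: usize) -> usize {
       let (tx, rx) = sync_channel::<usize>(capacity);
       let producer = thread::spawn(move || {
           for i in 0..items {
               // Only errors once the receiver has been dropped.
               if tx.send(i).is_err() {
                   break;
               }
           }
       });
       let received = rx.iter().count();
       producer.join().expect("producer thread panicked");
       received
   }
   ```

   With ten items and a capacity of two, the `try_send` version hands over only the first two items, while the blocking version delivers all ten.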

