alamb commented on code in PR #60:
URL: https://github.com/apache/datafusion-ray/pull/60#discussion_r1955311978
##########
src/util.rs:
##########
@@ -0,0 +1,457 @@
+use std::collections::HashMap;
+use std::fmt::Display;
+use std::future::Future;
+use std::io::Cursor;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::time::Duration;
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
+use arrow::ipc::convert::fb_to_schema;
+use arrow::ipc::reader::StreamReader;
+use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
+use arrow::ipc::{root_as_message, MetadataVersion};
+use arrow::pyarrow::*;
+use arrow::util::pretty;
+use arrow_flight::{FlightClient, FlightData, Ticket};
+use async_stream::stream;
+use datafusion::common::internal_datafusion_err;
+use datafusion::common::tree_node::{Transformed, TreeNode};
+use datafusion::datasource::physical_plan::ParquetExec;
+use datafusion::error::DataFusionError;
+use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, SessionStateBuilder};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_proto::physical_plan::AsExecutionPlan;
+use futures::{Stream, StreamExt};
+use parking_lot::Mutex;
+use pyo3::prelude::*;
+use pyo3::types::{PyBytes, PyList};
+use tonic::transport::Channel;
+
+use crate::codec::RayCodec;
+use crate::protobuf::FlightTicketData;
+use crate::ray_stage_reader::RayStageReaderExec;
+use crate::stage_service::ServiceClients;
+use prost::Message;
+use tokio::macros::support::thread_rng_n;
+
+pub(crate) trait ResultExt<T> {
+ fn to_py_err(self) -> PyResult<T>;
+}
+
+impl<T, E> ResultExt<T> for Result<T, E>
+where
+ E: std::fmt::Debug,
+{
+ fn to_py_err(self) -> PyResult<T> {
+ match self {
+ Ok(x) => Ok(x),
+            Err(e) => Err(PyErr::new::<pyo3::exceptions::PyException, _>(format!(
+ "{:?}",
+ e
+ ))),
+ }
+ }
+}
+
+/// we need these two functions to go back and forth between IPC representations
+/// from rust to rust to avoid using the C++ implementation from pyarrow as it
+/// will generate unaligned data causing us errors
+///
+/// not used in current arrow flight implementation, but leaving these here
+#[pyfunction]
+pub fn batch_to_ipc(py: Python, batch: PyArrowType<RecordBatch>) -> PyResult<Py<PyBytes>> {
+ let batch = batch.0;
+
+ let bytes = batch_to_ipc_helper(&batch).to_py_err()?;
+
+    //TODO: unsure about this next line. Compiler is happy but is this correct?
+ Ok(PyBytes::new_bound(py, &bytes).unbind())
+}
+
+#[pyfunction]
+pub fn ipc_to_batch(bytes: &[u8], py: Python) -> PyResult<PyObject> {
+ let batch = ipc_to_batch_helper(bytes).to_py_err()?;
+ batch.to_pyarrow(py)
+}
+
+fn batch_to_ipc_helper(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
+ let schema = batch.schema();
+ let buffer: Vec<u8> = Vec::new();
+ let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)
+        .map_err(|e| internal_datafusion_err!("Cannot create ipcwriteoptions {e}"))?;
+
+    let mut stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
+ stream_writer.write(batch)?;
+ stream_writer.into_inner()
+}
+
+fn ipc_to_batch_helper(bytes: &[u8]) -> Result<RecordBatch, ArrowError> {
+    let mut stream_reader = StreamReader::try_new_buffered(Cursor::new(bytes), None)?;
Review Comment:
   Also, once this is available:
   - https://github.com/apache/arrow-rs/pull/7120

   you can probably save quite a bit of the time currently spent revalidating known-good inputs.
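   To make that concrete, here is a rough sketch of what the helper could look like once that lands. Note that `with_skip_validation` and its `unsafe` contract are taken from the still-open PR and may change before release, and the single-batch read at the end is also an assumption, since the hunk above is truncated:

   ```rust
   use std::io::Cursor;

   use arrow::array::RecordBatch;
   use arrow::error::ArrowError;
   use arrow::ipc::reader::StreamReader;

   // Hypothetical sketch: decode IPC bytes that this process produced itself,
   // skipping buffer revalidation. `with_skip_validation` is the API proposed
   // in apache/arrow-rs#7120 and may differ in the merged version.
   fn ipc_to_batch_trusted(bytes: &[u8]) -> Result<RecordBatch, ArrowError> {
       let reader = StreamReader::try_new(Cursor::new(bytes), None)?;
       // SAFETY (assumed contract): the bytes came from batch_to_ipc_helper in
       // this same module, so the encoded buffers are known to be valid.
       let mut reader = unsafe { reader.with_skip_validation(true) };
       reader
           .next()
           .ok_or_else(|| ArrowError::IpcError("empty IPC stream".to_string()))?
   }
   ```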
##########
src/util.rs:
##########
@@ -0,0 +1,457 @@
+fn ipc_to_batch_helper(bytes: &[u8]) -> Result<RecordBatch, ArrowError> {
+    let mut stream_reader = StreamReader::try_new_buffered(Cursor::new(bytes), None)?;
Review Comment:
   FYI, since you are reading from memory here anyway, I don't think the buffering adds much extra value.
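   Something like this minimal sketch, for example (the single-batch read at the end is an assumption, since the hunk is truncated there):

   ```rust
   use std::io::Cursor;

   use arrow::array::RecordBatch;
   use arrow::error::ArrowError;
   use arrow::ipc::reader::StreamReader;

   fn ipc_to_batch_helper(bytes: &[u8]) -> Result<RecordBatch, ArrowError> {
       // The bytes are already in memory, so read the Cursor directly;
       // try_new_buffered would only add a redundant BufReader layer.
       let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None)?;
       stream_reader
           .next()
           .ok_or_else(|| ArrowError::IpcError("empty IPC stream".to_string()))?
   }
   ```

   Since `&[u8]` implements `std::io::Read` directly, the `Cursor` wrapper could arguably be dropped as well.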