alamb commented on code in PR #7212:
URL: https://github.com/apache/arrow-datafusion/pull/7212#discussion_r1287045354
##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -313,6 +313,73 @@ pub trait BatchSerializer: Unpin + Send {
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
}
+async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
+ result: Result<T>,
+ writers: &mut [AbortableWrite<W>],
+) -> Result<T> {
+ match result {
+ Ok(value) => Ok(value),
+ Err(e) => {
+ // Abort all writers before returning the error:
+ for writer in writers {
+ let mut abort_future = writer.abort_writer();
+ if let Ok(abort_future) = &mut abort_future {
+ let _ = abort_future.await;
+ }
+ // Ignore errors that occur while aborting;
+ // we try to abort all writers before returning the error.
+ }
+ // After aborting writers return original error.
+ Err(e)
+ }
+ }
+}
+
+/// Contains the common logic for serializing RecordBatches and
+/// writing the resulting bytes to an ObjectStore.
+/// Serialization is assumed to be stateless, i.e.
+/// each RecordBatch can be serialized without any
+/// dependency on the RecordBatches before or after.
+async fn stateless_serialize_and_write_files(
Review Comment:
I agree -- and I think that means that when we parallelize the logic further,
all the writers will benefit.
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1632,19 +1695,9 @@ mod tests {
// Assert that the batches read from the file match the expected result.
assert_batches_eq!(expected, &res);
- // Open the CSV file, read its contents as a record batch, and collect the batches into a vector.
- let file = File::open(path.clone())?;
- let reader = csv::ReaderBuilder::new(schema.clone())
- .has_header(true)
- .with_batch_size(batch_size)
- .build(file)
- .map_err(|e| DataFusionError::Internal(e.to_string()))?;
-
- let batches = reader
- .collect::<Vec<ArrowResult<RecordBatch>>>()
- .into_iter()
- .collect::<ArrowResult<Vec<RecordBatch>>>()
- .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+ // Read the records in the table
+ let batches = session_ctx.sql("select * from t").await?.collect().await?;
Review Comment:
👍
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1632,19 +1695,9 @@ mod tests {
// Assert that the batches read from the file match the expected result.
assert_batches_eq!(expected, &res);
- // Open the CSV file, read its contents as a record batch, and collect the batches into a vector.
- let file = File::open(path.clone())?;
- let reader = csv::ReaderBuilder::new(schema.clone())
- .has_header(true)
- .with_batch_size(batch_size)
- .build(file)
- .map_err(|e| DataFusionError::Internal(e.to_string()))?;
-
- let batches = reader
- .collect::<Vec<ArrowResult<RecordBatch>>>()
- .into_iter()
- .collect::<ArrowResult<Vec<RecordBatch>>>()
- .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+ // Read the records in the table
+ let batches = session_ctx.sql("select * from t").await?.collect().await?;
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]