This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 6f2231353 fix incorrect buffer size limiting in parquet async writer (#4478)
6f2231353 is described below
commit 6f2231353f92dbffea7d5fd9c93db2641c7ffc41
Author: Zhang Li <[email protected]>
AuthorDate: Mon Jul 10 23:10:07 2023 +0800
fix incorrect buffer size limiting in parquet async writer (#4478)
* fix incorrect buffer size limiting in parquet async writer
* Format
* Review feedback
---------
Co-authored-by: zhangli20 <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
parquet/src/arrow/async_writer/mod.rs | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 339618364..4d8cf1b90 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -77,12 +77,16 @@ pub struct AsyncArrowWriter<W> {
/// The inner buffer shared by the `sync_writer` and the `async_writer`
shared_buffer: SharedBuffer,
+
+ /// Trigger forced flushing once buffer size reaches this value
+ buffer_size: usize,
}
impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
/// Try to create a new Async Arrow Writer.
///
- /// `buffer_size` determines the initial size of the intermediate buffer.
+ /// `buffer_size` determines the number of bytes to buffer before flushing
+ /// to the underlying [`AsyncWrite`]
///
/// The intermediate buffer will automatically be resized if necessary
///
@@ -102,6 +106,7 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
sync_writer,
async_writer: writer,
shared_buffer,
+ buffer_size,
})
}
@@ -111,7 +116,12 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
/// checked and flush if at least half full
pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.sync_writer.write(batch)?;
- Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, false).await
+ Self::try_flush(
+ &mut self.shared_buffer,
+ &mut self.async_writer,
+ self.buffer_size,
+ )
+ .await
}
/// Append [`KeyValue`] metadata in addition to those in [`WriterProperties`]
@@ -128,7 +138,7 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
let metadata = self.sync_writer.close()?;
// Force to flush the remaining data.
- Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, true).await?;
+ Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, 0).await?;
self.async_writer.shutdown().await?;
Ok(metadata)
@@ -139,10 +149,10 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
async fn try_flush(
shared_buffer: &mut SharedBuffer,
async_writer: &mut W,
- force: bool,
+ buffer_size: usize,
) -> Result<()> {
let mut buffer = shared_buffer.buffer.try_lock().unwrap();
- if !force && buffer.len() < buffer.capacity() / 2 {
+ if buffer.is_empty() || buffer.len() < buffer_size {
// no need to flush
return Ok(());
}