This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f2231353 fix incorrect buffer size limiting in parquet async writer (#4478)
6f2231353 is described below

commit 6f2231353f92dbffea7d5fd9c93db2641c7ffc41
Author: Zhang Li <[email protected]>
AuthorDate: Mon Jul 10 23:10:07 2023 +0800

    fix incorrect buffer size limiting in parquet async writer (#4478)
    
    * fix incorrect buffer size limiting in parquet async writer
    
    * Format
    
    * Review feedback
    
    ---------
    
    Co-authored-by: zhangli20 <[email protected]>
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
 parquet/src/arrow/async_writer/mod.rs | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 339618364..4d8cf1b90 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -77,12 +77,16 @@ pub struct AsyncArrowWriter<W> {
 
     /// The inner buffer shared by the `sync_writer` and the `async_writer`
     shared_buffer: SharedBuffer,
+
+    /// Trigger forced flushing once buffer size reaches this value
+    buffer_size: usize,
 }
 
 impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
     /// Try to create a new Async Arrow Writer.
     ///
-    /// `buffer_size` determines the initial size of the intermediate buffer.
+    /// `buffer_size` determines the number of bytes to buffer before flushing
+    /// to the underlying [`AsyncWrite`]
     ///
     /// The intermediate buffer will automatically be resized if necessary
     ///
@@ -102,6 +106,7 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
             sync_writer,
             async_writer: writer,
             shared_buffer,
+            buffer_size,
         })
     }
 
@@ -111,7 +116,12 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
     /// checked and flush if at least half full
     pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
         self.sync_writer.write(batch)?;
-        Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, false).await
+        Self::try_flush(
+            &mut self.shared_buffer,
+            &mut self.async_writer,
+            self.buffer_size,
+        )
+        .await
     }
 
     /// Append [`KeyValue`] metadata in addition to those in [`WriterProperties`]
@@ -128,7 +138,7 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
         let metadata = self.sync_writer.close()?;
 
         // Force to flush the remaining data.
-        Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, true).await?;
+        Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, 0).await?;
         self.async_writer.shutdown().await?;
 
         Ok(metadata)
@@ -139,10 +149,10 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
     async fn try_flush(
         shared_buffer: &mut SharedBuffer,
         async_writer: &mut W,
-        force: bool,
+        buffer_size: usize,
     ) -> Result<()> {
         let mut buffer = shared_buffer.buffer.try_lock().unwrap();
-        if !force && buffer.len() < buffer.capacity() / 2 {
+        if buffer.is_empty() || buffer.len() < buffer_size {
             // no need to flush
             return Ok(());
         }

Reply via email to