tustvold commented on a change in pull request #1214:
URL: https://github.com/apache/arrow-rs/pull/1214#discussion_r789070080



##########
File path: parquet/src/arrow/arrow_writer.rs
##########
@@ -75,54 +87,109 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
 
         Ok(Self {
             writer: file_writer,
+            buffer: vec![Default::default(); arrow_schema.fields().len()],
+            buffered_rows: 0,
             arrow_schema,
             max_row_group_size,
         })
     }
 
-    /// Write a RecordBatch to writer
+    /// Enqueues the provided `RecordBatch` to be written
     ///
-    /// The writer will slice the `batch` into `max_row_group_size`,
-    /// but if a batch has left-over rows less than the row group size,
-    /// the last row group will have fewer records.
-    /// This is currently a limitation  because we close the row group
-    /// instead of keeping it open for the next batch.
+    /// If following this there are more than `max_row_group_size` rows 
buffered,
+    /// this will flush out one or more row groups with `max_row_group_size` 
rows,
+    /// and drop any fully written `RecordBatch`
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
         // validate batch schema against writer's supplied schema
         if self.arrow_schema != batch.schema() {
             return Err(ParquetError::ArrowError(
                 "Record batch schema does not match writer schema".to_string(),
             ));
         }
-        // Track the number of rows being written in the batch.
-        // We currently do not have a way of slicing nested arrays, thus we
-        // track this manually.
-        let num_rows = batch.num_rows();
-        let batches = (num_rows + self.max_row_group_size - 1) / 
self.max_row_group_size;
-        let min_batch = num_rows.min(self.max_row_group_size);
-        for batch_index in 0..batches {
-            // Determine the offset and length of arrays
-            let offset = batch_index * min_batch;
-            let length = (num_rows - offset).min(self.max_row_group_size);
-
-            // Compute the definition and repetition levels of the batch
-            let batch_level = LevelInfo::new(offset, length);
-            let mut row_group_writer = self.writer.next_row_group()?;
-            for (array, field) in 
batch.columns().iter().zip(batch.schema().fields()) {
-                let mut levels = batch_level.calculate_array_levels(array, 
field);
-                // Reverse levels as we pop() them when writing arrays
-                levels.reverse();
-                write_leaves(&mut row_group_writer, array, &mut levels)?;
+
+        for (buffer, batch) in self.buffer.iter_mut().zip(batch.columns()) {
+            buffer.push_back(batch.clone())
+        }
+
+        self.buffered_rows += batch.num_rows();
+
+        self.flush_completed()?;
+
+        Ok(())
+    }
+
+    /// Flushes buffered data until there are less than `max_row_group_size` 
rows buffered
+    fn flush_completed(&mut self) -> Result<()> {
+        while self.buffered_rows >= self.max_row_group_size {
+            self.flush_row_group(self.max_row_group_size)?;
+        }
+        Ok(())
+    }
+
+    /// Flushes `num_rows` from the buffer into a new row group
+    fn flush_row_group(&mut self, num_rows: usize) -> Result<()> {
+        if num_rows == 0 {
+            return Ok(());
+        }
+
+        assert!(
+            num_rows <= self.buffered_rows,
+            "cannot flush {} rows only have {}",
+            num_rows,
+            self.buffered_rows
+        );
+
+        assert!(
+            num_rows <= self.max_row_group_size,
+            "cannot flush {} rows would exceed max row group size of {}",
+            num_rows,
+            self.max_row_group_size
+        );
+
+        let batch_level = LevelInfo::new(0, num_rows);
+        let mut row_group_writer = self.writer.next_row_group()?;
+
+        for (col_buffer, field) in 
self.buffer.iter_mut().zip(self.arrow_schema.fields())
+        {
+            // Collect the number of arrays to append
+            let mut remaining = num_rows;
+            let mut arrays = Vec::with_capacity(col_buffer.len());
+            while remaining != 0 {
+                match col_buffer.pop_front() {
+                    Some(next) if next.len() > remaining => {
+                        col_buffer
+                            .push_front(next.slice(remaining, next.len() - 
remaining));
+                        arrays.push(next.slice(0, remaining));
+                        remaining = 0;
+                    }
+                    Some(next) => {
+                        remaining -= next.len();
+                        arrays.push(next);
+                    }
+                    _ => break,
+                }
             }
 
-            self.writer.close_row_group(row_group_writer)?;
+            // Workaround write logic expecting a single array

Review comment:
       This is a little bit sad, but when I tried to avoid this I ended up in a 
brain-melt of supporting nested ListArrays of StructArray, etc...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to