tustvold commented on code in PR #4280:
URL: https://github.com/apache/arrow-rs/pull/4280#discussion_r1205524286


##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -152,43 +151,69 @@ impl<W: Write> ArrowWriter<W> {
         self.writer.flushed_row_groups()
     }
 
-    /// Enqueues the provided `RecordBatch` to be written
+    /// Returns the length in bytes of the current in progress row group
+    pub fn in_progress_size(&self) -> usize {
+        match &self.in_progress {
+            Some(in_progress) => in_progress
+                .writers
+                .iter()
+                .map(|(x, _)| x.lock().unwrap().length)
+                .sum(),
+            None => 0,
+        }
+    }
+
+    /// Encodes the provided [`RecordBatch`]
     ///
-    /// If following this there are more than `max_row_group_size` rows 
buffered,
-    /// this will flush out one or more row groups with `max_row_group_size` 
rows,
-    /// and drop any fully written `RecordBatch`
+    /// If this would cause the current row group to exceed 
[`WriterProperties::max_row_group_size`]
+    /// rows, the contents of `batch` will be distributed across multiple row 
groups such that all
+    /// but the final row group in the file contain 
[`WriterProperties::max_row_group_size`] rows
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
-        // validate batch schema against writer's supplied schema
-        let batch_schema = batch.schema();
-        if !(Arc::ptr_eq(&self.arrow_schema, &batch_schema)
-            || self.arrow_schema.contains(&batch_schema))
-        {
-            return Err(ParquetError::ArrowError(
-                "Record batch schema does not match writer schema".to_string(),
-            ));
+        if batch.num_rows() == 0 {
+            return Ok(());
         }
 
-        for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) {
-            buffer.push_back(column.clone())
+        // If would exceed max_row_group_size, split batch
+        if self.buffered_rows + batch.num_rows() > self.max_row_group_size {
+            let to_write = self.max_row_group_size - self.buffered_rows;
+            let a = batch.slice(0, to_write);
+            let b = batch.slice(to_write, batch.num_rows() - to_write);
+            self.write(&a)?;
+            return self.write(&b);

Review Comment:
   I was being a bit lazy here, but this recursion will only be a problem if 
the size of the RecordBatch is orders of magnitude larger than the row 
group size, which would be a pretty strange situation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to