This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch revert-5932-revert-5860-interleave-bloom
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 5f78af70623018ec869aa05f2b0f1bdd376da046
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jun 21 11:59:10 2024 -0400

    Revert "Revert "Write Bloom filters between row groups instead of the end  
(#…"
    
    This reverts commit 22e0b4432c9838f2536284015271d3de9a165135.
---
 parquet/Cargo.toml                    |   8 +++
 parquet/examples/write_parquet.rs     | 131 ++++++++++++++++++++++++++++++++++
 parquet/src/arrow/arrow_writer/mod.rs |  28 +++++++-
 parquet/src/arrow/async_writer/mod.rs |   4 +-
 parquet/src/file/metadata.rs          |   5 ++
 parquet/src/file/properties.rs        |  36 ++++++++++
 parquet/src/file/writer.rs            | 117 ++++++++++++++++++------------
 7 files changed, 277 insertions(+), 52 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 775ac825a2e..eec7faf09d0 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -67,6 +67,7 @@ hashbrown = { version = "0.14", default-features = false }
 twox-hash = { version = "1.6", default-features = false }
 paste = { version = "1.0" }
 half = { version = "2.1", default-features = false, features = ["num-traits"] }
+sysinfo = { version = "0.30.12", optional = true, default-features = false }
 
 [dev-dependencies]
 base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -114,12 +115,19 @@ async = ["futures", "tokio"]
 object_store = ["dep:object_store", "async"]
 # Group Zstd dependencies
 zstd = ["dep:zstd", "zstd-sys"]
+# Display memory in example/write_parquet.rs
+sysinfo = ["dep:sysinfo"]
 
 [[example]]
 name = "read_parquet"
 required-features = ["arrow"]
 path = "./examples/read_parquet.rs"
 
+[[example]]
+name = "write_parquet"
+required-features = ["cli", "sysinfo"]
+path = "./examples/write_parquet.rs"
+
 [[example]]
 name = "async_read_parquet"
 required-features = ["arrow", "async"]
diff --git a/parquet/examples/write_parquet.rs 
b/parquet/examples/write_parquet.rs
new file mode 100644
index 00000000000..d2ef550df84
--- /dev/null
+++ b/parquet/examples/write_parquet.rs
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fs::File;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use arrow::array::{StructArray, UInt64Builder};
+use arrow::datatypes::DataType::UInt64;
+use arrow::datatypes::{Field, Schema};
+use clap::{Parser, ValueEnum};
+use parquet::arrow::ArrowWriter as ParquetWriter;
+use parquet::basic::Encoding;
+use parquet::errors::Result;
+use parquet::file::properties::{BloomFilterPosition, WriterProperties};
+use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
+
+#[derive(ValueEnum, Clone)]
+enum BloomFilterPositionArg {
+    End,
+    AfterRowGroup,
+}
+
+#[derive(Parser)]
+#[command(version)]
+/// Writes sequences of integers, with a Bloom Filter, while logging timing 
and memory usage.
+struct Args {
+    #[arg(long, default_value_t = 1000)]
+    /// Number of batches to write
+    iterations: u64,
+
+    #[arg(long, default_value_t = 1000000)]
+    /// Number of rows in each batch
+    batch: u64,
+
+    #[arg(long, value_enum, 
default_value_t=BloomFilterPositionArg::AfterRowGroup)]
+    /// Where to write Bloom Filters
+    bloom_filter_position: BloomFilterPositionArg,
+
+    /// Path to the file to write
+    path: PathBuf,
+}
+
+fn now() -> String {
+    chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string()
+}
+
+fn mem(system: &mut System) -> String {
+    let pid = Pid::from(std::process::id() as usize);
+    system.refresh_process_specifics(pid, 
ProcessRefreshKind::new().with_memory());
+    system
+        .process(pid)
+        .map(|proc| format!("{}MB", proc.memory() / 1_000_000))
+        .unwrap_or("N/A".to_string())
+}
+
+fn main() -> Result<()> {
+    let args = Args::parse();
+
+    let bloom_filter_position = match args.bloom_filter_position {
+        BloomFilterPositionArg::End => BloomFilterPosition::End,
+        BloomFilterPositionArg::AfterRowGroup => 
BloomFilterPosition::AfterRowGroup,
+    };
+
+    let properties = WriterProperties::builder()
+        .set_column_bloom_filter_enabled("id".into(), true)
+        .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED)
+        .set_bloom_filter_position(bloom_filter_position)
+        .build();
+    let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)]));
+    // Create parquet file that will be read.
+    let file = File::create(args.path).unwrap();
+    let mut writer = ParquetWriter::try_new(file, schema.clone(), 
Some(properties))?;
+
+    let mut system =
+        
System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything()));
+    eprintln!(
+        "{} Writing {} batches of {} rows. RSS = {}",
+        now(),
+        args.iterations,
+        args.batch,
+        mem(&mut system)
+    );
+
+    let mut array_builder = UInt64Builder::new();
+    let mut last_log = Instant::now();
+    for i in 0..args.iterations {
+        if Instant::now() - last_log > Duration::new(10, 0) {
+            last_log = Instant::now();
+            eprintln!(
+                "{} Iteration {}/{}. RSS = {}",
+                now(),
+                i + 1,
+                args.iterations,
+                mem(&mut system)
+            );
+        }
+        for j in 0..args.batch {
+            array_builder.append_value(i + j);
+        }
+        writer.write(
+            &StructArray::new(
+                schema.fields().clone(),
+                vec![Arc::new(array_builder.finish())],
+                None,
+            )
+            .into(),
+        )?;
+    }
+    writer.flush()?;
+    writer.close()?;
+
+    eprintln!("{} Done. RSS = {}", now(), mem(&mut system));
+
+    Ok(())
+}
diff --git a/parquet/src/arrow/arrow_writer/mod.rs 
b/parquet/src/arrow/arrow_writer/mod.rs
index 0beb93f80a5..800751ff964 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -43,7 +43,7 @@ use crate::column::writer::{
 };
 use crate::data_type::{ByteArray, FixedLenByteArray};
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, KeyValue, 
RowGroupMetaDataPtr};
+use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
 use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
 use crate::file::reader::{ChunkReader, Length};
 use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
@@ -199,7 +199,7 @@ impl<W: Write + Send> ArrowWriter<W> {
     }
 
     /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
         self.writer.flushed_row_groups()
     }
 
@@ -1053,7 +1053,9 @@ mod tests {
     use crate::file::metadata::ParquetMetaData;
     use crate::file::page_index::index::Index;
     use crate::file::page_index::index_reader::read_pages_locations;
-    use crate::file::properties::{EnabledStatistics, ReaderProperties, 
WriterVersion};
+    use crate::file::properties::{
+        BloomFilterPosition, EnabledStatistics, ReaderProperties, 
WriterVersion,
+    };
     use crate::file::serialized_reader::ReadOptionsBuilder;
     use crate::file::{
         reader::{FileReader, SerializedFileReader},
@@ -1701,6 +1703,7 @@ mod tests {
         values: ArrayRef,
         schema: SchemaRef,
         bloom_filter: bool,
+        bloom_filter_position: BloomFilterPosition,
     }
 
     impl RoundTripOptions {
@@ -1711,6 +1714,7 @@ mod tests {
                 values,
                 schema: Arc::new(schema),
                 bloom_filter: false,
+                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
             }
         }
     }
@@ -1730,6 +1734,7 @@ mod tests {
             values,
             schema,
             bloom_filter,
+            bloom_filter_position,
         } = options;
 
         let encodings = match values.data_type() {
@@ -1770,6 +1775,7 @@ mod tests {
                             
.set_dictionary_page_size_limit(dictionary_size.max(1))
                             .set_encoding(*encoding)
                             .set_bloom_filter_enabled(bloom_filter)
+                            .set_bloom_filter_position(bloom_filter_position)
                             .build();
 
                         files.push(roundtrip_opts(&expected_batch, props))
@@ -2127,6 +2133,22 @@ mod tests {
         values_required::<BinaryViewArray, _>(many_vecs_iter);
     }
 
+    #[test]
+    fn i32_column_bloom_filter_at_end() {
+        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
+        let mut options = RoundTripOptions::new(array, false);
+        options.bloom_filter = true;
+        options.bloom_filter_position = BloomFilterPosition::End;
+
+        let files = one_column_roundtrip_with_options(options);
+        check_bloom_filter(
+            files,
+            "col".to_string(),
+            (0..SMALL_SIZE as i32).collect(),
+            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
+        );
+    }
+
     #[test]
     fn i32_column_bloom_filter() {
         let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
diff --git a/parquet/src/arrow/async_writer/mod.rs 
b/parquet/src/arrow/async_writer/mod.rs
index 0bedf1fcb73..28efbdc7c66 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -54,7 +54,7 @@ use crate::{
     arrow::arrow_writer::ArrowWriterOptions,
     arrow::ArrowWriter,
     errors::{ParquetError, Result},
-    file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties},
+    file::{metadata::RowGroupMetaData, properties::WriterProperties},
     format::{FileMetaData, KeyValue},
 };
 use arrow_array::RecordBatch;
@@ -172,7 +172,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
     }
 
     /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
         self.sync_writer.flushed_row_groups()
     }
 
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index fb8f798fd3a..255fe1b7b25 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -333,6 +333,11 @@ impl RowGroupMetaData {
         &self.columns
     }
 
+    /// Returns mutable slice of column chunk metadata.
+    pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
+        &mut self.columns
+    }
+
     /// Number of rows in this row group.
     pub fn num_rows(&self) -> i64 {
         self.num_rows
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 87d84cef80a..7fc73bd56fe 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -43,6 +43,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = 
EnabledStatistics::Pag
 pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
 /// Default value for [`WriterProperties::max_row_group_size`]
 pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// Default value for [`WriterProperties::bloom_filter_position`]
+pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = 
BloomFilterPosition::AfterRowGroup;
 /// Default value for [`WriterProperties::created_by`]
 pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", 
env!("CARGO_PKG_VERSION"));
 /// Default value for [`WriterProperties::column_index_truncate_length`]
@@ -86,6 +88,24 @@ impl FromStr for WriterVersion {
     }
 }
 
+/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) 
should
+/// write Bloom filters
+///
+/// Basic constant, which is not part of the Thrift definition.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum BloomFilterPosition {
+    /// Write Bloom Filters of each row group right after the row group
+    ///
+    /// This saves memory by writing it as soon as it is computed, at the cost
+    /// of data locality for readers
+    AfterRowGroup,
+    /// Write Bloom Filters at the end of the file
+    ///
+    /// This allows better data locality for readers, at the cost of memory 
usage
+    /// for writers.
+    End,
+}
+
 /// Reference counted writer properties.
 pub type WriterPropertiesPtr = Arc<WriterProperties>;
 
@@ -130,6 +150,7 @@ pub struct WriterProperties {
     data_page_row_count_limit: usize,
     write_batch_size: usize,
     max_row_group_size: usize,
+    bloom_filter_position: BloomFilterPosition,
     writer_version: WriterVersion,
     created_by: String,
     pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
@@ -217,6 +238,11 @@ impl WriterProperties {
         self.max_row_group_size
     }
 
+    /// Returns maximum number of rows in a row group.
+    pub fn bloom_filter_position(&self) -> BloomFilterPosition {
+        self.bloom_filter_position
+    }
+
     /// Returns configured writer version.
     pub fn writer_version(&self) -> WriterVersion {
         self.writer_version
@@ -338,6 +364,7 @@ pub struct WriterPropertiesBuilder {
     data_page_row_count_limit: usize,
     write_batch_size: usize,
     max_row_group_size: usize,
+    bloom_filter_position: BloomFilterPosition,
     writer_version: WriterVersion,
     created_by: String,
     key_value_metadata: Option<Vec<KeyValue>>,
@@ -357,6 +384,7 @@ impl WriterPropertiesBuilder {
             data_page_row_count_limit: usize::MAX,
             write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
             max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
+            bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
             writer_version: DEFAULT_WRITER_VERSION,
             created_by: DEFAULT_CREATED_BY.to_string(),
             key_value_metadata: None,
@@ -376,6 +404,7 @@ impl WriterPropertiesBuilder {
             data_page_row_count_limit: self.data_page_row_count_limit,
             write_batch_size: self.write_batch_size,
             max_row_group_size: self.max_row_group_size,
+            bloom_filter_position: self.bloom_filter_position,
             writer_version: self.writer_version,
             created_by: self.created_by,
             key_value_metadata: self.key_value_metadata,
@@ -487,6 +516,12 @@ impl WriterPropertiesBuilder {
         self
     }
 
+    /// Sets where in the final file Bloom Filters are written (default 
`AfterRowGroup`)
+    pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> 
Self {
+        self.bloom_filter_position = value;
+        self
+    }
+
     /// Sets "created by" property (defaults to `parquet-rs version 
<VERSION>`).
     pub fn set_created_by(mut self, value: String) -> Self {
         self.created_by = value;
@@ -1052,6 +1087,7 @@ mod tests {
         );
         assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
         assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
+        assert_eq!(props.bloom_filter_position(), 
DEFAULT_BLOOM_FILTER_POSITION);
         assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
         assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
         assert_eq!(props.key_value_metadata(), None);
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 7806384cdb5..eb633f31c47 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -34,8 +34,9 @@ use crate::column::{
 };
 use crate::data_type::DataType;
 use crate::errors::{ParquetError, Result};
+use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
 use crate::file::reader::ChunkReader;
-use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC};
+use crate::file::{metadata::*, PARQUET_MAGIC};
 use crate::schema::types::{self, ColumnDescPtr, SchemaDescPtr, 
SchemaDescriptor, TypePtr};
 
 /// A wrapper around a [`Write`] that keeps track of the number
@@ -115,9 +116,10 @@ pub type OnCloseColumnChunk<'a> = Box<dyn 
FnOnce(ColumnCloseResult) -> Result<()
 /// - the row group metadata
 /// - the column index for each column chunk
 /// - the offset index for each column chunk
-pub type OnCloseRowGroup<'a> = Box<
+pub type OnCloseRowGroup<'a, W> = Box<
     dyn FnOnce(
-            RowGroupMetaDataPtr,
+            &'a mut TrackedWrite<W>,
+            RowGroupMetaData,
             Vec<Option<Sbbf>>,
             Vec<Option<ColumnIndex>>,
             Vec<Option<OffsetIndex>>,
@@ -143,7 +145,7 @@ pub struct SerializedFileWriter<W: Write> {
     schema: TypePtr,
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
-    row_groups: Vec<RowGroupMetaDataPtr>,
+    row_groups: Vec<RowGroupMetaData>,
     bloom_filters: Vec<Vec<Option<Sbbf>>>,
     column_indexes: Vec<Vec<Option<ColumnIndex>>>,
     offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
@@ -197,18 +199,29 @@ impl<W: Write + Send> SerializedFileWriter<W> {
 
         self.row_group_index += 1;
 
+        let bloom_filter_position = self.properties().bloom_filter_position();
         let row_groups = &mut self.row_groups;
         let row_bloom_filters = &mut self.bloom_filters;
         let row_column_indexes = &mut self.column_indexes;
         let row_offset_indexes = &mut self.offset_indexes;
-        let on_close =
-            |metadata, row_group_bloom_filter, row_group_column_index, 
row_group_offset_index| {
-                row_groups.push(metadata);
-                row_bloom_filters.push(row_group_bloom_filter);
-                row_column_indexes.push(row_group_column_index);
-                row_offset_indexes.push(row_group_offset_index);
-                Ok(())
+        let on_close = move |buf,
+                             mut metadata,
+                             row_group_bloom_filter,
+                             row_group_column_index,
+                             row_group_offset_index| {
+            row_bloom_filters.push(row_group_bloom_filter);
+            row_column_indexes.push(row_group_column_index);
+            row_offset_indexes.push(row_group_offset_index);
+            // write bloom filters out immediately after the row group if 
requested
+            match bloom_filter_position {
+                BloomFilterPosition::AfterRowGroup => {
+                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
+                }
+                BloomFilterPosition::End => (),
             };
+            row_groups.push(metadata);
+            Ok(())
+        };
 
         let row_group_writer = SerializedRowGroupWriter::new(
             self.descr.clone(),
@@ -221,7 +234,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
     }
 
     /// Returns metadata for any flushed row groups
-    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
         &self.row_groups
     }
 
@@ -273,34 +286,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
         Ok(())
     }
 
-    /// Serialize all the bloom filter to the file
-    fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> 
Result<()> {
-        // iter row group
-        // iter each column
-        // write bloom filter to the file
-        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
-            for (column_idx, column_chunk) in 
row_group.columns.iter_mut().enumerate() {
-                match &self.bloom_filters[row_group_idx][column_idx] {
-                    Some(bloom_filter) => {
-                        let start_offset = self.buf.bytes_written();
-                        bloom_filter.write(&mut self.buf)?;
-                        let end_offset = self.buf.bytes_written();
-                        // set offset and index for bloom filter
-                        let column_chunk_meta = column_chunk
-                            .meta_data
-                            .as_mut()
-                            .expect("can't have bloom filter without column 
metadata");
-                        column_chunk_meta.bloom_filter_offset = 
Some(start_offset as i64);
-                        column_chunk_meta.bloom_filter_length =
-                            Some((end_offset - start_offset) as i32);
-                    }
-                    None => {}
-                }
-            }
-        }
-        Ok(())
-    }
-
     /// Serialize all the column index to the file
     fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> 
Result<()> {
         // iter row group
@@ -331,6 +316,11 @@ impl<W: Write + Send> SerializedFileWriter<W> {
         self.finished = true;
         let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
 
+        // write out any remaining bloom filters after all row groups
+        for row_group in &mut self.row_groups {
+            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, 
row_group)?;
+        }
+
         let mut row_groups = self
             .row_groups
             .as_slice()
@@ -338,7 +328,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
             .map(|v| v.to_thrift())
             .collect::<Vec<_>>();
 
-        self.write_bloom_filters(&mut row_groups)?;
         // Write column indexes and offset indexes
         self.write_column_indexes(&mut row_groups)?;
         self.write_offset_indexes(&mut row_groups)?;
@@ -443,6 +432,40 @@ impl<W: Write + Send> SerializedFileWriter<W> {
     }
 }
 
+/// Serialize all the bloom filters of the given row group to the given buffer,
+/// and returns the updated row group metadata.
+fn write_bloom_filters<W: Write + Send>(
+    buf: &mut TrackedWrite<W>,
+    bloom_filters: &mut [Vec<Option<Sbbf>>],
+    row_group: &mut RowGroupMetaData,
+) -> Result<()> {
+    // iter row group
+    // iter each column
+    // write bloom filter to the file
+
+    let row_group_idx: u16 = row_group
+        .ordinal()
+        .expect("Missing row group ordinal")
+        .try_into()
+        .expect("Negative row group ordinal");
+    let row_group_idx = row_group_idx as usize;
+    for (column_idx, column_chunk) in 
row_group.columns_mut().iter_mut().enumerate() {
+        if let Some(bloom_filter) = 
bloom_filters[row_group_idx][column_idx].take() {
+            let start_offset = buf.bytes_written();
+            bloom_filter.write(&mut *buf)?;
+            let end_offset = buf.bytes_written();
+            // set offset and index for bloom filter
+            *column_chunk = column_chunk
+                .clone()
+                .into_builder()
+                .set_bloom_filter_offset(Some(start_offset as i64))
+                .set_bloom_filter_length(Some((end_offset - start_offset) as 
i32))
+                .build()?;
+        }
+    }
+    Ok(())
+}
+
 /// Parquet row group writer API.
 /// Provides methods to access column writers in an iterator-like fashion, 
order is
 /// guaranteed to match the order of schema leaves (column descriptors).
@@ -468,7 +491,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     offset_indexes: Vec<Option<OffsetIndex>>,
     row_group_index: i16,
     file_offset: i64,
-    on_close: Option<OnCloseRowGroup<'a>>,
+    on_close: Option<OnCloseRowGroup<'a, W>>,
 }
 
 impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
@@ -485,7 +508,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
         properties: WriterPropertiesPtr,
         buf: &'a mut TrackedWrite<W>,
         row_group_index: i16,
-        on_close: Option<OnCloseRowGroup<'a>>,
+        on_close: Option<OnCloseRowGroup<'a, W>>,
     ) -> Self {
         let num_columns = schema_descr.num_columns();
         let file_offset = buf.bytes_written() as i64;
@@ -669,12 +692,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> 
{
                 .set_file_offset(self.file_offset)
                 .build()?;
 
-            let metadata = Arc::new(row_group_metadata);
-            self.row_group_metadata = Some(metadata.clone());
+            self.row_group_metadata = 
Some(Arc::new(row_group_metadata.clone()));
 
             if let Some(on_close) = self.on_close.take() {
                 on_close(
-                    metadata,
+                    self.buf,
+                    row_group_metadata,
                     self.bloom_filters,
                     self.column_indexes,
                     self.offset_indexes,
@@ -1446,7 +1469,7 @@ mod tests {
             assert_eq!(flushed.len(), idx + 1);
             assert_eq!(Some(idx as i16), last_group.ordinal());
             assert_eq!(Some(row_group_file_offset as i64), 
last_group.file_offset());
-            assert_eq!(flushed[idx].as_ref(), last_group.as_ref());
+            assert_eq!(&flushed[idx], last_group.as_ref());
         }
         let file_metadata = file_writer.close().unwrap();
 

Reply via email to