This is an automated email from the ASF dual-hosted git repository. jiayuliu pushed a commit to branch add-bloom-filter-3 in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
commit 9b8a0f51517b3235ccd57461f439a400dbbee4c1 Author: Jiayu Liu <ji...@hey.com> AuthorDate: Tue Nov 15 21:47:56 2022 +0800 write out to bloom filter --- parquet/src/bloom_filter/mod.rs | 1 + parquet/src/column/writer/mod.rs | 15 ++++++++++++++ parquet/src/file/writer.rs | 45 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 59 insertions(+), 2 deletions(-) diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index 4944a93f8..d0bee8a5f 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -80,6 +80,7 @@ fn block_check(block: &Block, hash: u32) -> bool { } /// A split block Bloom filter +#[derive(Debug, Clone)] pub struct Sbbf(Vec<Block>); const SBBF_HEADER_SIZE_ESTIMATE: usize = 20; diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 3cdf04f54..f8e79d792 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -16,6 +16,9 @@ // under the License. //! Contains column writer API. + +#[cfg(feature = "bloom")] +use crate::bloom_filter::Sbbf; use crate::format::{ColumnIndex, OffsetIndex}; use std::collections::{BTreeSet, VecDeque}; @@ -154,6 +157,9 @@ pub struct ColumnCloseResult { pub rows_written: u64, /// Metadata for this column chunk pub metadata: ColumnChunkMetaData, + /// Optional bloom filter for this column + #[cfg(feature = "bloom")] + pub bloom_filter: Option<Sbbf>, /// Optional column index, for filtering pub column_index: Option<ColumnIndex>, /// Optional offset index, identifying page locations @@ -209,6 +215,10 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> { rep_levels_sink: Vec<i16>, data_pages: VecDeque<CompressedPage>, + // bloom filter + #[cfg(feature = "bloom")] + bloom_filter: Option<Sbbf>, + // column index and offset index column_index_builder: ColumnIndexBuilder, offset_index_builder: OffsetIndexBuilder, @@ -260,6 +270,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { num_column_nulls: 0, column_distinct_count: None, }, + // TODO! + #[cfg(feature = "bloom")] + bloom_filter: None, column_index_builder: ColumnIndexBuilder::new(), offset_index_builder: OffsetIndexBuilder::new(), encodings, @@ -458,6 +471,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { Ok(ColumnCloseResult { bytes_written: self.column_metrics.total_bytes_written, rows_written: self.column_metrics.total_rows_written, + #[cfg(feature = "bloom")] + bloom_filter: self.bloom_filter, metadata, column_index, offset_index, diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 2efaf7caf..90c9b6bfc 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -18,10 +18,11 @@ //! Contains file writer API, and provides methods to write row groups and columns by //! using row group writers and column writers respectively. -use std::{io::Write, sync::Arc}; - +#[cfg(feature = "bloom")] +use crate::bloom_filter::Sbbf; use crate::format as parquet; use crate::format::{ColumnIndex, OffsetIndex, RowGroup}; +use std::{io::Write, sync::Arc}; use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable}; use crate::basic::PageType; @@ -116,6 +117,8 @@ pub struct SerializedFileWriter<W: Write> { descr: SchemaDescPtr, props: WriterPropertiesPtr, row_groups: Vec<RowGroupMetaDataPtr>, + #[cfg(feature = "bloom")] + bloom_filters: Vec<Vec<Option<Sbbf>>>, column_indexes: Vec<Vec<Option<ColumnIndex>>>, offset_indexes: Vec<Vec<Option<OffsetIndex>>>, row_group_index: usize, @@ -132,6 +135,8 @@ impl<W: Write> SerializedFileWriter<W> { descr: Arc::new(SchemaDescriptor::new(schema)), props: properties, row_groups: vec![], + #[cfg(feature = "bloom")] + bloom_filters: vec![], column_indexes: Vec::new(), offset_indexes: Vec::new(), row_group_index: 0, @@ -212,6 +217,32 @@ impl<W: Write> SerializedFileWriter<W> { Ok(()) } + #[cfg(feature = "bloom")] + /// Serialize all the bloom filter to the file + fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { + // iter row group + // iter each column + // write bloom filter to the file + for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() { + for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() + { + match &self.bloom_filters[row_group_idx][column_idx] { + Some(bloom_filter) => { + let start_offset = self.buf.bytes_written(); + let mut protocol = TCompactOutputProtocol::new(&mut self.buf); + bloom_filter.write_to_out_protocol(&mut protocol)?; + protocol.flush()?; + let end_offset = self.buf.bytes_written(); + // set offset and index for bloom filter + column_metadata.bloom_filter_offset = Some(start_offset as i64); + } + None => {} + } + } + } + Ok(()) + } + /// Serialize all the column index to the file fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { // iter row group @@ -250,6 +281,8 @@ impl<W: Write> SerializedFileWriter<W> { .map(|v| v.to_thrift()) .collect::<Vec<_>>(); + #[cfg(feature = "bloom")] + self.write_bloom_filters(&mut row_groups)?; // Write column indexes and offset indexes self.write_column_indexes(&mut row_groups)?; self.write_offset_indexes(&mut row_groups)?; @@ -320,6 +353,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { column_index: usize, row_group_metadata: Option<RowGroupMetaDataPtr>, column_chunks: Vec<ColumnChunkMetaData>, + #[cfg(feature = "bloom")] + bloom_filters: Vec<Option<Sbbf>>, column_indexes: Vec<Option<ColumnIndex>>, offset_indexes: Vec<Option<OffsetIndex>>, on_close: Option<OnCloseRowGroup<'a>>, @@ -348,6 +383,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { column_index: 0, row_group_metadata: None, column_chunks: Vec::with_capacity(num_columns), + #[cfg(feature = "bloom")] + bloom_filters: Vec::with_capacity(num_columns), column_indexes: Vec::with_capacity(num_columns), offset_indexes: Vec::with_capacity(num_columns), total_bytes_written: 0, @@ -380,6 +417,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { let column_chunks = &mut self.column_chunks; let column_indexes = &mut self.column_indexes; let offset_indexes = &mut self.offset_indexes; + #[cfg(feature = "bloom")] + let bloom_filters = &mut self.bloom_filters; let on_close = |r: ColumnCloseResult| { // Update row group writer metrics @@ -387,6 +426,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { column_chunks.push(r.metadata); column_indexes.push(r.column_index); offset_indexes.push(r.offset_index); + #[cfg(feature = "bloom")] + bloom_filters.push(r.bloom_filter); if let Some(rows) = *total_rows_written { if rows != r.rows_written {