This is an automated email from the ASF dual-hosted git repository. jiayuliu pushed a commit to branch add-bloom-filter in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
commit 2fdc350b5fd4f81b378c5c04ebd46e60d95745c9 Author: Jiayu Liu <[email protected]> AuthorDate: Wed Nov 9 10:16:56 2022 +0800 add bloom filter implementation based on split block spec --- parquet/Cargo.toml | 5 +- parquet/src/bloom_filter/mod.rs | 122 ++++++++++++++++++++++++++++++++++++++++ parquet/src/lib.rs | 1 + 3 files changed, 127 insertions(+), 1 deletion(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 70320ba65..1ae25aeaf 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -49,6 +49,7 @@ seq-macro = { version = "0.3", default-features = false } futures = { version = "0.3", default-features = false, features = ["std"], optional = true } tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] } hashbrown = { version = "0.12", default-features = false } +twox-hash = { version = "1.6", optional = true } [dev-dependencies] base64 = { version = "0.13", default-features = false, features = ["std"] } @@ -68,7 +69,7 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng" all-features = true [features] -default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] +default = ["arrow", "bloom", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] # Enable arrow reader/writer APIs arrow = ["dep:arrow", "base64"] # Enable CLI tools @@ -81,6 +82,8 @@ test_common = ["arrow/test_utils"] experimental = [] # Enable async APIs async = ["futures", "tokio"] +# Bloomfilter +bloom = ["twox-hash"] [[test]] name = "arrow_writer_layout" diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs new file mode 100644 index 000000000..02733d7f3 --- /dev/null +++ b/parquet/src/bloom_filter/mod.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Bloom filter implementation specific to Parquet, as described +//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) + +const SALT: [u32; 8] = [ + 0x47b6137b_u32, + 0x44974d91_u32, + 0x8824ad5b_u32, + 0xa2b7289d_u32, + 0x705495c7_u32, + 0x2df1424b_u32, + 0x9efc4947_u32, + 0x5c6bfb31_u32, +]; + +/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits. +/// Each word is thought of as an array of bits; each bit is either "set" or "not set". +type Block = [u32; 8]; + +/// takes as its argument a single unsigned 32-bit integer and returns a block in which each +/// word has exactly one bit set. +fn mask(x: u32) -> Block { + let mut result = [0_u32; 8]; + for i in 0..8 { + // wrapping instead of checking for overflow + let y = x.wrapping_mul(SALT[i]); + let y = y >> 27; + result[i] = 1 << y; + } + result +} + +/// setting every bit in the block that was also set in the result from mask +fn block_insert(block: &mut Block, hash: u32) { + let mask = mask(hash); + for i in 0..8 { + block[i] |= mask[i]; + } +} + +/// returns true when every bit that is set in the result of mask is also set in the block. +fn block_check(block: &Block, hash: u32) -> bool { + let mask = mask(hash); + for i in 0..8 { + if block[i] & mask[i] == 0 { + return false; + } + } + true +} + +/// A split block Bloom filter +type SBBF = Vec<Block>; + +#[inline] +fn hash_to_block_index(sbbf: &SBBF, hash: u32) -> usize { + // unchecked_mul is unstable, but in reality this is safe, we'd just use saturating mul + // but it will not saturate + ((hash as usize) >> 32).saturating_mul(sbbf.len()) >> 32 +} + +fn sbbf_insert(sbbf: &mut SBBF, hash: u32) { + let block_index = hash_to_block_index(sbbf, hash); + let block = &mut sbbf[block_index]; + block_insert(block, hash); +} + +/// The filter_check operation uses the same method as filter_insert to select a block to operate +/// on, then uses the least significant 32 bits of its argument as an argument to block_check called +/// on that block, returning the result. +fn sbbf_check(sbbf: &SBBF, hash: u32) -> bool { + let block_index = hash_to_block_index(sbbf, hash); + let block = &sbbf[block_index]; + block_check(block, hash) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_mask_set_quick_check() { + for i in 0..1_000_000 { + let result = mask(i); + assert!(result.iter().all(|&x| x.count_ones() == 1)); + } + } + + #[test] + fn test_block_insert_and_check() { + for i in 0..1_000_000 { + let mut block = [0_u32; 8]; + block_insert(&mut block, i); + assert!(block_check(&block, i)); + } + } + + #[test] + fn test_sbbf_insert_and_check() { + let mut sbbf = vec![[0_u32; 8]; 1_000]; + for i in 0..1_000_000 { + sbbf_insert(&mut sbbf, i); + assert!(sbbf_check(&sbbf, i)); + } + } +} diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs index b34d9aa8a..af6b0863d 100644 --- a/parquet/src/lib.rs +++ b/parquet/src/lib.rs @@ -84,6 +84,7 @@ pub mod arrow; pub mod column; experimental!(mod compression); experimental!(mod encodings); +experimental!(mod bloom_filter); pub mod file; pub mod record; pub mod schema;
