This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch add-bloom-filter
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 2fdc350b5fd4f81b378c5c04ebd46e60d95745c9
Author: Jiayu Liu <[email protected]>
AuthorDate: Wed Nov 9 10:16:56 2022 +0800

    add bloom filter implementation based on split block spec
---
 parquet/Cargo.toml              |   5 +-
 parquet/src/bloom_filter/mod.rs | 122 ++++++++++++++++++++++++++++++++++++++++
 parquet/src/lib.rs              |   1 +
 3 files changed, 127 insertions(+), 1 deletion(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 70320ba65..1ae25aeaf 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -49,6 +49,7 @@ seq-macro = { version = "0.3", default-features = false }
 futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
 tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
 hashbrown = { version = "0.12", default-features = false }
+twox-hash = { version = "1.6", optional = true }
 
 [dev-dependencies]
 base64 = { version = "0.13", default-features = false, features = ["std"] }
@@ -68,7 +69,7 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
 all-features = true
 
 [features]
-default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
+default = ["arrow", "bloom", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
 # Enable arrow reader/writer APIs
 arrow = ["dep:arrow", "base64"]
 # Enable CLI tools
@@ -81,6 +82,8 @@ test_common = ["arrow/test_utils"]
 experimental = []
 # Enable async APIs
 async = ["futures", "tokio"]
+# Bloomfilter
+bloom = ["twox-hash"]
 
 [[test]]
 name = "arrow_writer_layout"
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
new file mode 100644
index 000000000..02733d7f3
--- /dev/null
+++ b/parquet/src/bloom_filter/mod.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Bloom filter implementation specific to Parquet, as described
+//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md)
+
+/// Eight 32-bit multipliers, one per word of a block. `mask` multiplies its argument by
+/// each of them (wrapping) to choose which bit to set in the corresponding word.
+const SALT: [u32; 8] = [
+  0x47b6137b_u32,
+  0x44974d91_u32,
+  0x8824ad5b_u32,
+  0xa2b7289d_u32,
+  0x705495c7_u32,
+  0x2df1424b_u32,
+  0x9efc4947_u32,
+  0x5c6bfb31_u32,
+];
+
+/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
+/// Each word is thought of as an array of bits; each bit is either "set" or "not set".
+type Block = [u32; 8];
+
+/// Builds the bit pattern for a single block from the 32-bit value `x`.
+///
+/// `x` is multiplied (wrapping on overflow) by each of the eight `SALT` constants; the top
+/// five bits of each product pick the one bit that is set in the corresponding word, so
+/// every word of the returned block has exactly one bit set.
+fn mask(x: u32) -> Block {
+  // `>> 27` keeps the top 5 bits of a 32-bit product, so the shift amount is in 0..=31.
+  SALT.map(|salt| 1_u32 << (x.wrapping_mul(salt) >> 27))
+}
+
+/// Sets in `block` every bit that is set in `mask(hash)`; bits already set are left as is.
+fn block_insert(block: &mut Block, hash: u32) {
+  let bits = mask(hash);
+  for (word, bit) in block.iter_mut().zip(bits.iter()) {
+    *word |= *bit;
+  }
+}
+
+/// Returns `true` when every bit that is set in `mask(hash)` is also set in `block`,
+/// and `false` as soon as one word of `block` is missing its mask bit.
+fn block_check(block: &Block, hash: u32) -> bool {
+  let bits = mask(hash);
+  block.iter().zip(bits.iter()).all(|(word, bit)| word & bit != 0)
+}
+
+/// A split block Bloom filter: a growable sequence of 256-bit `Block`s. A hash is first
+/// mapped to one block, and only that block is read or written for the hash.
+type SBBF = Vec<Block>;
+
+/// Maps `hash` onto a block index by multiply-shift scaling, `(hash * len) >> 32`,
+/// which spreads the 32-bit hash range over `0..sbbf.len()` without a modulo.
+///
+/// The product is computed in `u128`, so it cannot overflow for any `usize` length, and
+/// the result is strictly less than `sbbf.len()` whenever the filter is non-empty. For an
+/// empty filter the result is 0, which is not a valid index.
+#[inline]
+fn hash_to_block_index(sbbf: &SBBF, hash: u32) -> usize {
+  // The hash is only 32 bits wide: shifting it right by 32 before multiplying (as one
+  // would with the upper half of a 64-bit hash) discards every bit and sends all hashes
+  // to block 0, so the whole value is scaled instead.
+  ((u128::from(hash) * sbbf.len() as u128) >> 32) as usize
+}
+
+/// Inserts `hash` into the filter: picks a block with `hash_to_block_index` and applies
+/// `block_insert` to it. Indexing panics if the chosen index is out of bounds, which
+/// happens for an empty filter.
+fn sbbf_insert(sbbf: &mut SBBF, hash: u32) {
+  let idx = hash_to_block_index(sbbf, hash);
+  block_insert(&mut sbbf[idx], hash);
+}
+
+/// Tests `hash` for membership: selects a block with `hash_to_block_index`, the same
+/// selection `sbbf_insert` uses, and returns the result of `block_check` on that block.
+/// Indexing panics if the chosen index is out of bounds, which happens for an empty filter.
+fn sbbf_check(sbbf: &SBBF, hash: u32) -> bool {
+  let idx = hash_to_block_index(sbbf, hash);
+  block_check(&sbbf[idx], hash)
+}
+
+#[cfg(test)]
+mod tests {
+  use super::*;
+
+  /// Every word of a mask has exactly one bit set, for the first million inputs.
+  #[test]
+  fn test_mask_set_quick_check() {
+    for hash in 0..1_000_000_u32 {
+      let words = mask(hash);
+      assert!(words.iter().all(|word| word.count_ones() == 1));
+    }
+  }
+
+  /// A value inserted into an empty block is reported as present in that block.
+  #[test]
+  fn test_block_insert_and_check() {
+    for hash in 0..1_000_000_u32 {
+      let mut block: Block = [0; 8];
+      block_insert(&mut block, hash);
+      assert!(block_check(&block, hash));
+    }
+  }
+
+  /// A value inserted into a 1000-block filter is reported as present afterwards.
+  #[test]
+  fn test_sbbf_insert_and_check() {
+    let mut filter: SBBF = vec![[0; 8]; 1_000];
+    for hash in 0..1_000_000_u32 {
+      sbbf_insert(&mut filter, hash);
+      assert!(sbbf_check(&filter, hash));
+    }
+  }
+}
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index b34d9aa8a..af6b0863d 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -84,6 +84,7 @@ pub mod arrow;
 pub mod column;
 experimental!(mod compression);
 experimental!(mod encodings);
+experimental!(mod bloom_filter);
 pub mod file;
 pub mod record;
 pub mod schema;

Reply via email to