This is an automated email from the ASF dual-hosted git repository.

pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 924ee2f1a6 GH-50026: [C++][Parquet] SIMD-accelerate SBBF probe via branchless autovec (#50030)
924ee2f1a6 is described below

commit 924ee2f1a634d92038c20703dd9df8c13f661dc1
Author: Dan Mattheiss <[email protected]>
AuthorDate: Thu Jun 18 10:45:58 2026 -0400

    GH-50026: [C++][Parquet] SIMD-accelerate SBBF probe via branchless autovec (#50030)
    
    ### Rationale for this change
    
      `BlockSplitBloomFilter::FindHash` ships the scalar reference probe: an 8-iteration short-circuit loop. The short-circuit blocks autovectorization, and on miss-heavy workloads (Parquet row-group skipping) per-lane branch mispredictions dominate probe latency.
    
      Closes #50026. Dev list discussion: https://lists.apache.org/thread/omof0fq47tndfd80g5hwp2bvjmzvpb40. Sibling change in Rust: apache/arrow-rs#10011.
    
      ### What changes are included in this PR?
    
      - Rewrite `FindHash` as a branchless OR-accumulator reduction. The new shape autovectorizes to SSE on x86 and NEON on aarch64 at the baseline.
      - Add `bloom_filter_avx2.cc` (xsimd kernel built with `-mavx2`) behind `CpuInfo`-based `DynamicDispatch`, mirroring the existing `level_comparison_avx2` pattern. xsimd was a requirement from the dev thread; the AVX2 target spells the reduction explicitly with `xsimd::batch<uint32_t, xsimd::avx2>` (pinned so an AVX-512 baseline build can't widen the batch to 16 lanes and over-read the 32-byte block) because gcc/MSVC don't lower the autovec body to a single `vptest`.
      - No on-disk format change, no public API change, and bit-identical to the scalar reference.
      - Insert path uses the same loop shape and will land in a follow-up PR.
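    
      For illustration, the two loop shapes can be sketched side by side.
      This is a minimal standalone sketch, not the code in this PR:
      `ProbeShortCircuit`, `ProbeBranchless`, `kLanes` and `kSketchSalt` are
      hypothetical names, raw pointers stand in for the `std::span`
      parameters, and the salt values are copied from the test added in this
      commit.
    
      ```cpp
      #include <cstdint>

      // Hypothetical constant: 8 x 32-bit words make up one 256-bit block.
      constexpr int kLanes = 8;

      // Salt values copied from kProbeTestSalt in this commit's test file.
      constexpr uint32_t kSketchSalt[kLanes] = {
          0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
          0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U,
      };

      // Short-circuit shape: the early return makes the trip count depend on
      // the data, which is what keeps compilers from vectorizing the loop.
      inline bool ProbeShortCircuit(const uint32_t* block, const uint32_t* salt,
                                    uint32_t key) {
        for (int i = 0; i < kLanes; ++i) {
          // The top 5 bits of key * salt[i] select one bit in word i.
          const uint32_t mask = uint32_t{1} << ((key * salt[i]) >> 27);
          if ((block[i] & mask) == 0) return false;
        }
        return true;
      }

      // Branchless shape: every lane is always evaluated and the per-lane
      // "bit missing" results are OR-ed into one accumulator.
      inline bool ProbeBranchless(const uint32_t* block, const uint32_t* salt,
                                  uint32_t key) {
        uint32_t miss = 0;
        for (int i = 0; i < kLanes; ++i) {
          const uint32_t mask = uint32_t{1} << ((key * salt[i]) >> 27);
          miss |= (~block[i] & mask);  // non-zero iff the probed bit is clear
        }
        return miss == 0;
      }
      ```
    
      Both functions return the same result for any block and key; only the
      control flow differs. The fixed trip count of 8 lanes also matches the
      32-byte block exactly (8 x 4 bytes), which is why a 16-lane batch would
      read past the block.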
    
      ### Performance
    
      End-to-end `FindHash` via `parquet/benches/bloom_filter_benchmark.cc`.
    
      **M1** (Apple clang -O3, NEON via autovec, 10 reps, CV ≤ 0.4%):
    
      | Bench | upstream/main | this PR | Speedup |
      |---|---:|---:|---:|
      | `BM_FindExistingHash` (hit-heavy) | 3.85 ns/probe | 2.41 ns/probe | **1.60×** |
      | `BM_FindNonExistingHash` (miss-heavy) | 9.04 ns/probe | 2.41 ns/probe | **3.75×** |
    
      **x86-64** (gcc 13.3, -O2 -mavx2 via AVX2 dispatch TU, 5 reps, CV ≤ 0.6%):
    
      | Bench | upstream/main | this PR | Speedup |
      |---|---:|---:|---:|
      | `BM_FindExistingHash` (hit-heavy) | 8.62 ns/probe | 4.32 ns/probe | **2.00×** |
      | `BM_FindNonExistingHash` (miss-heavy) | 15.29 ns/probe | 4.33 ns/probe | **3.53×** |
    
      The scalar miss path stalls on the data-dependent early exit (it is slower than its own hit path on both architectures); the branchless reduction is constant-time across hit/miss. `InsertHash`, `BatchInsertHash`, `ComputeHash`, and `BatchComputeHash` are all unchanged (16 benches within ±0.6%, inside CV).
    
    Cache regime sweep: scalar vs xsimd, post-hash probe latency:
    
      | Regime | scalar | xsimd | Speedup |
      |---|---:|---:|---:|
      | Small in-cache (0.5 MiB) | 12.35 ns | 2.48 ns | 5.0× |
      | Medium out-of-L3 (128 MiB) | 18.40 ns | 7.41 ns | 2.5× |
      | Large deep DRAM (1 GiB) | 31.05 ns | 22.10 ns | 1.4× |
    
    Branchless body alone (no xsimd kernel) on AVX2:
     - On clang `-mavx2` it is within noise of the hand-written xsimd kernel in every regime.
     - On gcc it matches the xsimd kernel except in the out-of-L3 regime, where it measures ~0.79× of xsimd.
     - That gap is why this PR ships a separate xsimd kernel for the AVX2 TU rather than relying on autovec alone: on clang-only builds the xsimd kernel makes essentially no difference, but on gcc/MSVC it pins the `vptest` lowering.
    
      ### Are these changes tested?
    
      Yes. New `BloomFilterProbeKernel` test calls both dispatch targets directly across 20K random blocks + 200 production-populated blocks per CI run, asserting bit-identical output. `DynamicDispatch` resolves once at static init, so without this test the un-picked target would never be exercised in CI.
    
      Existing `BasicTest`, `FPPTest`, and `CompatibilityTest` continue to pass on both the scalar baseline and the AVX2 dispatch path.
    
      ### Are there any user-facing changes?
    
      No. Read-path implementation change only.
    * GitHub Issue: #50026
    
    Lead-authored-by: Dan Mattheiss <[email protected]>
    Co-authored-by: dmatth1 <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/parquet/CMakeLists.txt                     | 14 +++-
 cpp/src/parquet/bloom_filter.cc                    | 44 ++++++++---
 cpp/src/parquet/bloom_filter.h                     |  6 +-
 cpp/src/parquet/bloom_filter_avx2.cc               | 53 +++++++++++++
 cpp/src/parquet/bloom_filter_avx2_internal.h       | 33 ++++++++
 cpp/src/parquet/bloom_filter_block_impl_internal.h | 39 ++++++++++
 cpp/src/parquet/bloom_filter_test.cc               | 90 ++++++++++++++++++++++
 7 files changed, 263 insertions(+), 16 deletions(-)

diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt
index 6839f2ff4c..f8a42b5b96 100644
--- a/cpp/src/parquet/CMakeLists.txt
+++ b/cpp/src/parquet/CMakeLists.txt
@@ -195,7 +195,11 @@ set(PARQUET_SRCS
 
 if(ARROW_HAVE_RUNTIME_AVX2)
   # AVX2 is used as a proxy for BMI2.
-  list(APPEND PARQUET_SRCS level_comparison_avx2.cc level_conversion_bmi2.cc)
+  list(APPEND
+       PARQUET_SRCS
+       level_comparison_avx2.cc
+       level_conversion_bmi2.cc
+       bloom_filter_avx2.cc)
   # We need CMAKE_CXX_FLAGS_RELEASE here to prevent the one-definition-rule
   # violation with -DCMAKE_BUILD_TYPE=MinSizeRel. CMAKE_CXX_FLAGS_RELEASE
   # will force inlining as much as possible.
@@ -205,8 +209,8 @@ if(ARROW_HAVE_RUNTIME_AVX2)
     separate_arguments(RELEASE_FLAGS NATIVE_COMMAND "${CMAKE_CXX_FLAGS_RELEASE}")
     list(APPEND AVX2_FLAGS ${RELEASE_FLAGS})
   endif()
-  set_source_files_properties(level_comparison_avx2.cc PROPERTIES COMPILE_OPTIONS
-                                                                  "${AVX2_FLAGS}")
+  set_source_files_properties(level_comparison_avx2.cc bloom_filter_avx2.cc
+                              PROPERTIES COMPILE_OPTIONS "${AVX2_FLAGS}")
   # WARNING: DO NOT BLINDLY COPY THIS CODE FOR OTHER BMI2 USE CASES.
   # This code is always guarded by runtime dispatch which verifies
   # BMI2 is present.  For a very small number of CPUs AVX2 does not
@@ -223,6 +227,10 @@ if(ARROW_HAVE_RUNTIME_AVX2)
     set_source_files_properties(level_conversion_bmi2.cc PROPERTIES COMPILE_OPTIONS
                                                                     "${BMI2_FLAGS}")
   endif()
+elseif(ARROW_SIMD_LEVEL MATCHES "^(AVX2|AVX512)$")
+  # AVX2 baseline without runtime dispatch: the dispatch table still references
+  # FindHashBlockAvx2, so the kernel must be built (no extra flags needed here).
+  list(APPEND PARQUET_SRCS bloom_filter_avx2.cc)
 endif()
 
 set(PARQUET_SHARED_LINK_LIBS)
diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc
index 5959817ea2..c6f6bdfe55 100644
--- a/cpp/src/parquet/bloom_filter.cc
+++ b/cpp/src/parquet/bloom_filter.cc
@@ -16,15 +16,18 @@
 // under the License.
 
 #include <algorithm>
+#include <array>
 #include <bit>
 #include <cmath>
 #include <cstdint>
 #include <cstring>
 #include <limits>
 #include <memory>
+#include <span>
 
 #include "arrow/io/memory.h"
 #include "arrow/util/bitmap_ops.h"
+#include "arrow/util/dispatch_internal.h"
 #include "arrow/util/logging_internal.h"
 #include "arrow/util/macros.h"
 
@@ -37,7 +40,33 @@
 #include "parquet/thrift_internal.h"
 #include "parquet/xxhasher.h"
 
+#if defined(ARROW_HAVE_AVX2) || defined(ARROW_HAVE_RUNTIME_AVX2)
+#  include "parquet/bloom_filter_avx2_internal.h"
+#endif
+
+#include "parquet/bloom_filter_block_impl_internal.h"
+
 namespace parquet {
+
+namespace internal {
+namespace {
+
+using ::arrow::internal::DynamicDispatch;
+
+struct FindHashBlockDynamicFunction {
+  using FunctionType = decltype(&FindHashBlockImpl);
+
+  static constexpr auto targets() {
+    return std::array{
+        ARROW_DISPATCH_TARGET_NONE(&FindHashBlockImpl)  //
+        ARROW_DISPATCH_TARGET_AVX2(&FindHashBlockAvx2)  //
+    };
+  }
+};
+
+}  // namespace
+}  // namespace internal
+
 namespace {
 
 constexpr int32_t kCiphertextLengthSize = 4;
@@ -434,16 +463,11 @@ bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
   const uint32_t bucket_index = static_cast<uint32_t>(((hash >> 32) * NumBlocks()) >> 32);
   const uint32_t key = static_cast<uint32_t>(hash);
   const uint32_t* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
-
-  for (int i = 0; i < kBitsSetPerBlock; ++i) {
-    // Calculate mask for key in the given bitset.
-    const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
-    if (ARROW_PREDICT_FALSE(0 ==
-                            (bitset32[kBitsSetPerBlock * bucket_index + i] & mask))) {
-      return false;
-    }
-  }
-  return true;
+  const uint32_t* block = bitset32 + kBitsSetPerBlock * bucket_index;
+  static ::arrow::internal::DynamicDispatch<internal::FindHashBlockDynamicFunction>
+      dispatch;
+  return dispatch(std::span<const uint32_t, kBitsSetPerBlock>(block, kBitsSetPerBlock),
+                  std::span<const uint32_t, kBitsSetPerBlock>(SALT), key);
 }
 
 void BlockSplitBloomFilter::InsertHashImpl(uint64_t hash) {
diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h
index 0b4c8df708..d70a4d1822 100644
--- a/cpp/src/parquet/bloom_filter.h
+++ b/cpp/src/parquet/bloom_filter.h
@@ -223,6 +223,9 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
   /// Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
   static constexpr uint32_t kMinimumBloomFilterBytes = 32;
 
+  /// The number of bits set in each tiny (256-bit) Bloom filter block.
+  static constexpr int kBitsSetPerBlock = 8;
+
   /// Calculate optimal size according to the number of distinct values and false
   /// positive probability.
   ///
@@ -363,9 +366,6 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
   // Bytes in a tiny Bloom filter block.
   static constexpr int kBytesPerFilterBlock = 32;
 
-  // The number of bits to be set in each tiny Bloom filter
-  static constexpr int kBitsSetPerBlock = 8;
-
   // A mask structure used to set bits in each tiny Bloom filter.
   struct BlockMask {
     uint32_t item[kBitsSetPerBlock];
diff --git a/cpp/src/parquet/bloom_filter_avx2.cc b/cpp/src/parquet/bloom_filter_avx2.cc
new file mode 100644
index 0000000000..3017b5ab3b
--- /dev/null
+++ b/cpp/src/parquet/bloom_filter_avx2.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <xsimd/xsimd.hpp>
+
+#include "parquet/bloom_filter_avx2_internal.h"
+
+namespace parquet::internal {
+
+// Parquet SBBF probe (256-bit block, 8 SALT-derived bits per probe). Unrelated
+// to cpp/src/arrow/acero/bloom_filter_avx2.cc -- Acero's blocked bloom filter
+// is a different algorithm (64-bit block, ~4 bits per probe, in-memory only)
+// and the two kernels are not interchangeable. See the Parquet spec for the
+// SBBF on-disk layout this kernel must match.
+//
+// Spelled in xsimd rather than reusing the autovectorized body in
+// bloom_filter_block_impl_internal.h: only clang lowers that body to a single vptest;
+// gcc and MSVC emit a longer horizontal vpor reduction.
+bool FindHashBlockAvx2(
+    std::span<const uint32_t, BlockSplitBloomFilter::kBitsSetPerBlock> block,
+    std::span<const uint32_t, BlockSplitBloomFilter::kBitsSetPerBlock> salt,
+    uint32_t key) {
+  static_assert(BlockSplitBloomFilter::kBitsSetPerBlock == 8,
+                "AVX2 SBBF probe kernel assumes 8 bits set per 256-bit block");
+  // Pin to avx2: the default-arch batch widens to 16 lanes under an AVX-512
+  // baseline and would over-read the 32-byte block.
+  using batch = xsimd::batch<uint32_t, xsimd::avx2>;
+  const batch mask = batch(uint32_t{1}) << xsimd::bitwise_rshift<27>(
+                         batch(key) * batch::load_unaligned(salt.data()));
+  const batch miss = xsimd::bitwise_andnot(mask, batch::load_unaligned(block.data()));
+  // `miss != 0` (one extra vpcmpeqd) is deliberate: reinterpreting `miss`
+  // directly as a batch_bool would skip the compare but feed non-canonical
+  // lane values into batch_bool, which relies on xsimd's AVX2 backend
+  // lowering none() to a whole-register vptest. That lowering is not part
+  // of xsimd's documented contract.
+  return xsimd::none(miss != batch(uint32_t{0}));
+}
+
+}  // namespace parquet::internal
diff --git a/cpp/src/parquet/bloom_filter_avx2_internal.h b/cpp/src/parquet/bloom_filter_avx2_internal.h
new file mode 100644
index 0000000000..fdf131c9a7
--- /dev/null
+++ b/cpp/src/parquet/bloom_filter_avx2_internal.h
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <span>
+
+#include "parquet/bloom_filter.h"
+#include "parquet/platform.h"
+
+namespace parquet::internal {
+
+PARQUET_EXPORT bool FindHashBlockAvx2(
+    std::span<const uint32_t, BlockSplitBloomFilter::kBitsSetPerBlock> block,
+    std::span<const uint32_t, BlockSplitBloomFilter::kBitsSetPerBlock> salt,
+    uint32_t key);
+
+}  // namespace parquet::internal
diff --git a/cpp/src/parquet/bloom_filter_block_impl_internal.h b/cpp/src/parquet/bloom_filter_block_impl_internal.h
new file mode 100644
index 0000000000..f180e56968
--- /dev/null
+++ b/cpp/src/parquet/bloom_filter_block_impl_internal.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <span>
+
+#include "parquet/bloom_filter.h"
+
+namespace parquet::internal {
+
+inline bool FindHashBlockImpl(
+    std::span<const uint32_t, BlockSplitBloomFilter::kBitsSetPerBlock> block,
+    std::span<const uint32_t, BlockSplitBloomFilter::kBitsSetPerBlock> salt,
+    uint32_t key) {
+  uint32_t miss = 0;
+  for (int i = 0; i < BlockSplitBloomFilter::kBitsSetPerBlock; ++i) {
+    const uint32_t mask = static_cast<uint32_t>(1) << ((key * salt[i]) >> 27);
+    miss |= (~block[i] & mask);
+  }
+  return miss == 0;
+}
+
+}  // namespace parquet::internal
diff --git a/cpp/src/parquet/bloom_filter_test.cc b/cpp/src/parquet/bloom_filter_test.cc
index ff83b97302..029d955ea0 100644
--- a/cpp/src/parquet/bloom_filter_test.cc
+++ b/cpp/src/parquet/bloom_filter_test.cc
@@ -21,6 +21,7 @@
 #include <limits>
 #include <memory>
 #include <random>
+#include <span>
 #include <string>
 #include <utility>
 #include <vector>
@@ -30,6 +31,7 @@
 #include "arrow/status.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
+#include "arrow/util/cpu_info.h"
 
 #include "parquet/bloom_filter.h"
 #include "parquet/exception.h"
@@ -38,6 +40,14 @@
 #include "parquet/types.h"
 #include "parquet/xxhasher.h"
 
+// Both dispatch targets included directly so the test can exercise the
+// un-picked one too -- DynamicDispatch resolves once at static init.
+#include "parquet/bloom_filter_block_impl_internal.h"
+
+#if defined(ARROW_HAVE_RUNTIME_AVX2)
+#  include "parquet/bloom_filter_avx2_internal.h"
+#endif
+
 namespace parquet {
 namespace test {
 
@@ -434,5 +444,85 @@ TYPED_TEST(TestBatchBloomFilter, Basic) {
   AssertBufferEqual(*buffer, *batch_insert_buffer);
 }
 
+// Guards against silent drift between the baseline and AVX2 probe bodies --
+// DynamicDispatch only runs one of them per host.
+#if defined(ARROW_HAVE_RUNTIME_AVX2)
+namespace {
+
+// Aliased from the class constant for brevity in array bounds below.
+constexpr int kBitsSetPerBlock = BlockSplitBloomFilter::kBitsSetPerBlock;
+
+// Test-only SALT (matches the Parquet SBBF spec values used in
+// bloom_filter.h). Kernel-vs-kernel agreement holds for any SALT, so this
+// duplication is a contained test-side convenience, not a spec mirror.
+constexpr uint32_t kProbeTestSalt[kBitsSetPerBlock] = {
+    0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
+    0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U,
+};
+
+inline void InsertIntoBlock(uint32_t* block, uint32_t key) {
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    block[i] |= uint32_t{1} << ((key * kProbeTestSalt[i]) >> 27);
+  }
+}
+
+void AssertKernelsAgree(std::span<const uint32_t, kBitsSetPerBlock> block, uint32_t key) {
+  const bool standard = internal::FindHashBlockImpl(block, kProbeTestSalt, key);
+  const bool avx2 = internal::FindHashBlockAvx2(block, kProbeTestSalt, key);
+  ASSERT_EQ(standard, avx2) << "dispatch targets diverged for key=0x" << std::hex << key;
+}
+
+}  // namespace
+
+class BloomFilterProbeKernel : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    if (!::arrow::internal::CpuInfo::GetInstance()->IsSupported(
+            ::arrow::internal::CpuInfo::AVX2)) {
+      GTEST_SKIP() << "AVX2 not available at runtime";
+    }
+  }
+};
+
+// Random-block fuzz: exercises the full bit lattice, catches reduction /
+// operand-order bugs that don't depend on realistic fill density.
+TEST_F(BloomFilterProbeKernel, AgreeOnRandomBlocks) {
+  std::mt19937_64 rng(0xC0FFEE);
+  constexpr int kNumTrials = 20000;
+  for (int trial = 0; trial < kNumTrials; ++trial) {
+    uint32_t block[kBitsSetPerBlock];
+    for (uint32_t& word : block) {
+      word = static_cast<uint32_t>(rng());
+    }
+    AssertKernelsAgree(block, static_cast<uint32_t>(rng()));
+  }
+}
+
+// Production-fill fuzz: blocks populated by the same SALT-derived insert the
+// writer uses, then probed with both inserted keys (must match) and fresh
+// keys (mostly miss). Catches bugs that only surface on real fill density.
+TEST_F(BloomFilterProbeKernel, AgreeOnPopulatedBlocks) {
+  std::mt19937_64 rng(0xBABECAFE);
+  constexpr int kNumBlocks = 200;
+  constexpr int kKeysPerBlock = 6;  // ~k inserts per 256-bit block, realistic FPP.
+  for (int b = 0; b < kNumBlocks; ++b) {
+    uint32_t block[kBitsSetPerBlock] = {0};
+    std::vector<uint32_t> inserted;
+    inserted.reserve(kKeysPerBlock);
+    for (int k = 0; k < kKeysPerBlock; ++k) {
+      const uint32_t key = static_cast<uint32_t>(rng());
+      InsertIntoBlock(block, key);
+      inserted.push_back(key);
+    }
+    for (uint32_t key : inserted) {
+      AssertKernelsAgree(block, key);
+    }
+    for (int q = 0; q < 50; ++q) {
+      AssertKernelsAgree(block, static_cast<uint32_t>(rng()));
+    }
+  }
+}
+#endif
+
 }  // namespace test
 }  // namespace parquet
