westonpace commented on a change in pull request #12067: URL: https://github.com/apache/arrow/pull/12067#discussion_r779963566
########## File path: cpp/src/arrow/compute/exec/bloom_filter.h ########## @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#if defined(ARROW_HAVE_AVX2) +#include <immintrin.h> +#endif + +#include <atomic> +#include <cstdint> +#include <memory> +#include "arrow/compute/exec/partition_util.h" +#include "arrow/compute/exec/util.h" +#include "arrow/memory_pool.h" +#include "arrow/result.h" +#include "arrow/status.h" + +namespace arrow { +namespace compute { + +// A set of pre-generated bit masks from a 64-bit word. +// +// It is used to map selected bits of hash to a bit mask that will be used in +// a Bloom filter. +// +// These bit masks need to look random and need to have a similar fractions of +// bits set in order for a Bloom filter to have a low false positives rate. +// +struct BloomFilterMasks { + // Generate all masks as a single bit vector. Each bit offset in this bit + // vector corresponds to a single mask. + // In each consecutive kBitsPerMask bits, there must be between + // kMinBitsSet and kMaxBitsSet bits set. 
+ // + BloomFilterMasks(); + + inline uint64_t mask(int bit_offset) { +#if ARROW_LITTLE_ENDIAN + return (util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8) >> (bit_offset % 8)) & + kFullMask; +#else + return (BYTESWAP(util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8)) >> + (bit_offset % 8)) & + kFullMask; +#endif + } + + // Masks are 57 bits long because then they can be accessed at an + // arbitrary bit offset using a single unaligned 64-bit load instruction. + // + static constexpr int kBitsPerMask = 57; + static constexpr uint64_t kFullMask = (1ULL << kBitsPerMask) - 1; + + // Minimum and maximum number of bits set in each mask. + // This constraint is enforced when generating the bit masks. + // Values should be close to each other and chosen as to minimize a Bloom + // filter false positives rate. + // + static constexpr int kMinBitsSet = 4; + static constexpr int kMaxBitsSet = 5; + + // Number of generated masks. + // Having more masks to choose will improve false positives rate of Bloom + // filter but will also use more memory, which may lead to more CPU cache + // misses. + // The chosen value results in using only a few cache-lines for mask lookups, + // while providing a good variety of available bit masks. + // + static constexpr int kLogNumMasks = 10; + static constexpr int kNumMasks = 1 << kLogNumMasks; + + // Data of masks. Masks are stored in a single bit vector. Nth mask is + // kBitsPerMask bits starting at bit offset N. + // + static constexpr int kTotalBytes = (kNumMasks + 64) / 8; + uint8_t masks_[kTotalBytes]; +}; + +// A variant of a blocked Bloom filter implementation. +// A Bloom filter is a data structure that provides approximate membership test +// functionality based only on the hash of the key. Membership test may return +// false positives but not false negatives. 
Approximation of the result allows +// in general case (for arbitrary data types of keys) to save on both memory and +// lookup cost compared to the accurate membership test. +// The accurate test may sometimes still be cheaper for a specific data types +// and inputs, e.g. integers from a small range. +// +// This blocked Bloom filter is optimized for use in hash joins, to achieve a +// good balance between the size of the filter, the cost of its building and +// querying and the rate of false positives. +// +class BlockedBloomFilter { + friend class BloomFilterBuilder_Partitioned; + friend class BloomFilterBuilder_Atomic; Review comment: Can these two lines be removed? I don't see these classes anywhere. ########## File path: cpp/src/arrow/compute/exec/bloom_filter.h ########## @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#if defined(ARROW_HAVE_AVX2) +#include <immintrin.h> +#endif + +#include <atomic> +#include <cstdint> +#include <memory> +#include "arrow/compute/exec/partition_util.h" +#include "arrow/compute/exec/util.h" +#include "arrow/memory_pool.h" +#include "arrow/result.h" +#include "arrow/status.h" + +namespace arrow { +namespace compute { + +// A set of pre-generated bit masks from a 64-bit word. +// +// It is used to map selected bits of hash to a bit mask that will be used in +// a Bloom filter. +// +// These bit masks need to look random and need to have a similar fractions of +// bits set in order for a Bloom filter to have a low false positives rate. +// +struct BloomFilterMasks { + // Generate all masks as a single bit vector. Each bit offset in this bit + // vector corresponds to a single mask. + // In each consecutive kBitsPerMask bits, there must be between + // kMinBitsSet and kMaxBitsSet bits set. + // + BloomFilterMasks(); + + inline uint64_t mask(int bit_offset) { +#if ARROW_LITTLE_ENDIAN + return (util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8) >> (bit_offset % 8)) & + kFullMask; +#else + return (BYTESWAP(util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8)) >> + (bit_offset % 8)) & + kFullMask; +#endif + } + + // Masks are 57 bits long because then they can be accessed at an + // arbitrary bit offset using a single unaligned 64-bit load instruction. + // + static constexpr int kBitsPerMask = 57; + static constexpr uint64_t kFullMask = (1ULL << kBitsPerMask) - 1; + + // Minimum and maximum number of bits set in each mask. + // This constraint is enforced when generating the bit masks. + // Values should be close to each other and chosen as to minimize a Bloom + // filter false positives rate. + // + static constexpr int kMinBitsSet = 4; + static constexpr int kMaxBitsSet = 5; + + // Number of generated masks. 
+ // Having more masks to choose will improve false positives rate of Bloom + // filter but will also use more memory, which may lead to more CPU cache + // misses. + // The chosen value results in using only a few cache-lines for mask lookups, + // while providing a good variety of available bit masks. + // + static constexpr int kLogNumMasks = 10; + static constexpr int kNumMasks = 1 << kLogNumMasks; + + // Data of masks. Masks are stored in a single bit vector. Nth mask is + // kBitsPerMask bits starting at bit offset N. + // + static constexpr int kTotalBytes = (kNumMasks + 64) / 8; + uint8_t masks_[kTotalBytes]; +}; + +// A variant of a blocked Bloom filter implementation. +// A Bloom filter is a data structure that provides approximate membership test +// functionality based only on the hash of the key. Membership test may return +// false positives but not false negatives. Approximation of the result allows +// in general case (for arbitrary data types of keys) to save on both memory and +// lookup cost compared to the accurate membership test. +// The accurate test may sometimes still be cheaper for a specific data types +// and inputs, e.g. integers from a small range. +// +// This blocked Bloom filter is optimized for use in hash joins, to achieve a +// good balance between the size of the filter, the cost of its building and +// querying and the rate of false positives. +// +class BlockedBloomFilter { Review comment: Can you link to a paper or description describing the filter you based your implementation on, and then briefly describe how this variant differs from it (or at least what the goals were)? ########## File path: cpp/src/arrow/compute/exec/bloom_filter.h ########## @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#if defined(ARROW_HAVE_AVX2) +#include <immintrin.h> +#endif + +#include <atomic> +#include <cstdint> +#include <memory> +#include "arrow/compute/exec/partition_util.h" +#include "arrow/compute/exec/util.h" +#include "arrow/memory_pool.h" +#include "arrow/result.h" +#include "arrow/status.h" + +namespace arrow { +namespace compute { + +// A set of pre-generated bit masks from a 64-bit word. +// +// It is used to map selected bits of hash to a bit mask that will be used in +// a Bloom filter. +// +// These bit masks need to look random and need to have a similar fractions of +// bits set in order for a Bloom filter to have a low false positives rate. +// +struct BloomFilterMasks { + // Generate all masks as a single bit vector. Each bit offset in this bit + // vector corresponds to a single mask. + // In each consecutive kBitsPerMask bits, there must be between + // kMinBitsSet and kMaxBitsSet bits set. + // + BloomFilterMasks(); + + inline uint64_t mask(int bit_offset) { +#if ARROW_LITTLE_ENDIAN + return (util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8) >> (bit_offset % 8)) & + kFullMask; +#else + return (BYTESWAP(util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8)) >> + (bit_offset % 8)) & + kFullMask; +#endif + } + + // Masks are 57 bits long because then they can be accessed at an + // arbitrary bit offset using a single unaligned 64-bit load instruction. 
+ // + static constexpr int kBitsPerMask = 57; + static constexpr uint64_t kFullMask = (1ULL << kBitsPerMask) - 1; + + // Minimum and maximum number of bits set in each mask. + // This constraint is enforced when generating the bit masks. + // Values should be close to each other and chosen as to minimize a Bloom + // filter false positives rate. + // + static constexpr int kMinBitsSet = 4; + static constexpr int kMaxBitsSet = 5; + + // Number of generated masks. + // Having more masks to choose will improve false positives rate of Bloom + // filter but will also use more memory, which may lead to more CPU cache + // misses. + // The chosen value results in using only a few cache-lines for mask lookups, + // while providing a good variety of available bit masks. + // + static constexpr int kLogNumMasks = 10; + static constexpr int kNumMasks = 1 << kLogNumMasks; + + // Data of masks. Masks are stored in a single bit vector. Nth mask is + // kBitsPerMask bits starting at bit offset N. + // + static constexpr int kTotalBytes = (kNumMasks + 64) / 8; + uint8_t masks_[kTotalBytes]; +}; + +// A variant of a blocked Bloom filter implementation. +// A Bloom filter is a data structure that provides approximate membership test +// functionality based only on the hash of the key. Membership test may return +// false positives but not false negatives. Approximation of the result allows +// in general case (for arbitrary data types of keys) to save on both memory and +// lookup cost compared to the accurate membership test. +// The accurate test may sometimes still be cheaper for a specific data types +// and inputs, e.g. integers from a small range. +// +// This blocked Bloom filter is optimized for use in hash joins, to achieve a +// good balance between the size of the filter, the cost of its building and +// querying and the rate of false positives. 
+// +class BlockedBloomFilter { + friend class BloomFilterBuilder_Partitioned; + friend class BloomFilterBuilder_Atomic; + + public: + Status CreateEmpty(int64_t num_rows_to_insert, MemoryPool* pool); + + inline void Insert(uint64_t hash) { + uint64_t m = mask(hash); + uint64_t& b = blocks_[block_id(hash)]; + b |= m; + } + + inline bool Find(uint64_t hash) const { + uint64_t m = mask(hash); + uint64_t b = blocks_[block_id(hash)]; + return (b & m) == m; + } + + void Insert(int64_t hardware_flags, int64_t num_rows, const uint32_t* hashes); + void Insert(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes); + + // Uses SIMD if available for smaller Bloom filters. + // Uses memory prefetching for larger Bloom filters. + // + void Find(int64_t hardware_flags, int64_t num_rows, const uint32_t* hashes, + uint8_t* result_bit_vector, bool enable_prefetch = true) const; + void Find(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes, + uint8_t* result_bit_vector, bool enable_prefetch = true) const; + + // Folding of a block Bloom filter after the initial version + // has been built. + // + // One of the parameters for creation of Bloom filter is the number + // of bits allocated for it. The more bits allocated, the lower the + // probability of false positives. A good heuristic is to aim for + // half of the bits set in the constructed Bloom filter. This should + // result in a good trade off between size (and following cost of + // memory accesses) and false positives rate. + // + // There might have been many duplicate keys in the input provided + // to Bloom filter builder. In that case the resulting bit vector + // would be more sparse then originally intended. It is possible to + // easily correct that and cut in half the size of Bloom filter + // after it has already been constructed. 
The process to do that is + // approximately equal to OR-ing bits from upper and lower half (the + // way we address these bits when inserting or querying a hash makes + // such folding in half possible). + // + // We will keep folding as long as the fraction of bits set is less + // than 1/4. The resulting bit vector density should be in the [1/4, + // 1/2) range. + // + void Fold(); + + int log_num_blocks() const { return log_num_blocks_; } + + int NumHashBitsUsed() const; + + bool IsSameAs(const BlockedBloomFilter* other) const; + + int64_t NumBitsSet() const; + + private: + inline uint64_t mask(uint64_t hash) const { + // The lowest bits of hash are used to pick mask index. + // + int mask_id = static_cast<int>(hash & (BloomFilterMasks::kNumMasks - 1)); + uint64_t result = masks_.mask(mask_id); + + // The next set of hash bits is used to pick the amount of bit + // rotation of the mask. + // + int rotation = (hash >> BloomFilterMasks::kLogNumMasks) & 63; + result = ROTL64(result, rotation); + + return result; + } + + inline int64_t block_id(uint64_t hash) const { + // The next set of hash bits following the bits used to select a + // mask is used to pick block id (index of 64-bit word in a bit + // vector). 
+ // + return (hash >> (BloomFilterMasks::kLogNumMasks + 6)) & (num_blocks_ - 1); + } + + template <typename T> + inline void InsertImp(int64_t num_rows, const T* hashes); + + template <typename T> + inline void FindImp(int64_t num_rows, const T* hashes, uint8_t* result_bit_vector, + bool enable_prefetch) const; + + void SingleFold(int num_folds); + +#if defined(ARROW_HAVE_AVX2) + inline __m256i mask_avx2(__m256i hash) const; + inline __m256i block_id_avx2(__m256i hash) const; + int64_t Insert_avx2(int64_t num_rows, const uint32_t* hashes); + int64_t Insert_avx2(int64_t num_rows, const uint64_t* hashes); + template <typename T> + int64_t InsertImp_avx2(int64_t num_rows, const T* hashes); + int64_t Find_avx2(int64_t num_rows, const uint32_t* hashes, + uint8_t* result_bit_vector) const; + int64_t Find_avx2(int64_t num_rows, const uint64_t* hashes, + uint8_t* result_bit_vector) const; + template <typename T> + int64_t FindImp_avx2(int64_t num_rows, const T* hashes, + uint8_t* result_bit_vector) const; +#endif + + bool UsePrefetch() const { + return num_blocks_ * sizeof(uint64_t) > kPrefetchLimitBytes; + } + + static constexpr int64_t kPrefetchLimitBytes = 256 * 1024; + + static BloomFilterMasks masks_; + + // Total number of bits used by block Bloom filter must be a power + // of 2. + // + int log_num_blocks_; + int64_t num_blocks_; + + // Buffer allocated to store an array of power of 2 64-bit blocks. + // + std::shared_ptr<Buffer> buf_; + // Pointer to mutable data owned by Buffer + // + uint64_t* blocks_; +}; + +enum class BloomFilterBuildStrategy { + SINGLE_THREADED = 0, + PARALLEL = 1, +}; + +class BloomFilterBuilder { Review comment: This API shape (begin/finish/cleanup) is a little unorthodox (elsewhere for builders we use constructor or static factory that initializes the target, finish returns the built target, and cleanup happens in the destructor). 
However, I'm thinking this is for interaction with the scheduler API used elsewhere in the hash-join implementation? ########## File path: cpp/src/arrow/compute/exec/bloom_filter.h ########## @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#if defined(ARROW_HAVE_AVX2) +#include <immintrin.h> +#endif + +#include <atomic> +#include <cstdint> +#include <memory> +#include "arrow/compute/exec/partition_util.h" +#include "arrow/compute/exec/util.h" +#include "arrow/memory_pool.h" +#include "arrow/result.h" +#include "arrow/status.h" + +namespace arrow { +namespace compute { + +// A set of pre-generated bit masks from a 64-bit word. +// +// It is used to map selected bits of hash to a bit mask that will be used in +// a Bloom filter. +// +// These bit masks need to look random and need to have a similar fractions of +// bits set in order for a Bloom filter to have a low false positives rate. +// +struct BloomFilterMasks { + // Generate all masks as a single bit vector. Each bit offset in this bit + // vector corresponds to a single mask. + // In each consecutive kBitsPerMask bits, there must be between + // kMinBitsSet and kMaxBitsSet bits set. 
+ // + BloomFilterMasks(); + + inline uint64_t mask(int bit_offset) { +#if ARROW_LITTLE_ENDIAN + return (util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8) >> (bit_offset % 8)) & + kFullMask; +#else + return (BYTESWAP(util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8)) >> + (bit_offset % 8)) & + kFullMask; +#endif + } + + // Masks are 57 bits long because then they can be accessed at an + // arbitrary bit offset using a single unaligned 64-bit load instruction. + // + static constexpr int kBitsPerMask = 57; + static constexpr uint64_t kFullMask = (1ULL << kBitsPerMask) - 1; + + // Minimum and maximum number of bits set in each mask. + // This constraint is enforced when generating the bit masks. + // Values should be close to each other and chosen as to minimize a Bloom + // filter false positives rate. + // + static constexpr int kMinBitsSet = 4; + static constexpr int kMaxBitsSet = 5; + + // Number of generated masks. + // Having more masks to choose will improve false positives rate of Bloom + // filter but will also use more memory, which may lead to more CPU cache + // misses. + // The chosen value results in using only a few cache-lines for mask lookups, + // while providing a good variety of available bit masks. + // + static constexpr int kLogNumMasks = 10; + static constexpr int kNumMasks = 1 << kLogNumMasks; + + // Data of masks. Masks are stored in a single bit vector. Nth mask is + // kBitsPerMask bits starting at bit offset N. + // + static constexpr int kTotalBytes = (kNumMasks + 64) / 8; + uint8_t masks_[kTotalBytes]; +}; + +// A variant of a blocked Bloom filter implementation. +// A Bloom filter is a data structure that provides approximate membership test +// functionality based only on the hash of the key. Membership test may return +// false positives but not false negatives. 
Approximation of the result allows +// in general case (for arbitrary data types of keys) to save on both memory and +// lookup cost compared to the accurate membership test. +// The accurate test may sometimes still be cheaper for a specific data types +// and inputs, e.g. integers from a small range. +// +// This blocked Bloom filter is optimized for use in hash joins, to achieve a +// good balance between the size of the filter, the cost of its building and +// querying and the rate of false positives. +// +class BlockedBloomFilter { + friend class BloomFilterBuilder_Partitioned; + friend class BloomFilterBuilder_Atomic; + + public: + Status CreateEmpty(int64_t num_rows_to_insert, MemoryPool* pool); Review comment: Nit: This feels like it could be a private method but then I suppose you will need to friend the builders. Why not make this a constructor? ########## File path: cpp/src/arrow/compute/exec/bloom_filter.cc ########## @@ -0,0 +1,434 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/compute/exec/bloom_filter.h" +#include <random> +#include "arrow/compute/exec/util.h" // PREFETCH +#include "arrow/util/bit_util.h" // Log2 +#include "arrow/util/bitmap_ops.h" // CountSetBits + +namespace arrow { +namespace compute { + +BloomFilterMasks::BloomFilterMasks() { + std::seed_seq seed{0, 0, 0, 0, 0, 0, 0, 0}; + std::mt19937 re(seed); + std::uniform_int_distribution<uint64_t> rd; + auto random = [&re, &rd](int min_value, int max_value) { + return min_value + rd(re) % (max_value - min_value + 1); + }; + + memset(masks_, 0, kTotalBytes); + + // Prepare the first mask + // + int num_bits_set = static_cast<int>(random(kMinBitsSet, kMaxBitsSet)); + for (int i = 0; i < num_bits_set; ++i) { + for (;;) { + int bit_pos = static_cast<int>(random(0, kBitsPerMask - 1)); + if (!bit_util::GetBit(masks_, bit_pos)) { + bit_util::SetBit(masks_, bit_pos); + break; + } + } + } + + int64_t num_bits_total = kNumMasks + kBitsPerMask - 1; + + // The value num_bits_set will be updated in each iteration of the loop to + // represent the number of bits set in the entire mask directly preceding the + // currently processed bit. + // + for (int64_t i = kBitsPerMask; i < num_bits_total; ++i) { + // The value of the lowest bit of the previous mask that will be removed + // from the current mask as we move to the next position in the bit vector + // of masks. + // + int bit_leaving = bit_util::GetBit(masks_, i - kBitsPerMask) ? 1 : 0; + + // Next bit has to be 1 because of minimum bits in a mask requirement + // + if (bit_leaving == 1 && num_bits_set == kMinBitsSet) { + bit_util::SetBit(masks_, i); + continue; + } + + // Next bit has to be 0 because of maximum bits in a mask requirement + // + if (bit_leaving == 0 && num_bits_set == kMaxBitsSet) { + continue; + } + + // Next bit can be random. Use the expected number of bits set in a mask + // as a probability of 1. 
+ // + if (random(0, kBitsPerMask * 2 - 1) < kMinBitsSet + kMaxBitsSet) { + bit_util::SetBit(masks_, i); + if (bit_leaving == 0) { + ++num_bits_set; + } + } else { + if (bit_leaving == 1) { + --num_bits_set; + } + } + } +} + +BloomFilterMasks BlockedBloomFilter::masks_; + +Status BlockedBloomFilter::CreateEmpty(int64_t num_rows_to_insert, MemoryPool* pool) { + // Compute the size + // + constexpr int64_t min_num_bits_per_key = 8; + constexpr int64_t min_num_bits = 512; + int64_t desired_num_bits = + std::max(min_num_bits, num_rows_to_insert * min_num_bits_per_key); + int log_num_bits = bit_util::Log2(desired_num_bits); + + log_num_blocks_ = log_num_bits - 6; + num_blocks_ = 1ULL << log_num_blocks_; + + // Allocate and zero out bit vector + // + int64_t buffer_size = num_blocks_ * sizeof(uint64_t); + ARROW_ASSIGN_OR_RAISE(buf_, AllocateBuffer(buffer_size, pool)); + blocks_ = reinterpret_cast<uint64_t*>(buf_->mutable_data()); + memset(blocks_, 0, buffer_size); + + return Status::OK(); +} + +template <typename T> +void BlockedBloomFilter::InsertImp(int64_t num_rows, const T* hashes) { + for (int64_t i = 0; i < num_rows; ++i) { + Insert(hashes[i]); + } +} + +void BlockedBloomFilter::Insert(int64_t hardware_flags, int64_t num_rows, + const uint32_t* hashes) { + int64_t num_processed = 0; +#if defined(ARROW_HAVE_AVX2) + if (hardware_flags & arrow::internal::CpuInfo::AVX2) { + num_processed = Insert_avx2(num_rows, hashes); + } +#endif + InsertImp(num_rows - num_processed, hashes + num_processed); Review comment: Would leftovers here be whatever hashes didn't fit nicely into the AVX block? ########## File path: cpp/src/arrow/compute/exec/bloom_filter.h ########## @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#if defined(ARROW_HAVE_AVX2) +#include <immintrin.h> +#endif + +#include <atomic> +#include <cstdint> +#include <memory> +#include "arrow/compute/exec/partition_util.h" +#include "arrow/compute/exec/util.h" +#include "arrow/memory_pool.h" +#include "arrow/result.h" +#include "arrow/status.h" + +namespace arrow { +namespace compute { + +// A set of pre-generated bit masks from a 64-bit word. +// +// It is used to map selected bits of hash to a bit mask that will be used in +// a Bloom filter. +// +// These bit masks need to look random and need to have a similar fractions of +// bits set in order for a Bloom filter to have a low false positives rate. +// +struct BloomFilterMasks { + // Generate all masks as a single bit vector. Each bit offset in this bit + // vector corresponds to a single mask. + // In each consecutive kBitsPerMask bits, there must be between + // kMinBitsSet and kMaxBitsSet bits set. + // + BloomFilterMasks(); + + inline uint64_t mask(int bit_offset) { +#if ARROW_LITTLE_ENDIAN + return (util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8) >> (bit_offset % 8)) & + kFullMask; +#else + return (BYTESWAP(util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8)) >> + (bit_offset % 8)) & + kFullMask; +#endif + } + + // Masks are 57 bits long because then they can be accessed at an + // arbitrary bit offset using a single unaligned 64-bit load instruction. 
+ // + static constexpr int kBitsPerMask = 57; + static constexpr uint64_t kFullMask = (1ULL << kBitsPerMask) - 1; + + // Minimum and maximum number of bits set in each mask. + // This constraint is enforced when generating the bit masks. + // Values should be close to each other and chosen as to minimize a Bloom + // filter false positives rate. + // + static constexpr int kMinBitsSet = 4; + static constexpr int kMaxBitsSet = 5; + + // Number of generated masks. + // Having more masks to choose will improve false positives rate of Bloom + // filter but will also use more memory, which may lead to more CPU cache + // misses. + // The chosen value results in using only a few cache-lines for mask lookups, + // while providing a good variety of available bit masks. + // + static constexpr int kLogNumMasks = 10; + static constexpr int kNumMasks = 1 << kLogNumMasks; + + // Data of masks. Masks are stored in a single bit vector. Nth mask is + // kBitsPerMask bits starting at bit offset N. + // + static constexpr int kTotalBytes = (kNumMasks + 64) / 8; + uint8_t masks_[kTotalBytes]; +}; + +// A variant of a blocked Bloom filter implementation. +// A Bloom filter is a data structure that provides approximate membership test +// functionality based only on the hash of the key. Membership test may return +// false positives but not false negatives. Approximation of the result allows +// in general case (for arbitrary data types of keys) to save on both memory and +// lookup cost compared to the accurate membership test. +// The accurate test may sometimes still be cheaper for a specific data types +// and inputs, e.g. integers from a small range. +// +// This blocked Bloom filter is optimized for use in hash joins, to achieve a +// good balance between the size of the filter, the cost of its building and +// querying and the rate of false positives. 
+// +class BlockedBloomFilter { + friend class BloomFilterBuilder_Partitioned; + friend class BloomFilterBuilder_Atomic; + + public: + Status CreateEmpty(int64_t num_rows_to_insert, MemoryPool* pool); + + inline void Insert(uint64_t hash) { + uint64_t m = mask(hash); + uint64_t& b = blocks_[block_id(hash)]; + b |= m; + } + + inline bool Find(uint64_t hash) const { + uint64_t m = mask(hash); + uint64_t b = blocks_[block_id(hash)]; + return (b & m) == m; + } + + void Insert(int64_t hardware_flags, int64_t num_rows, const uint32_t* hashes); Review comment: Why is hardware_flags an int64 instead of a bool? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org