alkis commented on code in PR #14049:
URL: https://github.com/apache/arrow/pull/14049#discussion_r1125751843


##########
cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+// Storage for a bit vector to be used with BitVectorNavigator and its 
variants.
+//
+// Supports weaved bit vectors.
+//
+class BitVectorWithCountsBase {
+  template <bool T>
+  friend class BitVectorNavigatorImp;
+
+ public:
+  BitVectorWithCountsBase() : num_children_(0), num_bits_per_child_(0) {}
+
+  void Resize(int64_t num_bits_per_child, int64_t num_children = 1) {
+    ARROW_DCHECK(num_children > 0 && num_bits_per_child > 0);
+    num_children_ = num_children;
+    num_bits_per_child_ = num_bits_per_child;
+    int64_t num_words =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerWord) * num_children;
+    bits_.resize(num_words);
+    mid_counts_.resize(num_words);
+    int64_t num_blocks =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerBlock) * num_children;
+    top_counts_.resize(num_blocks);
+  }
+
+  void ClearBits() { memset(bits_.data(), 0, bits_.size() * sizeof(bits_[0])); 
}
+
+  // Word is 64 adjacent bits
+  //
+  static constexpr int64_t kBitsPerWord = 64;
+  // Block is 65536 adjacent bits
+  // (that means that 16-bit counters can be used within the block)
+  //
+#ifndef NDEBUG
+  static constexpr int kLogBitsPerBlock = 7;
+#else
+  static constexpr int kLogBitsPerBlock = 16;
+#endif
+  static constexpr int64_t kBitsPerBlock = 1LL << kLogBitsPerBlock;
+
+ protected:
+  int64_t num_children_;
+  int64_t num_bits_per_child_;
+  // TODO: Replace vectors with ResizableBuffers. Return error status from
+  // Resize on out-of-memory.
+  //
+  std::vector<uint64_t> bits_;
+  std::vector<int64_t> top_counts_;
+  std::vector<uint16_t> mid_counts_;
+};
+
+template <bool SINGLE_CHILD_BIT_VECTOR>
+class BitVectorNavigatorImp {
+ public:
+  BitVectorNavigatorImp() : container_(NULLPTR) {}
+
+  BitVectorNavigatorImp(BitVectorWithCountsBase* container, int64_t 
child_index)
+      : container_(container), child_index_(child_index) {}
+
+  int64_t block_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,

Review Comment:
   `CeilDiv` takes its arguments as signed integers, so it ends up generating 
suboptimal code.
   
   Because of the above, the codegen here and below has unnecessary branches: 
https://godbolt.org/z/6Me84646P



##########
cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+// Storage for a bit vector to be used with BitVectorNavigator and its 
variants.
+//
+// Supports weaved bit vectors.
+//
+class BitVectorWithCountsBase {
+  template <bool T>
+  friend class BitVectorNavigatorImp;
+
+ public:
+  BitVectorWithCountsBase() : num_children_(0), num_bits_per_child_(0) {}
+
+  void Resize(int64_t num_bits_per_child, int64_t num_children = 1) {
+    ARROW_DCHECK(num_children > 0 && num_bits_per_child > 0);
+    num_children_ = num_children;
+    num_bits_per_child_ = num_bits_per_child;
+    int64_t num_words =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerWord) * num_children;
+    bits_.resize(num_words);
+    mid_counts_.resize(num_words);
+    int64_t num_blocks =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerBlock) * num_children;
+    top_counts_.resize(num_blocks);
+  }
+
+  void ClearBits() { memset(bits_.data(), 0, bits_.size() * sizeof(bits_[0])); 
}
+
+  // Word is 64 adjacent bits
+  //
+  static constexpr int64_t kBitsPerWord = 64;
+  // Block is 65536 adjacent bits
+  // (that means that 16-bit counters can be used within the block)
+  //
+#ifndef NDEBUG
+  static constexpr int kLogBitsPerBlock = 7;
+#else
+  static constexpr int kLogBitsPerBlock = 16;
+#endif
+  static constexpr int64_t kBitsPerBlock = 1LL << kLogBitsPerBlock;
+
+ protected:
+  int64_t num_children_;
+  int64_t num_bits_per_child_;
+  // TODO: Replace vectors with ResizableBuffers. Return error status from
+  // Resize on out-of-memory.
+  //
+  std::vector<uint64_t> bits_;
+  std::vector<int64_t> top_counts_;
+  std::vector<uint16_t> mid_counts_;
+};
+
+template <bool SINGLE_CHILD_BIT_VECTOR>
+class BitVectorNavigatorImp {
+ public:
+  BitVectorNavigatorImp() : container_(NULLPTR) {}
+
+  BitVectorNavigatorImp(BitVectorWithCountsBase* container, int64_t 
child_index)
+      : container_(container), child_index_(child_index) {}
+
+  int64_t block_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerBlock);
+  }
+
+  int64_t word_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerWord);
+  }
+
+  int64_t bit_count() const { return container_->num_bits_per_child_; }
+
+  int64_t pop_count() const {
+    int64_t last_block = block_count() - 1;
+    int64_t last_word = word_count() - 1;
+    int num_bits_last_word =
+        static_cast<int>((bit_count() - 1) % 
BitVectorWithCountsBase::kBitsPerWord + 1);
+    uint64_t last_word_mask = ~0ULL >> (64 - num_bits_last_word);
+    return container_->top_counts_[apply_stride_and_offset(last_block)] +
+           container_->mid_counts_[apply_stride_and_offset(last_word)] +
+           
ARROW_POPCOUNT64(container_->bits_[apply_stride_and_offset(last_word)] &
+                            last_word_mask);
+  }
+
+  const uint8_t* GetBytes() const {
+    return reinterpret_cast<const uint8_t*>(container_->bits_.data());
+  }
+
+  void BuildMidCounts(int64_t block_index) {
+    ARROW_DCHECK(block_index >= 0 &&
+                 block_index < 
static_cast<int64_t>(container_->mid_counts_.size()));
+    constexpr int64_t words_per_block =
+        BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+    int64_t word_begin = block_index * words_per_block;
+    int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+    const uint64_t* words = container_->bits_.data();
+    uint16_t* counters = container_->mid_counts_.data();
+
+    uint16_t count = 0;
+    for (int64_t word_index = word_begin; word_index < word_end; ++word_index) 
{
+      counters[apply_stride_and_offset(word_index)] = count;
+      count += static_cast<uint16_t>(
+          ARROW_POPCOUNT64(words[apply_stride_and_offset(word_index)]));
+    }
+  }
+
+  void BuildTopCounts(int64_t block_index_begin, int64_t block_index_end,
+                      int64_t initial_count = 0) {
+    const uint64_t* words = container_->bits_.data();
+    int64_t* counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+
+    int64_t count = initial_count;
+
+    for (int64_t block_index = block_index_begin; block_index < 
block_index_end - 1;
+         ++block_index) {
+      counters[apply_stride_and_offset(block_index)] = count;
+
+      constexpr int64_t words_per_block =
+          BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+
+      int64_t word_begin = block_index * words_per_block;
+      int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+      count += mid_counters[apply_stride_and_offset(word_end - 1)];
+      count += ARROW_POPCOUNT64(words[apply_stride_and_offset(word_end - 1)]);
+    }
+    counters[apply_stride_and_offset(block_index_end - 1)] = count;
+  }
+
+  // Position of the nth bit set (input argument zero corresponds to the first
+  // bit set).
+  //
+  int64_t Select(int64_t rank) const {
+    if (rank < 0) {
+      return BeforeFirstBit();
+    }
+    if (rank >= pop_count()) {
+      return AfterLastBit();
+    }
+
+    constexpr int64_t bits_per_block = BitVectorWithCountsBase::kBitsPerBlock;
+    constexpr int64_t bits_per_word = BitVectorWithCountsBase::kBitsPerWord;
+    constexpr int64_t words_per_block = bits_per_block / bits_per_word;
+    const int64_t* top_counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+    const uint64_t* words = container_->bits_.data();
+
+    // Binary search in top level counters.
+    //
+    // Equivalent of std::upper_bound() - 1, but not using iterators.
+    //
+    int64_t begin = 0;
+    int64_t end = block_count();
+    while (end - begin > 1) {
+      int64_t middle = (begin + end) / 2;

Review Comment:
   In principle this can overflow, so it should be `begin + (end - begin) / 2`.



##########
cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+// Storage for a bit vector to be used with BitVectorNavigator and its 
variants.
+//
+// Supports weaved bit vectors.
+//
+class BitVectorWithCountsBase {
+  template <bool T>
+  friend class BitVectorNavigatorImp;
+
+ public:
+  BitVectorWithCountsBase() : num_children_(0), num_bits_per_child_(0) {}
+
+  void Resize(int64_t num_bits_per_child, int64_t num_children = 1) {
+    ARROW_DCHECK(num_children > 0 && num_bits_per_child > 0);
+    num_children_ = num_children;
+    num_bits_per_child_ = num_bits_per_child;
+    int64_t num_words =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerWord) * num_children;
+    bits_.resize(num_words);
+    mid_counts_.resize(num_words);
+    int64_t num_blocks =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerBlock) * num_children;
+    top_counts_.resize(num_blocks);
+  }
+
+  void ClearBits() { memset(bits_.data(), 0, bits_.size() * sizeof(bits_[0])); 
}
+
+  // Word is 64 adjacent bits
+  //
+  static constexpr int64_t kBitsPerWord = 64;
+  // Block is 65536 adjacent bits
+  // (that means that 16-bit counters can be used within the block)
+  //
+#ifndef NDEBUG
+  static constexpr int kLogBitsPerBlock = 7;
+#else
+  static constexpr int kLogBitsPerBlock = 16;
+#endif
+  static constexpr int64_t kBitsPerBlock = 1LL << kLogBitsPerBlock;
+
+ protected:
+  int64_t num_children_;
+  int64_t num_bits_per_child_;
+  // TODO: Replace vectors with ResizableBuffers. Return error status from
+  // Resize on out-of-memory.
+  //
+  std::vector<uint64_t> bits_;
+  std::vector<int64_t> top_counts_;
+  std::vector<uint16_t> mid_counts_;
+};
+
+template <bool SINGLE_CHILD_BIT_VECTOR>
+class BitVectorNavigatorImp {
+ public:
+  BitVectorNavigatorImp() : container_(NULLPTR) {}
+
+  BitVectorNavigatorImp(BitVectorWithCountsBase* container, int64_t 
child_index)
+      : container_(container), child_index_(child_index) {}
+
+  int64_t block_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerBlock);
+  }
+
+  int64_t word_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerWord);
+  }
+
+  int64_t bit_count() const { return container_->num_bits_per_child_; }
+
+  int64_t pop_count() const {
+    int64_t last_block = block_count() - 1;
+    int64_t last_word = word_count() - 1;
+    int num_bits_last_word =
+        static_cast<int>((bit_count() - 1) % 
BitVectorWithCountsBase::kBitsPerWord + 1);
+    uint64_t last_word_mask = ~0ULL >> (64 - num_bits_last_word);
+    return container_->top_counts_[apply_stride_and_offset(last_block)] +
+           container_->mid_counts_[apply_stride_and_offset(last_word)] +
+           
ARROW_POPCOUNT64(container_->bits_[apply_stride_and_offset(last_word)] &
+                            last_word_mask);
+  }
+
+  const uint8_t* GetBytes() const {
+    return reinterpret_cast<const uint8_t*>(container_->bits_.data());
+  }
+
+  void BuildMidCounts(int64_t block_index) {
+    ARROW_DCHECK(block_index >= 0 &&
+                 block_index < 
static_cast<int64_t>(container_->mid_counts_.size()));
+    constexpr int64_t words_per_block =
+        BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+    int64_t word_begin = block_index * words_per_block;
+    int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+    const uint64_t* words = container_->bits_.data();
+    uint16_t* counters = container_->mid_counts_.data();
+
+    uint16_t count = 0;
+    for (int64_t word_index = word_begin; word_index < word_end; ++word_index) 
{
+      counters[apply_stride_and_offset(word_index)] = count;
+      count += static_cast<uint16_t>(
+          ARROW_POPCOUNT64(words[apply_stride_and_offset(word_index)]));
+    }
+  }
+
+  void BuildTopCounts(int64_t block_index_begin, int64_t block_index_end,
+                      int64_t initial_count = 0) {
+    const uint64_t* words = container_->bits_.data();
+    int64_t* counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+
+    int64_t count = initial_count;
+
+    for (int64_t block_index = block_index_begin; block_index < 
block_index_end - 1;
+         ++block_index) {
+      counters[apply_stride_and_offset(block_index)] = count;
+
+      constexpr int64_t words_per_block =
+          BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+
+      int64_t word_begin = block_index * words_per_block;
+      int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+      count += mid_counters[apply_stride_and_offset(word_end - 1)];
+      count += ARROW_POPCOUNT64(words[apply_stride_and_offset(word_end - 1)]);
+    }
+    counters[apply_stride_and_offset(block_index_end - 1)] = count;
+  }
+
+  // Position of the nth bit set (input argument zero corresponds to the first
+  // bit set).
+  //
+  int64_t Select(int64_t rank) const {
+    if (rank < 0) {
+      return BeforeFirstBit();
+    }
+    if (rank >= pop_count()) {
+      return AfterLastBit();
+    }
+
+    constexpr int64_t bits_per_block = BitVectorWithCountsBase::kBitsPerBlock;
+    constexpr int64_t bits_per_word = BitVectorWithCountsBase::kBitsPerWord;
+    constexpr int64_t words_per_block = bits_per_block / bits_per_word;
+    const int64_t* top_counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+    const uint64_t* words = container_->bits_.data();
+
+    // Binary search in top level counters.
+    //
+    // Equivalent of std::upper_bound() - 1, but not using iterators.
+    //
+    int64_t begin = 0;
+    int64_t end = block_count();
+    while (end - begin > 1) {
+      int64_t middle = (begin + end) / 2;
+      int reject_left_half =
+          (rank >= top_counters[apply_stride_and_offset(middle)]) ? 1 : 0;
+      begin = begin + (middle - begin) * reject_left_half;
+      end = middle + (end - middle) * reject_left_half;
+    }
+
+    int64_t block_index = begin;
+    rank -= top_counters[apply_stride_and_offset(begin)];
+
+    // Continue with binary search in intermediate level counters of the
+    // selected block.
+    //
+    begin = block_index * words_per_block;
+    end = std::min(word_count(), begin + words_per_block);
+    while (end - begin > 1) {
+      int64_t middle = (begin + end) / 2;
+      int reject_left_half =
+          (rank >= mid_counters[apply_stride_and_offset(middle)]) ? 1 : 0;
+      begin = begin + (middle - begin) * reject_left_half;
+      end = middle + (end - middle) * reject_left_half;
+    }
+
+    int64_t word_index = begin;
+    rank -= mid_counters[apply_stride_and_offset(begin)];
+
+    // Continue with binary search in the selected word.
+    //
+    uint64_t word = words[apply_stride_and_offset(word_index)];
+    int pop_count_prefix = 0;
+    int bit_count_prefix = 0;
+    const uint64_t masks[6] = {0xFFFFFFFFULL, 0xFFFFULL, 0xFFULL, 0xFULL, 
0x3ULL, 0x1ULL};
+    int bit_count_left_half = 32;
+    for (int i = 0; i < 6; ++i) {
+      int pop_count_left_half =
+          static_cast<int>(ARROW_POPCOUNT64((word >> bit_count_prefix) & 
masks[i]));
+      int reject_left_half = (rank >= pop_count_prefix + pop_count_left_half) 
? 1 : 0;
+      pop_count_prefix += reject_left_half * pop_count_left_half;
+      bit_count_prefix += reject_left_half * bit_count_left_half;
+      bit_count_left_half /= 2;
+    }
+
+    return word_index * bits_per_word + bit_count_prefix;
+  }
+
+  void Select(int64_t rank_begin, int64_t rank_end, int64_t* selects,
+              const ThreadContext& thread_ctx) const {
+    ARROW_DCHECK(rank_begin <= rank_end);
+
+    // For ranks out of the range represented in the bit vector return
+    // BeforeFirstBit() or AfterLastBit().
+    //
+    if (rank_begin < 0) {
+      int64_t num_ranks_to_skip =
+          std::min(rank_end, static_cast<int64_t>(0)) - rank_begin;

Review Comment:
   nit: static_cast<int64_t>(0) -> int64_t{0} or 0LL



##########
cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+// Storage for a bit vector to be used with BitVectorNavigator and its 
variants.
+//
+// Supports weaved bit vectors.
+//
+class BitVectorWithCountsBase {
+  template <bool T>
+  friend class BitVectorNavigatorImp;
+
+ public:
+  BitVectorWithCountsBase() : num_children_(0), num_bits_per_child_(0) {}
+
+  void Resize(int64_t num_bits_per_child, int64_t num_children = 1) {
+    ARROW_DCHECK(num_children > 0 && num_bits_per_child > 0);
+    num_children_ = num_children;
+    num_bits_per_child_ = num_bits_per_child;
+    int64_t num_words =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerWord) * num_children;
+    bits_.resize(num_words);
+    mid_counts_.resize(num_words);
+    int64_t num_blocks =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerBlock) * num_children;
+    top_counts_.resize(num_blocks);
+  }
+
+  void ClearBits() { memset(bits_.data(), 0, bits_.size() * sizeof(bits_[0])); 
}
+
+  // Word is 64 adjacent bits
+  //
+  static constexpr int64_t kBitsPerWord = 64;
+  // Block is 65536 adjacent bits
+  // (that means that 16-bit counters can be used within the block)
+  //
+#ifndef NDEBUG
+  static constexpr int kLogBitsPerBlock = 7;
+#else
+  static constexpr int kLogBitsPerBlock = 16;
+#endif
+  static constexpr int64_t kBitsPerBlock = 1LL << kLogBitsPerBlock;
+
+ protected:
+  int64_t num_children_;
+  int64_t num_bits_per_child_;
+  // TODO: Replace vectors with ResizableBuffers. Return error status from
+  // Resize on out-of-memory.
+  //
+  std::vector<uint64_t> bits_;
+  std::vector<int64_t> top_counts_;
+  std::vector<uint16_t> mid_counts_;
+};
+
+template <bool SINGLE_CHILD_BIT_VECTOR>
+class BitVectorNavigatorImp {
+ public:
+  BitVectorNavigatorImp() : container_(NULLPTR) {}
+
+  BitVectorNavigatorImp(BitVectorWithCountsBase* container, int64_t 
child_index)
+      : container_(container), child_index_(child_index) {}
+
+  int64_t block_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerBlock);
+  }
+
+  int64_t word_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerWord);
+  }
+
+  int64_t bit_count() const { return container_->num_bits_per_child_; }
+
+  int64_t pop_count() const {
+    int64_t last_block = block_count() - 1;
+    int64_t last_word = word_count() - 1;
+    int num_bits_last_word =
+        static_cast<int>((bit_count() - 1) % 
BitVectorWithCountsBase::kBitsPerWord + 1);
+    uint64_t last_word_mask = ~0ULL >> (64 - num_bits_last_word);

Review Comment:
   If `num_bits_last_word` is 0, this will not generate a 0 mask. Is this a bug?



##########
cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+// Storage for a bit vector to be used with BitVectorNavigator and its 
variants.
+//
+// Supports weaved bit vectors.
+//
+class BitVectorWithCountsBase {
+  template <bool T>
+  friend class BitVectorNavigatorImp;
+
+ public:
+  BitVectorWithCountsBase() : num_children_(0), num_bits_per_child_(0) {}
+
+  void Resize(int64_t num_bits_per_child, int64_t num_children = 1) {
+    ARROW_DCHECK(num_children > 0 && num_bits_per_child > 0);
+    num_children_ = num_children;
+    num_bits_per_child_ = num_bits_per_child;
+    int64_t num_words =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerWord) * num_children;
+    bits_.resize(num_words);
+    mid_counts_.resize(num_words);
+    int64_t num_blocks =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerBlock) * num_children;
+    top_counts_.resize(num_blocks);
+  }
+
+  void ClearBits() { memset(bits_.data(), 0, bits_.size() * sizeof(bits_[0])); 
}
+
+  // Word is 64 adjacent bits
+  //
+  static constexpr int64_t kBitsPerWord = 64;
+  // Block is 65536 adjacent bits
+  // (that means that 16-bit counters can be used within the block)
+  //
+#ifndef NDEBUG
+  static constexpr int kLogBitsPerBlock = 7;
+#else
+  static constexpr int kLogBitsPerBlock = 16;
+#endif
+  static constexpr int64_t kBitsPerBlock = 1LL << kLogBitsPerBlock;
+
+ protected:
+  int64_t num_children_;
+  int64_t num_bits_per_child_;
+  // TODO: Replace vectors with ResizableBuffers. Return error status from
+  // Resize on out-of-memory.
+  //
+  std::vector<uint64_t> bits_;
+  std::vector<int64_t> top_counts_;
+  std::vector<uint16_t> mid_counts_;
+};
+
+template <bool SINGLE_CHILD_BIT_VECTOR>
+class BitVectorNavigatorImp {
+ public:
+  BitVectorNavigatorImp() : container_(NULLPTR) {}
+
+  BitVectorNavigatorImp(BitVectorWithCountsBase* container, int64_t 
child_index)
+      : container_(container), child_index_(child_index) {}
+
+  int64_t block_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerBlock);
+  }
+
+  int64_t word_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerWord);
+  }
+
+  int64_t bit_count() const { return container_->num_bits_per_child_; }
+
+  int64_t pop_count() const {
+    int64_t last_block = block_count() - 1;
+    int64_t last_word = word_count() - 1;
+    int num_bits_last_word =
+        static_cast<int>((bit_count() - 1) % 
BitVectorWithCountsBase::kBitsPerWord + 1);
+    uint64_t last_word_mask = ~0ULL >> (64 - num_bits_last_word);
+    return container_->top_counts_[apply_stride_and_offset(last_block)] +
+           container_->mid_counts_[apply_stride_and_offset(last_word)] +
+           
ARROW_POPCOUNT64(container_->bits_[apply_stride_and_offset(last_word)] &
+                            last_word_mask);
+  }
+
+  const uint8_t* GetBytes() const {
+    return reinterpret_cast<const uint8_t*>(container_->bits_.data());
+  }
+
+  void BuildMidCounts(int64_t block_index) {
+    ARROW_DCHECK(block_index >= 0 &&
+                 block_index < 
static_cast<int64_t>(container_->mid_counts_.size()));
+    constexpr int64_t words_per_block =
+        BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+    int64_t word_begin = block_index * words_per_block;
+    int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+    const uint64_t* words = container_->bits_.data();
+    uint16_t* counters = container_->mid_counts_.data();
+
+    uint16_t count = 0;
+    for (int64_t word_index = word_begin; word_index < word_end; ++word_index) 
{
+      counters[apply_stride_and_offset(word_index)] = count;
+      count += static_cast<uint16_t>(
+          ARROW_POPCOUNT64(words[apply_stride_and_offset(word_index)]));
+    }
+  }
+
+  void BuildTopCounts(int64_t block_index_begin, int64_t block_index_end,
+                      int64_t initial_count = 0) {
+    const uint64_t* words = container_->bits_.data();
+    int64_t* counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+
+    int64_t count = initial_count;
+
+    for (int64_t block_index = block_index_begin; block_index < 
block_index_end - 1;
+         ++block_index) {
+      counters[apply_stride_and_offset(block_index)] = count;
+
+      constexpr int64_t words_per_block =
+          BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+
+      int64_t word_begin = block_index * words_per_block;
+      int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+      count += mid_counters[apply_stride_and_offset(word_end - 1)];
+      count += ARROW_POPCOUNT64(words[apply_stride_and_offset(word_end - 1)]);
+    }
+    counters[apply_stride_and_offset(block_index_end - 1)] = count;
+  }
+
+  // Position of the nth bit set (input argument zero corresponds to the first
+  // bit set).
+  //
+  int64_t Select(int64_t rank) const {
+    if (rank < 0) {
+      return BeforeFirstBit();
+    }
+    if (rank >= pop_count()) {
+      return AfterLastBit();
+    }
+
+    constexpr int64_t bits_per_block = BitVectorWithCountsBase::kBitsPerBlock;
+    constexpr int64_t bits_per_word = BitVectorWithCountsBase::kBitsPerWord;
+    constexpr int64_t words_per_block = bits_per_block / bits_per_word;
+    const int64_t* top_counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+    const uint64_t* words = container_->bits_.data();
+
+    // Binary search in top level counters.
+    //
+    // Equivalent of std::upper_bound() - 1, but not using iterators.
+    //
+    int64_t begin = 0;
+    int64_t end = block_count();
+    while (end - begin > 1) {
+      int64_t middle = (begin + end) / 2;
+      int reject_left_half =
+          (rank >= top_counters[apply_stride_and_offset(middle)]) ? 1 : 0;
+      begin = begin + (middle - begin) * reject_left_half;
+      end = middle + (end - middle) * reject_left_half;
+    }

Review Comment:
   So, unlike `std::upper_bound`, this is a branchless binary search, which will 
be good when the window is small and bad when the window is large. Do we know 
it always pays off?



##########
cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+// Storage for a bit vector to be used with BitVectorNavigator and its 
variants.
+//
+// Supports weaved bit vectors.
+//
+class BitVectorWithCountsBase {
+  template <bool T>
+  friend class BitVectorNavigatorImp;
+
+ public:
+  BitVectorWithCountsBase() : num_children_(0), num_bits_per_child_(0) {}
+
+  void Resize(int64_t num_bits_per_child, int64_t num_children = 1) {
+    ARROW_DCHECK(num_children > 0 && num_bits_per_child > 0);
+    num_children_ = num_children;
+    num_bits_per_child_ = num_bits_per_child;
+    int64_t num_words =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerWord) * num_children;
+    bits_.resize(num_words);
+    mid_counts_.resize(num_words);
+    int64_t num_blocks =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerBlock) * num_children;
+    top_counts_.resize(num_blocks);
+  }
+
+  void ClearBits() { memset(bits_.data(), 0, bits_.size() * sizeof(bits_[0])); 
}
+
+  // Word is 64 adjacent bits
+  //
+  static constexpr int64_t kBitsPerWord = 64;
+  // Block is 65536 adjacent bits
+  // (that means that 16-bit counters can be used within the block)
+  //
+#ifndef NDEBUG
+  static constexpr int kLogBitsPerBlock = 7;
+#else
+  static constexpr int kLogBitsPerBlock = 16;
+#endif
+  static constexpr int64_t kBitsPerBlock = 1LL << kLogBitsPerBlock;
+
+ protected:
+  int64_t num_children_;
+  int64_t num_bits_per_child_;
+  // TODO: Replace vectors with ResizableBuffers. Return error status from
+  // Resize on out-of-memory.
+  //
+  std::vector<uint64_t> bits_;
+  std::vector<int64_t> top_counts_;
+  std::vector<uint16_t> mid_counts_;
+};
+
+template <bool SINGLE_CHILD_BIT_VECTOR>
+class BitVectorNavigatorImp {
+ public:
+  BitVectorNavigatorImp() : container_(NULLPTR) {}
+
+  BitVectorNavigatorImp(BitVectorWithCountsBase* container, int64_t 
child_index)
+      : container_(container), child_index_(child_index) {}
+
+  int64_t block_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerBlock);
+  }
+
+  int64_t word_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerWord);
+  }
+
+  int64_t bit_count() const { return container_->num_bits_per_child_; }
+
+  int64_t pop_count() const {
+    int64_t last_block = block_count() - 1;
+    int64_t last_word = word_count() - 1;
+    int num_bits_last_word =
+        static_cast<int>((bit_count() - 1) % 
BitVectorWithCountsBase::kBitsPerWord + 1);
+    uint64_t last_word_mask = ~0ULL >> (64 - num_bits_last_word);
+    return container_->top_counts_[apply_stride_and_offset(last_block)] +
+           container_->mid_counts_[apply_stride_and_offset(last_word)] +
+           
ARROW_POPCOUNT64(container_->bits_[apply_stride_and_offset(last_word)] &
+                            last_word_mask);
+  }
+
+  const uint8_t* GetBytes() const {
+    return reinterpret_cast<const uint8_t*>(container_->bits_.data());
+  }
+
+  void BuildMidCounts(int64_t block_index) {
+    ARROW_DCHECK(block_index >= 0 &&
+                 block_index < 
static_cast<int64_t>(container_->mid_counts_.size()));
+    constexpr int64_t words_per_block =
+        BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+    int64_t word_begin = block_index * words_per_block;
+    int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+    const uint64_t* words = container_->bits_.data();
+    uint16_t* counters = container_->mid_counts_.data();
+
+    uint16_t count = 0;
+    for (int64_t word_index = word_begin; word_index < word_end; ++word_index) 
{
+      counters[apply_stride_and_offset(word_index)] = count;
+      count += static_cast<uint16_t>(
+          ARROW_POPCOUNT64(words[apply_stride_and_offset(word_index)]));
+    }
+  }
+
+  void BuildTopCounts(int64_t block_index_begin, int64_t block_index_end,
+                      int64_t initial_count = 0) {
+    const uint64_t* words = container_->bits_.data();
+    int64_t* counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+
+    int64_t count = initial_count;
+
+    for (int64_t block_index = block_index_begin; block_index < 
block_index_end - 1;
+         ++block_index) {
+      counters[apply_stride_and_offset(block_index)] = count;
+
+      constexpr int64_t words_per_block =
+          BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+
+      int64_t word_begin = block_index * words_per_block;
+      int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+      count += mid_counters[apply_stride_and_offset(word_end - 1)];
+      count += ARROW_POPCOUNT64(words[apply_stride_and_offset(word_end - 1)]);
+    }
+    counters[apply_stride_and_offset(block_index_end - 1)] = count;
+  }
+
+  // Position of the nth bit set (input argument zero corresponds to the first
+  // bit set).
+  //
+  int64_t Select(int64_t rank) const {
+    if (rank < 0) {
+      return BeforeFirstBit();
+    }
+    if (rank >= pop_count()) {
+      return AfterLastBit();
+    }
+
+    constexpr int64_t bits_per_block = BitVectorWithCountsBase::kBitsPerBlock;
+    constexpr int64_t bits_per_word = BitVectorWithCountsBase::kBitsPerWord;
+    constexpr int64_t words_per_block = bits_per_block / bits_per_word;
+    const int64_t* top_counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+    const uint64_t* words = container_->bits_.data();
+
+    // Binary search in top level counters.
+    //
+    // Equivalent of std::upper_bound() - 1, but not using iterators.
+    //
+    int64_t begin = 0;
+    int64_t end = block_count();
+    while (end - begin > 1) {
+      int64_t middle = (begin + end) / 2;
+      int reject_left_half =
+          (rank >= top_counters[apply_stride_and_offset(middle)]) ? 1 : 0;
+      begin = begin + (middle - begin) * reject_left_half;
+      end = middle + (end - middle) * reject_left_half;
+    }

Review Comment:
   Could this be written with only one cmov/mul? Something like:
   
   ```
   int64_t i = 0;
   for (int64_t n = block_count(); n > 1; ) {
     int64_t mid = n / 2;
     i += (rank >= top_counters[apply_stride_and_offset(i + mid)]) ? mid : 0;
     n -= mid;
   }
   ```
   
   Caveat emptor: this is untested, so it could be buggy.



##########
cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+// Storage for a bit vector to be used with BitVectorNavigator and its 
variants.
+//
+// Supports weaved bit vectors.
+//
+class BitVectorWithCountsBase {
+  template <bool T>
+  friend class BitVectorNavigatorImp;
+
+ public:
+  BitVectorWithCountsBase() : num_children_(0), num_bits_per_child_(0) {}
+
+  void Resize(int64_t num_bits_per_child, int64_t num_children = 1) {
+    ARROW_DCHECK(num_children > 0 && num_bits_per_child > 0);
+    num_children_ = num_children;
+    num_bits_per_child_ = num_bits_per_child;
+    int64_t num_words =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerWord) * num_children;
+    bits_.resize(num_words);
+    mid_counts_.resize(num_words);
+    int64_t num_blocks =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerBlock) * num_children;
+    top_counts_.resize(num_blocks);
+  }
+
+  void ClearBits() { memset(bits_.data(), 0, bits_.size() * sizeof(bits_[0])); 
}
+
+  // Word is 64 adjacent bits
+  //
+  static constexpr int64_t kBitsPerWord = 64;
+  // Block is 65536 adjacent bits
+  // (that means that 16-bit counters can be used within the block)
+  //
+#ifndef NDEBUG
+  static constexpr int kLogBitsPerBlock = 7;
+#else
+  static constexpr int kLogBitsPerBlock = 16;
+#endif
+  static constexpr int64_t kBitsPerBlock = 1LL << kLogBitsPerBlock;
+
+ protected:
+  int64_t num_children_;
+  int64_t num_bits_per_child_;
+  // TODO: Replace vectors with ResizableBuffers. Return error status from
+  // Resize on out-of-memory.
+  //
+  std::vector<uint64_t> bits_;
+  std::vector<int64_t> top_counts_;
+  std::vector<uint16_t> mid_counts_;
+};
+
+template <bool SINGLE_CHILD_BIT_VECTOR>
+class BitVectorNavigatorImp {
+ public:
+  BitVectorNavigatorImp() : container_(NULLPTR) {}
+
+  BitVectorNavigatorImp(BitVectorWithCountsBase* container, int64_t 
child_index)
+      : container_(container), child_index_(child_index) {}
+
+  int64_t block_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerBlock);
+  }
+
+  int64_t word_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerWord);
+  }
+
+  int64_t bit_count() const { return container_->num_bits_per_child_; }
+
+  int64_t pop_count() const {
+    int64_t last_block = block_count() - 1;
+    int64_t last_word = word_count() - 1;
+    int num_bits_last_word =
+        static_cast<int>((bit_count() - 1) % 
BitVectorWithCountsBase::kBitsPerWord + 1);
+    uint64_t last_word_mask = ~0ULL >> (64 - num_bits_last_word);
+    return container_->top_counts_[apply_stride_and_offset(last_block)] +
+           container_->mid_counts_[apply_stride_and_offset(last_word)] +
+           
ARROW_POPCOUNT64(container_->bits_[apply_stride_and_offset(last_word)] &
+                            last_word_mask);
+  }
+
+  const uint8_t* GetBytes() const {
+    return reinterpret_cast<const uint8_t*>(container_->bits_.data());
+  }
+
+  void BuildMidCounts(int64_t block_index) {
+    ARROW_DCHECK(block_index >= 0 &&
+                 block_index < 
static_cast<int64_t>(container_->mid_counts_.size()));
+    constexpr int64_t words_per_block =
+        BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+    int64_t word_begin = block_index * words_per_block;
+    int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+    const uint64_t* words = container_->bits_.data();
+    uint16_t* counters = container_->mid_counts_.data();
+
+    uint16_t count = 0;
+    for (int64_t word_index = word_begin; word_index < word_end; ++word_index) 
{
+      counters[apply_stride_and_offset(word_index)] = count;
+      count += static_cast<uint16_t>(
+          ARROW_POPCOUNT64(words[apply_stride_and_offset(word_index)]));
+    }
+  }
+
+  void BuildTopCounts(int64_t block_index_begin, int64_t block_index_end,
+                      int64_t initial_count = 0) {
+    const uint64_t* words = container_->bits_.data();
+    int64_t* counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+
+    int64_t count = initial_count;
+
+    for (int64_t block_index = block_index_begin; block_index < 
block_index_end - 1;
+         ++block_index) {
+      counters[apply_stride_and_offset(block_index)] = count;
+
+      constexpr int64_t words_per_block =
+          BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+
+      int64_t word_begin = block_index * words_per_block;
+      int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+      count += mid_counters[apply_stride_and_offset(word_end - 1)];
+      count += ARROW_POPCOUNT64(words[apply_stride_and_offset(word_end - 1)]);
+    }
+    counters[apply_stride_and_offset(block_index_end - 1)] = count;
+  }
+
+  // Position of the nth bit set (input argument zero corresponds to the first
+  // bit set).
+  //
+  int64_t Select(int64_t rank) const {
+    if (rank < 0) {
+      return BeforeFirstBit();
+    }
+    if (rank >= pop_count()) {
+      return AfterLastBit();
+    }
+
+    constexpr int64_t bits_per_block = BitVectorWithCountsBase::kBitsPerBlock;
+    constexpr int64_t bits_per_word = BitVectorWithCountsBase::kBitsPerWord;
+    constexpr int64_t words_per_block = bits_per_block / bits_per_word;
+    const int64_t* top_counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+    const uint64_t* words = container_->bits_.data();
+
+    // Binary search in top level counters.
+    //
+    // Equivalent of std::upper_bound() - 1, but not using iterators.
+    //
+    int64_t begin = 0;
+    int64_t end = block_count();
+    while (end - begin > 1) {
+      int64_t middle = (begin + end) / 2;
+      int reject_left_half =
+          (rank >= top_counters[apply_stride_and_offset(middle)]) ? 1 : 0;
+      begin = begin + (middle - begin) * reject_left_half;
+      end = middle + (end - middle) * reject_left_half;
+    }
+
+    int64_t block_index = begin;
+    rank -= top_counters[apply_stride_and_offset(begin)];
+
+    // Continue with binary search in intermediate level counters of the
+    // selected block.
+    //
+    begin = block_index * words_per_block;
+    end = std::min(word_count(), begin + words_per_block);
+    while (end - begin > 1) {
+      int64_t middle = (begin + end) / 2;
+      int reject_left_half =
+          (rank >= mid_counters[apply_stride_and_offset(middle)]) ? 1 : 0;
+      begin = begin + (middle - begin) * reject_left_half;
+      end = middle + (end - middle) * reject_left_half;
+    }
+
+    int64_t word_index = begin;
+    rank -= mid_counters[apply_stride_and_offset(begin)];
+
+    // Continue with binary search in the selected word.
+    //
+    uint64_t word = words[apply_stride_and_offset(word_index)];
+    int pop_count_prefix = 0;
+    int bit_count_prefix = 0;
+    const uint64_t masks[6] = {0xFFFFFFFFULL, 0xFFFFULL, 0xFFULL, 0xFULL, 
0x3ULL, 0x1ULL};
+    int bit_count_left_half = 32;
+    for (int i = 0; i < 6; ++i) {
+      int pop_count_left_half =
+          static_cast<int>(ARROW_POPCOUNT64((word >> bit_count_prefix) & 
masks[i]));
+      int reject_left_half = (rank >= pop_count_prefix + pop_count_left_half) 
? 1 : 0;
+      pop_count_prefix += reject_left_half * pop_count_left_half;
+      bit_count_prefix += reject_left_half * bit_count_left_half;
+      bit_count_left_half /= 2;
+    }

Review Comment:
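   This can be done faster with the `pdep` and `tzcnt` instructions (e.g. 
`_tzcnt_u64(_pdep_u64(1ULL << rank, word))`) wherever they are available, 
except on Zen 2, where `pdep` is slow. Otherwise, the binary search could be 
written like the one suggested above, for a tighter loop.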
   



##########
cpp/src/arrow/compute/exec/window_functions/bit_vector_navigator.h:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+// Storage for a bit vector to be used with BitVectorNavigator and its 
variants.
+//
+// Supports weaved bit vectors.
+//
+class BitVectorWithCountsBase {
+  template <bool T>
+  friend class BitVectorNavigatorImp;
+
+ public:
+  BitVectorWithCountsBase() : num_children_(0), num_bits_per_child_(0) {}
+
+  void Resize(int64_t num_bits_per_child, int64_t num_children = 1) {
+    ARROW_DCHECK(num_children > 0 && num_bits_per_child > 0);
+    num_children_ = num_children;
+    num_bits_per_child_ = num_bits_per_child;
+    int64_t num_words =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerWord) * num_children;
+    bits_.resize(num_words);
+    mid_counts_.resize(num_words);
+    int64_t num_blocks =
+        bit_util::CeilDiv(num_bits_per_child, kBitsPerBlock) * num_children;
+    top_counts_.resize(num_blocks);
+  }
+
+  void ClearBits() { memset(bits_.data(), 0, bits_.size() * sizeof(bits_[0])); 
}
+
+  // Word is 64 adjacent bits
+  //
+  static constexpr int64_t kBitsPerWord = 64;
+  // Block is 65536 adjacent bits
+  // (that means that 16-bit counters can be used within the block)
+  //
+#ifndef NDEBUG
+  static constexpr int kLogBitsPerBlock = 7;
+#else
+  static constexpr int kLogBitsPerBlock = 16;
+#endif
+  static constexpr int64_t kBitsPerBlock = 1LL << kLogBitsPerBlock;
+
+ protected:
+  int64_t num_children_;
+  int64_t num_bits_per_child_;
+  // TODO: Replace vectors with ResizableBuffers. Return error status from
+  // Resize on out-of-memory.
+  //
+  std::vector<uint64_t> bits_;
+  std::vector<int64_t> top_counts_;
+  std::vector<uint16_t> mid_counts_;
+};
+
+template <bool SINGLE_CHILD_BIT_VECTOR>
+class BitVectorNavigatorImp {
+ public:
+  BitVectorNavigatorImp() : container_(NULLPTR) {}
+
+  BitVectorNavigatorImp(BitVectorWithCountsBase* container, int64_t 
child_index)
+      : container_(container), child_index_(child_index) {}
+
+  int64_t block_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerBlock);
+  }
+
+  int64_t word_count() const {
+    return bit_util::CeilDiv(container_->num_bits_per_child_,
+                             BitVectorWithCountsBase::kBitsPerWord);
+  }
+
+  int64_t bit_count() const { return container_->num_bits_per_child_; }
+
+  int64_t pop_count() const {
+    int64_t last_block = block_count() - 1;
+    int64_t last_word = word_count() - 1;
+    int num_bits_last_word =
+        static_cast<int>((bit_count() - 1) % 
BitVectorWithCountsBase::kBitsPerWord + 1);
+    uint64_t last_word_mask = ~0ULL >> (64 - num_bits_last_word);
+    return container_->top_counts_[apply_stride_and_offset(last_block)] +
+           container_->mid_counts_[apply_stride_and_offset(last_word)] +
+           
ARROW_POPCOUNT64(container_->bits_[apply_stride_and_offset(last_word)] &
+                            last_word_mask);
+  }
+
+  const uint8_t* GetBytes() const {
+    return reinterpret_cast<const uint8_t*>(container_->bits_.data());
+  }
+
+  void BuildMidCounts(int64_t block_index) {
+    ARROW_DCHECK(block_index >= 0 &&
+                 block_index < 
static_cast<int64_t>(container_->mid_counts_.size()));
+    constexpr int64_t words_per_block =
+        BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+    int64_t word_begin = block_index * words_per_block;
+    int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+    const uint64_t* words = container_->bits_.data();
+    uint16_t* counters = container_->mid_counts_.data();
+
+    uint16_t count = 0;
+    for (int64_t word_index = word_begin; word_index < word_end; ++word_index) 
{
+      counters[apply_stride_and_offset(word_index)] = count;
+      count += static_cast<uint16_t>(
+          ARROW_POPCOUNT64(words[apply_stride_and_offset(word_index)]));
+    }
+  }
+
+  void BuildTopCounts(int64_t block_index_begin, int64_t block_index_end,
+                      int64_t initial_count = 0) {
+    const uint64_t* words = container_->bits_.data();
+    int64_t* counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+
+    int64_t count = initial_count;
+
+    for (int64_t block_index = block_index_begin; block_index < 
block_index_end - 1;
+         ++block_index) {
+      counters[apply_stride_and_offset(block_index)] = count;
+
+      constexpr int64_t words_per_block =
+          BitVectorWithCountsBase::kBitsPerBlock / 
BitVectorWithCountsBase::kBitsPerWord;
+
+      int64_t word_begin = block_index * words_per_block;
+      int64_t word_end = std::min(word_count(), word_begin + words_per_block);
+
+      count += mid_counters[apply_stride_and_offset(word_end - 1)];
+      count += ARROW_POPCOUNT64(words[apply_stride_and_offset(word_end - 1)]);
+    }
+    counters[apply_stride_and_offset(block_index_end - 1)] = count;
+  }
+
+  // Position of the nth bit set (input argument zero corresponds to the first
+  // bit set).
+  //
+  int64_t Select(int64_t rank) const {
+    if (rank < 0) {
+      return BeforeFirstBit();
+    }
+    if (rank >= pop_count()) {
+      return AfterLastBit();
+    }
+
+    constexpr int64_t bits_per_block = BitVectorWithCountsBase::kBitsPerBlock;
+    constexpr int64_t bits_per_word = BitVectorWithCountsBase::kBitsPerWord;
+    constexpr int64_t words_per_block = bits_per_block / bits_per_word;
+    const int64_t* top_counters = container_->top_counts_.data();
+    const uint16_t* mid_counters = container_->mid_counts_.data();
+    const uint64_t* words = container_->bits_.data();
+
+    // Binary search in top level counters.
+    //
+    // Equivalent of std::upper_bound() - 1, but not using iterators.
+    //
+    int64_t begin = 0;
+    int64_t end = block_count();
+    while (end - begin > 1) {
+      int64_t middle = (begin + end) / 2;
+      int reject_left_half =
+          (rank >= top_counters[apply_stride_and_offset(middle)]) ? 1 : 0;
+      begin = begin + (middle - begin) * reject_left_half;
+      end = middle + (end - middle) * reject_left_half;

Review Comment:
   Why multiply when you can use conditional moves? `cmov` has a latency of 1 
cycle, whereas a multiply takes 3 or more.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to