pitrou commented on code in PR #50217:
URL: https://github.com/apache/arrow/pull/50217#discussion_r3436163567


##########
cpp/src/arrow/util/rle_bitmap_internal.h:
##########
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cstring>
+
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+/// A lightweight view over a bitmap.
+template <typename B = uint8_t>
+class BitmapSpan {
+ public:
+  using size_type = rle_size_t;
+  using byte_type = B;
+
+  explicit constexpr BitmapSpan(byte_type* data, size_type bit_start = 0) 
noexcept
+      : data_{data}, bit_start_{bit_start} {
+    Normalize();
+  }
+
+  /// Pointer to the byte where the first value is stored.
+  constexpr byte_type* data() const noexcept { return data_; }
+
+  /// Bit offset of the first value in the first byte.
+  constexpr size_type bit_start() const noexcept { return bit_start_; }
+
+  /// Return a new span starting at the given position.
+  constexpr BitmapSpan NewStartingAt(size_type bit_start) const noexcept {
+    auto out = *this;
+    out.bit_start_ += bit_start;
+    out.Normalize();
+    return out;
+  }
+
+ private:
+  byte_type* data_;
+  size_type bit_start_ = 0;
+
+  /// Ensure `bit_start` always ends up in [0, 8[, even when it starts 
negative.
+  constexpr void Normalize() {
+    // On two's-complement targets an arithmetic right shift floors, and the 
AND mask
+    // yields the non-negative remainder (e.g. -1 -> byte -1, offset 7), 
unlike `/` and
+    // `%` which truncate toward zero.
+    data_ += bit_start_ >> 3;
+    bit_start_ &= 0b0111;
+  }
+};
+
+using BitmapSpanMut = BitmapSpan<uint8_t>;
+using BitmapSpanConst = BitmapSpan<const uint8_t>;
+
+namespace internal_rle {
+template <typename CRTP>
+class RunToBitmapDecoderMixin {
+ public:
+  /// Advance by as many values as provided or until exhaustion of the decoder.
+  /// Return the number of values skipped.
+  [[nodiscard]] rle_size_t Advance(rle_size_t batch_size) {
+    const auto steps = std::min(batch_size, derived()->remaining());
+    derived()->AdvanceUnsafe(steps);
+    ARROW_DCHECK_GE(derived()->remaining(), 0);
+    return steps;
+  }
+
+  /// Get the next value and return false if there are no more.
+  [[nodiscard]] constexpr bool Get(BitmapSpanMut out) {
+    return derived()->GetBatch(out, 1) == 1;
+  }
+
+  /// Get a batch of values return the number of decoded elements.
+  ///
+  /// May write fewer elements to the output than requested if there are not
+  /// enough values left.
+  [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size) {
+    const auto out_bit_offset = out.bit_start();
+    ARROW_DCHECK_GE(out_bit_offset, 0);
+    ARROW_DCHECK_LT(out_bit_offset, 8);
+
+    if (ARROW_PREDICT_FALSE(derived()->remaining() == 0 || batch_size == 0)) {
+      return 0;
+    }
+
+    // HEADER: Writing inside the first byte if caller gives a non-aligned 
input
+    if (ARROW_PREDICT_FALSE(out_bit_offset != 0)) {
+      const auto n_vals = derived()->GetBatchFirstByte(out, batch_size);
+      // If we exhausted the values in this decoder, or we filled what was 
required,
+      // then the following recursive call will return 0.
+      // If in the opposite case, then we will continue on a byte-aligned 
boundary.
+      return n_vals + GetBatch(out.NewStartingAt(n_vals), batch_size - n_vals);
+    }
+
+    // Writing full bytes
+    const auto n_vals = derived()->GetBatchFast(out, batch_size);

Review Comment:
   Perhaps rename `GetBatchFast` to `GetBatchFullBytes`?



##########
cpp/src/arrow/util/rle_bitmap_internal.h:
##########
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cstring>
+
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+/// A lightweight view over a bitmap.
+template <typename B = uint8_t>
+class BitmapSpan {
+ public:
+  using size_type = rle_size_t;
+  using byte_type = B;
+
+  explicit constexpr BitmapSpan(byte_type* data, size_type bit_start = 0) 
noexcept
+      : data_{data}, bit_start_{bit_start} {
+    Normalize();
+  }
+
+  /// Pointer to the byte where the first value is stored.
+  constexpr byte_type* data() const noexcept { return data_; }
+
+  /// Bit offset of the first value in the first byte.
+  constexpr size_type bit_start() const noexcept { return bit_start_; }
+
+  /// Return a new span starting at the given position.
+  constexpr BitmapSpan NewStartingAt(size_type bit_start) const noexcept {
+    auto out = *this;
+    out.bit_start_ += bit_start;
+    out.Normalize();
+    return out;
+  }
+
+ private:
+  byte_type* data_;
+  size_type bit_start_ = 0;
+
+  /// Ensure `bit_start` always ends up in [0, 8[, even when it starts 
negative.
+  constexpr void Normalize() {
+    // On two's-complement targets an arithmetic right shift floors, and the 
AND mask
+    // yields the non-negative remainder (e.g. -1 -> byte -1, offset 7), 
unlike `/` and
+    // `%` which truncate toward zero.
+    data_ += bit_start_ >> 3;
+    bit_start_ &= 0b0111;
+  }
+};
+
+using BitmapSpanMut = BitmapSpan<uint8_t>;
+using BitmapSpanConst = BitmapSpan<const uint8_t>;
+
+namespace internal_rle {
+template <typename CRTP>
+class RunToBitmapDecoderMixin {
+ public:
+  /// Advance by as many values as provided or until exhaustion of the decoder.
+  /// Return the number of values skipped.
+  [[nodiscard]] rle_size_t Advance(rle_size_t batch_size) {
+    const auto steps = std::min(batch_size, derived()->remaining());
+    derived()->AdvanceUnsafe(steps);
+    ARROW_DCHECK_GE(derived()->remaining(), 0);
+    return steps;
+  }
+
+  /// Get the next value and return false if there are no more.
+  [[nodiscard]] constexpr bool Get(BitmapSpanMut out) {
+    return derived()->GetBatch(out, 1) == 1;
+  }
+
+  /// Get a batch of values return the number of decoded elements.
+  ///
+  /// May write fewer elements to the output than requested if there are not
+  /// enough values left.
+  [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size) {
+    const auto out_bit_offset = out.bit_start();
+    ARROW_DCHECK_GE(out_bit_offset, 0);
+    ARROW_DCHECK_LT(out_bit_offset, 8);
+
+    if (ARROW_PREDICT_FALSE(derived()->remaining() == 0 || batch_size == 0)) {
+      return 0;
+    }
+
+    // HEADER: Writing inside the first byte if caller gives a non-aligned 
input
+    if (ARROW_PREDICT_FALSE(out_bit_offset != 0)) {
+      const auto n_vals = derived()->GetBatchFirstByte(out, batch_size);
+      // If we exhausted the values in this decoder, or we filled what was 
required,
+      // then the following recursive call will return 0.
+      // If in the opposite case, then we will continue on a byte-aligned 
boundary.
+      return n_vals + GetBatch(out.NewStartingAt(n_vals), batch_size - n_vals);

Review Comment:
   The recursive call won't be terrific for performance, right? I don't expect 
the compiler to understand the actual semantics of this function well enough to 
inline the recursive call.



##########
cpp/src/arrow/util/rle_encoding_internal.h:
##########
@@ -780,6 +791,10 @@ auto RleBitPackedDecoder<T>::GetBatch(value_type* out,
                                       rle_size_t batch_size) -> rle_size_t {
   using ControlFlow = RleBitPackedParser::ControlFlow;
 
+  if (ARROW_PREDICT_FALSE(batch_size == 0 || exhausted())) {

Review Comment:
   Is there a particular reason this became necessary?



##########
cpp/src/arrow/util/rle_bitmap_test.cc:
##########
@@ -0,0 +1,508 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <array>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/rle_bitmap_internal.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+namespace {
+
+/// Read the first `count` bits of `bytes` (LSB first) into a vector of 
booleans.
+std::vector<bool> BitsFromBytes(const std::vector<uint8_t>& bytes, rle_size_t 
count) {
+  std::vector<bool> bits(count);
+  for (rle_size_t i = 0; i < count; ++i) {
+    bits[i] = bit_util::GetBit(bytes.data(), i);
+  }
+  return bits;
+}
+
+/// Check the decoded output in `out` against `expected`.
+/// Bits `out[out_offset..out_offset + count]` must equal `expected[skip..skip 
+ count]`.
+/// The `out_offset` bits before them must still be zero.
+void CheckDecodedBits(const std::vector<uint8_t>& out, const 
std::vector<bool>& expected,
+                      rle_size_t count, rle_size_t out_offset = 0, rle_size_t 
skip = 0) {
+  ARROW_SCOPED_TRACE("out_offset = ", out_offset, ", skip = ", skip);
+  for (rle_size_t i = 0; i < out_offset; ++i) {
+    EXPECT_FALSE(bit_util::GetBit(out.data(), i)) << "clobbered bit " << i;

Review Comment:
   This is a rudimentary way of testing that existing bits are not clobbered. 
Since the output buffer starts out zero-initialized, an implementation that 
zeroes any existing bits would pass this test successfully.



##########
cpp/src/arrow/util/bit_util.h:
##########
@@ -176,6 +179,54 @@ static constexpr bool GetBitFromByte(uint8_t byte, uint8_t 
i) {
   return byte & kBitmask[i];
 }
 
+/// Read 32 bits starting at bit `offset` into a uint32.
+///
+/// The return value in in-memory byte layout matches the source bit-stream
+/// identically on little- and big-endian platforms.
+///
+/// The caller must guarantee that many bytes are readable.
+static inline uint32_t Get32Bits(const uint8_t* bits, uint64_t offset) {
+  uint64_t buffer = {};
+  std::memcpy(&buffer, bits + (offset / 8), BytesForBits(offset % 8 + 32));
+  // Interpret the loaded bytes as little-endian, drop the low `offset % 8` 
bits
+  // to align LSB-first, then store back in little-endian byte order so the
+  // result's memory representation matches the bit-stream.
+  buffer = FromLittleEndian(buffer);
+  return ToLittleEndian(static_cast<uint32_t>(buffer >> (offset % 8)));
+}
+
+template <typename Uint>
+struct CopyBitsParams {
+  Uint src = {};
+  Uint dst = {};
+  Uint start = {};
+  Uint end = {};
+};
+
+/// Copy a contiguous span of bits from src into dst.
+///
+/// Copy bits [start, end[ from src into the position [start, end[ in dst
+/// and return the result (inputs are unmodified).
+/// Setting ``kAllowFullCopy`` to false is an optimization when the caller can
+/// guarantee that the range of bits to copy does not cover the whole range.
+template <typename Uint, bool kAllowFullCopy = true>
+[[nodiscard]] constexpr Uint CopyBits(const CopyBitsParams<Uint>& params) {

Review Comment:
   It's not obvious from the name or description that this copies bits inside 
an integer. How about giving this function a more explicit name?
   (`CopyIntegerBits` or `CopyBitsInInteger` perhaps?)



##########
cpp/src/arrow/util/rle_bitmap_internal.h:
##########
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cstring>
+
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+/// A lightweight view over a bitmap.
+template <typename B = uint8_t>
+class BitmapSpan {
+ public:
+  using size_type = rle_size_t;
+  using byte_type = B;
+
+  explicit constexpr BitmapSpan(byte_type* data, size_type bit_start = 0) 
noexcept
+      : data_{data}, bit_start_{bit_start} {
+    Normalize();
+  }
+
+  /// Pointer to the byte where the first value is stored.
+  constexpr byte_type* data() const noexcept { return data_; }
+
+  /// Bit offset of the first value in the first byte.
+  constexpr size_type bit_start() const noexcept { return bit_start_; }
+
+  /// Return a new span starting at the given position.
+  constexpr BitmapSpan NewStartingAt(size_type bit_start) const noexcept {
+    auto out = *this;
+    out.bit_start_ += bit_start;
+    out.Normalize();
+    return out;
+  }
+
+ private:
+  byte_type* data_;
+  size_type bit_start_ = 0;
+
+  /// Ensure `bit_start` always ends up in [0, 8[, even when it starts 
negative.
+  constexpr void Normalize() {
+    // On two's-complement targets an arithmetic right shift floors, and the 
AND mask
+    // yields the non-negative remainder (e.g. -1 -> byte -1, offset 7), 
unlike `/` and
+    // `%` which truncate toward zero.
+    data_ += bit_start_ >> 3;
+    bit_start_ &= 0b0111;
+  }
+};
+
+using BitmapSpanMut = BitmapSpan<uint8_t>;
+using BitmapSpanConst = BitmapSpan<const uint8_t>;
+
+namespace internal_rle {
+template <typename CRTP>
+class RunToBitmapDecoderMixin {
+ public:
+  /// Advance by as many values as provided or until exhaustion of the decoder.
+  /// Return the number of values skipped.
+  [[nodiscard]] rle_size_t Advance(rle_size_t batch_size) {
+    const auto steps = std::min(batch_size, derived()->remaining());
+    derived()->AdvanceUnsafe(steps);
+    ARROW_DCHECK_GE(derived()->remaining(), 0);
+    return steps;
+  }
+
+  /// Get the next value and return false if there are no more.
+  [[nodiscard]] constexpr bool Get(BitmapSpanMut out) {
+    return derived()->GetBatch(out, 1) == 1;
+  }
+
+  /// Get a batch of values return the number of decoded elements.
+  ///
+  /// May write fewer elements to the output than requested if there are not
+  /// enough values left.
+  [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size) {
+    const auto out_bit_offset = out.bit_start();
+    ARROW_DCHECK_GE(out_bit_offset, 0);
+    ARROW_DCHECK_LT(out_bit_offset, 8);
+
+    if (ARROW_PREDICT_FALSE(derived()->remaining() == 0 || batch_size == 0)) {
+      return 0;
+    }
+
+    // HEADER: Writing inside the first byte if caller gives a non-aligned 
input
+    if (ARROW_PREDICT_FALSE(out_bit_offset != 0)) {
+      const auto n_vals = derived()->GetBatchFirstByte(out, batch_size);
+      // If we exhausted the values in this decoder, or we filled what was 
required,
+      // then the following recursive call will return 0.
+      // If in the opposite case, then we will continue on a byte-aligned 
boundary.
+      return n_vals + GetBatch(out.NewStartingAt(n_vals), batch_size - n_vals);
+    }
+
+    // Writing full bytes
+    const auto n_vals = derived()->GetBatchFast(out, batch_size);
+
+    // TRAILER: Writing inside the last byte if caller asked for non multiple 
of 8 values
+    const auto n_last_vals = std::min(batch_size - n_vals, 
derived()->remaining());
+    if (ARROW_PREDICT_FALSE(n_last_vals > 0)) {
+      ARROW_DCHECK_LT(n_last_vals, 8);
+      out = out.NewStartingAt(n_vals);
+      return n_vals + derived()->GetBatchFirstByte(out, n_last_vals);
+    }
+
+    return n_vals;
+  }
+
+ protected:
+  [[nodiscard]] rle_size_t GetBatchFromByte(BitmapSpanMut out, rle_size_t 
batch_size,
+                                            uint8_t src) {
+    const auto out_bit_offset = out.bit_start();
+    ARROW_DCHECK_GE(out_bit_offset, 0);
+    ARROW_DCHECK_LT(out_bit_offset, 8);
+    ARROW_DCHECK_GT(derived()->remaining(), 0);
+    ARROW_DCHECK_GE(batch_size, 0);
+
+    // Empty bits in first byte that we can fill
+    const auto empty_bits = rle_size_t{8} - out_bit_offset;
+    // Number of bits in first byte that we want to fill
+    const auto desired_bits = std::min(empty_bits, batch_size);
+    // Try to advance, and get number of bits we had remaining
+    const auto n_bits = Advance(desired_bits);
+    // Copy relevant bits from the value pattern to the output.
+    *out.data() = bit_util::CopyBits<uint8_t, /* kAllowFullCopy= */ false>({
+        .src = src,
+        .dst = *out.data(),
+        .start = static_cast<uint8_t>(out_bit_offset),
+        .end = static_cast<uint8_t>(out_bit_offset + n_bits),
+    });
+
+    return n_bits;
+  }
+
+ private:
+  CRTP* derived() { return static_cast<CRTP*>(this); }
+  CRTP const* derived() const { return static_cast<CRTP const*>(this); }
+};
+}  // namespace internal_rle
+
+class RleRunToBitmapDecoder
+    : public internal_rle::RunToBitmapDecoderMixin<RleRunToBitmapDecoder> {
+ public:
+  /// The type of run that can be decoded.
+  using RunType = RleRun;
+
+  constexpr RleRunToBitmapDecoder() noexcept = default;
+
+  explicit RleRunToBitmapDecoder(const RunType& run) noexcept { Reset(run); }
+
+  void Reset(const RunType& run) noexcept {
+    values_left_ = run.values_count();
+    if (run.value_little_endian() == 0) {
+      value_pattern_ = uint8_t{0};
+    } else {
+      value_pattern_ = uint8_t{0xFF};
+    }
+  }
+
+  /// Return the number of values that can be advanced.
+  rle_size_t remaining() const { return values_left_; }
+
+  /// Return the repeated value of this decoder.
+  constexpr bool value() const { return value_pattern_ != 0; }
+
+ private:
+  friend class internal_rle::RunToBitmapDecoderMixin<RleRunToBitmapDecoder>;
+
+  /// The byte pattern for 8 values (full ones or full zeros).
+  uint8_t value_pattern_ = {};
+  /// Number of values left to decode.
+  rle_size_t values_left_ = 0;
+
+  void AdvanceUnsafe(rle_size_t batch_size) { values_left_ -= batch_size; }
+
+  /// Get batch values to fill the first incomplete byte of the output.
+  [[nodiscard]] rle_size_t GetBatchFirstByte(BitmapSpanMut out, rle_size_t 
batch_size) {
+    return GetBatchFromByte(out, batch_size, value_pattern_);
+  }
+
+  /// Get batch in full bytes using memset.
+  [[nodiscard]] rle_size_t GetBatchFast(BitmapSpanMut out, rle_size_t 
batch_size) {
+    ARROW_DCHECK_EQ(out.bit_start(), 0);
+    const auto n_bytes = std::min(batch_size, remaining()) / 8;
+    std::memset(out.data(), value_pattern_, n_bytes);
+    const auto n_vals = 8 * n_bytes;
+    AdvanceUnsafe(n_vals);
+    return n_vals;
+  }
+};
+
+class BitPackedRunToBitmapDecoder
+    : public 
internal_rle::RunToBitmapDecoderMixin<BitPackedRunToBitmapDecoder> {
+ public:
+  /// The type of run that can be decoded.
+  using RunType = BitPackedRun;
+
+  constexpr BitPackedRunToBitmapDecoder() noexcept = default;
+
+  explicit BitPackedRunToBitmapDecoder(const RunType& run) noexcept { 
Reset(run); }
+
+  void Reset(const RunType& run) noexcept {
+    data_ = run.raw_data_ptr();
+    values_count_ = run.values_count();
+    values_read_ = 0;
+    ARROW_DCHECK(run.raw_data_max_size() < 0 ||
+                 bit_util::BytesForBits(values_count_) <= 
run.raw_data_max_size());
+  }
+
+  /// Return the number of values that can be advanced.
+  constexpr rle_size_t remaining() const { return values_count_ - 
values_read_; }
+
+  /// Get a batch of values return the number of decoded elements.
+  /// May write fewer elements to the output than requested if there are not 
enough values
+  /// left.
+  [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size) {
+    // WARN: Slow case where bit offsets of input and output are not aligned.
+    // We loose the ability to do memcpy
+    if (ARROW_PREDICT_FALSE(out.bit_start() != unread_values_bit_offset())) {
+      return GetBatchMisaligned(out, batch_size);
+    }
+    return Base::GetBatch(out, batch_size);
+  }
+
+ private:
+  using Base = 
internal_rle::RunToBitmapDecoderMixin<BitPackedRunToBitmapDecoder>;
+  friend class 
internal_rle::RunToBitmapDecoderMixin<BitPackedRunToBitmapDecoder>;
+
+  /// The pointer to the beginning of the run
+  const uint8_t* data_ = nullptr;
+  /// The total number of values in the run
+  rle_size_t values_count_ = 0;
+  /// The number of values read by the decoder
+  rle_size_t values_read_ = 0;
+
+  /// Start pointer of the unread values (may contain values already read).
+  const uint8_t* unread_values_ptr() const noexcept { return data_ + 
(values_read_ / 8); }
+
+  /// Bit in @ref unread_values_ptr where the unread values start.
+  rle_size_t unread_values_bit_offset() const noexcept { return values_read_ % 
8; }
+
+  void AdvanceUnsafe(rle_size_t batch_size) { values_read_ += batch_size; }
+
+  /// Get batch values to fill the first incomplete byte of the output.
+  [[nodiscard]] rle_size_t GetBatchFirstByte(BitmapSpanMut out, rle_size_t 
batch_size) {
+    return GetBatchFromByte(out, batch_size, *unread_values_ptr());
+  }
+
+  /// Get batch in full bytes using memcpy.
+  [[nodiscard]] rle_size_t GetBatchFast(BitmapSpanMut out, rle_size_t 
batch_size) {
+    ARROW_DCHECK_EQ(out.bit_start(), 0);
+    const auto n_bytes = std::min(batch_size, remaining()) / 8;
+    std::memcpy(out.data(), unread_values_ptr(), n_bytes);
+    const auto n_vals = 8 * n_bytes;
+    AdvanceUnsafe(n_vals);
+    return n_vals;
+  }
+
+  /// Correct and slow function for reading a batch one bit at a time.
+  [[nodiscard]] rle_size_t GetBatchSlow(BitmapSpanMut out, rle_size_t 
batch_size) {
+    const rle_size_t to_read = std::min(batch_size, remaining());
+    for (rle_size_t i = 0; i < to_read; ++i) {
+      const bool bit = bit_util::GetBit(data_, values_read_ + i);
+      bit_util::SetBitTo(out.data(), out.bit_start() + i, bit);
+    }
+    AdvanceUnsafe(to_read);
+    return to_read;
+  }
+
+  [[nodiscard]] rle_size_t GetBatchMisaligned(BitmapSpanMut out, rle_size_t 
batch_size) {

Review Comment:
   How about simply reusing `CopyBitmap` from `bitmap_ops.h`? I'm not sure 
`GetBatchMisaligned` will be faster.



##########
cpp/src/arrow/util/rle_bitmap_internal.h:
##########
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cstring>
+
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+/// A lightweight view over a bitmap.
+template <typename B = uint8_t>
+class BitmapSpan {

Review Comment:
   Naming-wise, this is not really a span in the `std::span` sense since it 
doesn't have a length...



##########
cpp/src/arrow/util/rle_bitmap_test.cc:
##########
@@ -0,0 +1,508 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <array>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/rle_bitmap_internal.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+namespace {
+
+/// Read the first `count` bits of `bytes` (LSB first) into a vector of 
booleans.
+std::vector<bool> BitsFromBytes(const std::vector<uint8_t>& bytes, rle_size_t 
count) {
+  std::vector<bool> bits(count);
+  for (rle_size_t i = 0; i < count; ++i) {
+    bits[i] = bit_util::GetBit(bytes.data(), i);
+  }
+  return bits;
+}
+
+/// Check the decoded output in `out` against `expected`.
+/// Bits `out[out_offset..out_offset + count]` must equal `expected[skip..skip 
+ count]`.
+/// The `out_offset` bits before them must still be zero.
+void CheckDecodedBits(const std::vector<uint8_t>& out, const 
std::vector<bool>& expected,
+                      rle_size_t count, rle_size_t out_offset = 0, rle_size_t 
skip = 0) {
+  ARROW_SCOPED_TRACE("out_offset = ", out_offset, ", skip = ", skip);
+  for (rle_size_t i = 0; i < out_offset; ++i) {
+    EXPECT_FALSE(bit_util::GetBit(out.data(), i)) << "clobbered bit " << i;
+  }
+  for (rle_size_t i = 0; i < count; ++i) {
+    EXPECT_EQ(bit_util::GetBit(out.data(), out_offset + i), expected[skip + i])
+        << "at bit " << i;
+  }
+}
+
+/// Skip the first `skip` values with Advance(), then decode the rest of the 
run
+/// into one output bitmap, `chunk` values at a time. Compare against 
`expected`.
+///
+/// `chunk` controls output bit alignment. When `chunk` is not a multiple of 8,
+/// later calls start at a non-zero output bit offset.
+///
+/// `skip` shifts the decoder's read offset relative to the output offset.
+/// A non-zero `skip` makes the two differ, which exercises the bit-unaligned 
read
+/// path of BitPackedRunToBitmapDecoder. With `skip == 0` they stay in sync and
+/// only the aligned path runs.
+template <typename Decoder>
+void CheckChunkedDecode(const typename Decoder::RunType& run,
+                        const std::vector<bool>& expected, rle_size_t chunk = 
1,
+                        rle_size_t skip = 0) {
+  ARROW_SCOPED_TRACE("chunk = ", chunk, ", skip = ", skip);
+  const auto n_vals = static_cast<rle_size_t>(expected.size());
+  ASSERT_LE(skip, n_vals);
+
+  Decoder decoder(run);
+  const auto advanced = decoder.Advance(skip);
+  ASSERT_EQ(advanced, skip);
+  const auto rest = n_vals - skip;
+
+  // Output buffer with one guard byte to catch out-of-bounds writes.
+  std::vector<uint8_t> out(static_cast<size_t>(bit_util::BytesForBits(rest)) + 
1, 0);
+  const uint8_t guard = 0xA5;
+  out.back() = guard;
+
+  rle_size_t read = 0;
+  while (read < rest) {
+    const auto want = std::min(chunk, rest - read);
+    const auto got =
+        decoder.GetBatch(BitmapSpanMut(out.data(), /*bit_start=*/read), want);
+    EXPECT_EQ(got, want) << "at pos " << read;
+    ASSERT_GT(got, 0) << "at pos " << read;  // break on failure
+    read += got;
+    EXPECT_EQ(decoder.remaining(), rest - read);
+  }
+
+  EXPECT_EQ(decoder.remaining(), 0);
+  EXPECT_EQ(out.back(), guard) << "decoder wrote past the end of the output";
+  CheckDecodedBits(out, expected, /*count=*/rest, /*out_offset=*/0, skip);
+}
+
+/// All the checks shared by both decoder types.
+///
+/// `expected` is the full sequence of booleans the run should decode to.
+template <typename Decoder>
+void CheckBitmapDecoder(const typename Decoder::RunType& run,
+                        const std::vector<bool>& expected) {
+  const auto n_vals = static_cast<rle_size_t>(expected.size());
+
+  // remaining() reflects the run size before any value is read.
+  {
+    Decoder decoder(run);
+    EXPECT_EQ(decoder.remaining(), n_vals);
+  }
+
+  // Empty requests are a no-op.
+  {
+    Decoder decoder(run);
+    uint8_t out = 0;
+    const auto got = decoder.GetBatch(BitmapSpanMut(&out), /*batch_size=*/0);
+    EXPECT_EQ(got, 0);
+    EXPECT_EQ(decoder.remaining(), n_vals);
+  }
+
+  // Decode the whole run in several chunks.
+  for (const rle_size_t chunk : {rle_size_t{1}, rle_size_t{3}, rle_size_t{7},
+                                 rle_size_t{8}, rle_size_t{9}, n_vals}) {
+    CheckChunkedDecode<Decoder>(run, expected, chunk);
+  }
+
+  // Decode the whole run in several chunks, after an initial Advance that 
shifts
+  // the run and output bit alignment.
+  for (const rle_size_t chunk : {rle_size_t{1}, rle_size_t{3}, rle_size_t{7},
+                                 rle_size_t{8}, rle_size_t{9}, n_vals}) {
+    for (rle_size_t skip = 1; skip < 8 && skip < n_vals; ++skip) {
+      CheckChunkedDecode<Decoder>(run, expected, chunk, skip);
+    }
+  }
+
+  // Get() one value at a time, then read past the end.
+  {
+    Decoder decoder(run);
+    std::vector<uint8_t> 
out(static_cast<size_t>(bit_util::BytesForBits(n_vals)) + 1, 0);
+    for (rle_size_t i = 0; i < n_vals; ++i) {
+      const bool ok = decoder.Get(BitmapSpanMut(out.data(), /*bit_start=*/i));
+      EXPECT_TRUE(ok);
+      EXPECT_EQ(decoder.remaining(), n_vals - i - 1);
+    }
+    // Exhausted: nothing more can be read or advanced.
+    const bool ok = decoder.Get(BitmapSpanMut(out.data()));
+    EXPECT_FALSE(ok);
+    const auto advanced = decoder.Advance(1);
+    EXPECT_EQ(advanced, 0);
+    EXPECT_EQ(decoder.remaining(), 0);
+    CheckDecodedBits(out, expected, /*count=*/n_vals);
+  }
+
+  // Advancing more than available stops at the run boundary.
+  {
+    Decoder decoder(run);
+    const auto advanced = decoder.Advance(n_vals + 100);
+    EXPECT_EQ(advanced, n_vals);
+    EXPECT_EQ(decoder.remaining(), 0);
+  }
+
+  // Reset() rewinds the decoder so the run can be decoded again.
+  {
+    Decoder decoder(run);
+    std::vector<uint8_t> 
out_1(static_cast<size_t>(bit_util::BytesForBits(n_vals)), 0);
+    const auto scratch_got = decoder.GetBatch(BitmapSpanMut(out_1.data()), 
n_vals);
+    EXPECT_EQ(scratch_got, n_vals);
+    EXPECT_EQ(decoder.remaining(), 0);
+
+    decoder.Reset(run);
+    EXPECT_EQ(decoder.remaining(), n_vals);
+    std::vector<uint8_t> 
out_2(static_cast<size_t>(bit_util::BytesForBits(n_vals)), 0);
+    const auto got = decoder.GetBatch(BitmapSpanMut(out_2.data()), n_vals);
+    EXPECT_EQ(got, n_vals);
+    CheckDecodedBits(out_2, expected, /*count=*/n_vals);
+  }
+}
+
+}  // namespace
+
+/***************************
+ *  RleRunToBitmapDecoder  *
+ ***************************/
+
+struct RleBitmapCase {
+  // The repeated boolean value of the run.
+  bool value;
+  // The number of values in the run.
+  rle_size_t count;

Review Comment:
   Why not parameterize just the count, and check both true and false values 
for each count?



##########
cpp/src/arrow/util/rle_bitmap_internal.h:
##########
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cstring>
+
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+/// A lightweight view over a bitmap.
+template <typename B = uint8_t>
+class BitmapSpan {
+ public:
+  using size_type = rle_size_t;
+  using byte_type = B;
+
+  explicit constexpr BitmapSpan(byte_type* data, size_type bit_start = 0) 
noexcept
+      : data_{data}, bit_start_{bit_start} {
+    Normalize();
+  }
+
+  /// Pointer to the byte where the first value is stored.
+  constexpr byte_type* data() const noexcept { return data_; }
+
+  /// Bit offset of the first value in the first byte.
+  constexpr size_type bit_start() const noexcept { return bit_start_; }
+
+  /// Return a new span starting at the given position.
+  constexpr BitmapSpan NewStartingAt(size_type bit_start) const noexcept {
+    auto out = *this;
+    out.bit_start_ += bit_start;
+    out.Normalize();
+    return out;
+  }
+
+ private:
+  byte_type* data_;
+  size_type bit_start_ = 0;
+
+  /// Ensure `bit_start` always ends up in [0, 8[, even when it starts 
negative.
+  constexpr void Normalize() {
+    // On two's-complement targets an arithmetic right shift floors, and the 
AND mask
+    // yields the non-negative remainder (e.g. -1 -> byte -1, offset 7), 
unlike `/` and
+    // `%` which truncate toward zero.
+    data_ += bit_start_ >> 3;
+    bit_start_ &= 0b0111;
+  }
+};
+
+using BitmapSpanMut = BitmapSpan<uint8_t>;
+using BitmapSpanConst = BitmapSpan<const uint8_t>;
+
+namespace internal_rle {
+template <typename CRTP>
+class RunToBitmapDecoderMixin {
+ public:
+  /// Advance by as many values as provided or until exhaustion of the decoder.
+  /// Return the number of values skipped.
+  [[nodiscard]] rle_size_t Advance(rle_size_t batch_size) {
+    const auto steps = std::min(batch_size, derived()->remaining());
+    derived()->AdvanceUnsafe(steps);
+    ARROW_DCHECK_GE(derived()->remaining(), 0);
+    return steps;
+  }
+
+  /// Get the next value and return false if there are no more.
+  [[nodiscard]] constexpr bool Get(BitmapSpanMut out) {
+    return derived()->GetBatch(out, 1) == 1;
+  }
+
+  /// Get a batch of values return the number of decoded elements.
+  ///
+  /// May write fewer elements to the output than requested if there are not
+  /// enough values left.
+  [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size) {
+    const auto out_bit_offset = out.bit_start();
+    ARROW_DCHECK_GE(out_bit_offset, 0);
+    ARROW_DCHECK_LT(out_bit_offset, 8);
+
+    if (ARROW_PREDICT_FALSE(derived()->remaining() == 0 || batch_size == 0)) {
+      return 0;
+    }
+
+    // HEADER: Writing inside the first byte if caller gives a non-aligned 
input
+    if (ARROW_PREDICT_FALSE(out_bit_offset != 0)) {

Review Comment:
   Why is it `ARROW_PREDICT_FALSE`? Note that the compiler may consider the 
following code "cold" and decide it doesn't need to be optimized for speed.



##########
cpp/src/arrow/util/rle_bitmap_test.cc:
##########
@@ -0,0 +1,508 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <array>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/rle_bitmap_internal.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+namespace {
+
+/// Read the first `count` bits of `bytes` (LSB first) into a vector of 
booleans.
+std::vector<bool> BitsFromBytes(const std::vector<uint8_t>& bytes, rle_size_t 
count) {
+  std::vector<bool> bits(count);
+  for (rle_size_t i = 0; i < count; ++i) {
+    bits[i] = bit_util::GetBit(bytes.data(), i);
+  }
+  return bits;
+}
+
+/// Check the decoded output in `out` against `expected`.
+/// Bits `out[out_offset..out_offset + count]` must equal `expected[skip..skip 
+ count]`.
+/// The `out_offset` bits before them must still be zero.
+void CheckDecodedBits(const std::vector<uint8_t>& out, const 
std::vector<bool>& expected,
+                      rle_size_t count, rle_size_t out_offset = 0, rle_size_t 
skip = 0) {
+  ARROW_SCOPED_TRACE("out_offset = ", out_offset, ", skip = ", skip);
+  for (rle_size_t i = 0; i < out_offset; ++i) {
+    EXPECT_FALSE(bit_util::GetBit(out.data(), i)) << "clobbered bit " << i;
+  }
+  for (rle_size_t i = 0; i < count; ++i) {
+    EXPECT_EQ(bit_util::GetBit(out.data(), out_offset + i), expected[skip + i])
+        << "at bit " << i;
+  }
+}
+
+/// Skip the first `skip` values with Advance(), then decode the rest of the 
run
+/// into one output bitmap, `chunk` values at a time. Compare against 
`expected`.
+///
+/// `chunk` controls output bit alignment. When `chunk` is not a multiple of 8,
+/// later calls start at a non-zero output bit offset.
+///
+/// `skip` shifts the decoder's read offset relative to the output offset.
+/// A non-zero `skip` makes the two differ, which exercises the bit-unaligned 
read
+/// path of BitPackedRunToBitmapDecoder. With `skip == 0` they stay in sync and
+/// only the aligned path runs.
+template <typename Decoder>
+void CheckChunkedDecode(const typename Decoder::RunType& run,
+                        const std::vector<bool>& expected, rle_size_t chunk = 
1,
+                        rle_size_t skip = 0) {
+  ARROW_SCOPED_TRACE("chunk = ", chunk, ", skip = ", skip);
+  const auto n_vals = static_cast<rle_size_t>(expected.size());
+  ASSERT_LE(skip, n_vals);
+
+  Decoder decoder(run);
+  const auto advanced = decoder.Advance(skip);
+  ASSERT_EQ(advanced, skip);
+  const auto rest = n_vals - skip;
+
+  // Output buffer with one guard byte to catch out-of-bounds writes.
+  std::vector<uint8_t> out(static_cast<size_t>(bit_util::BytesForBits(rest)) + 
1, 0);
+  const uint8_t guard = 0xA5;
+  out.back() = guard;
+
+  rle_size_t read = 0;
+  while (read < rest) {
+    const auto want = std::min(chunk, rest - read);
+    const auto got =
+        decoder.GetBatch(BitmapSpanMut(out.data(), /*bit_start=*/read), want);
+    EXPECT_EQ(got, want) << "at pos " << read;
+    ASSERT_GT(got, 0) << "at pos " << read;  // break on failure
+    read += got;
+    EXPECT_EQ(decoder.remaining(), rest - read);
+  }
+
+  EXPECT_EQ(decoder.remaining(), 0);
+  EXPECT_EQ(out.back(), guard) << "decoder wrote past the end of the output";
+  CheckDecodedBits(out, expected, /*count=*/rest, /*out_offset=*/0, skip);
+}
+
+/// All the checks shared by both decoder types.
+///
+/// `expected` is the full sequence of booleans the run should decode to.
+template <typename Decoder>
+void CheckBitmapDecoder(const typename Decoder::RunType& run,
+                        const std::vector<bool>& expected) {
+  const auto n_vals = static_cast<rle_size_t>(expected.size());
+
+  // remaining() reflects the run size before any value is read.
+  {
+    Decoder decoder(run);
+    EXPECT_EQ(decoder.remaining(), n_vals);
+  }
+
+  // Empty requests are a no-op.
+  {
+    Decoder decoder(run);
+    uint8_t out = 0;
+    const auto got = decoder.GetBatch(BitmapSpanMut(&out), /*batch_size=*/0);
+    EXPECT_EQ(got, 0);
+    EXPECT_EQ(decoder.remaining(), n_vals);
+  }
+
+  // Decode the whole run in several chunks.
+  for (const rle_size_t chunk : {rle_size_t{1}, rle_size_t{3}, rle_size_t{7},
+                                 rle_size_t{8}, rle_size_t{9}, n_vals}) {

Review Comment:
   Also add `n_vals + 1`, to make sure that passing a chunk size larger than
the number of values is allowed?



##########
cpp/src/arrow/util/rle_bitmap_internal.h:
##########
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cstring>
+
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+/// A lightweight view over a bitmap.
+template <typename B = uint8_t>
+class BitmapSpan {
+ public:
+  using size_type = rle_size_t;
+  using byte_type = B;
+
+  explicit constexpr BitmapSpan(byte_type* data, size_type bit_start = 0) 
noexcept
+      : data_{data}, bit_start_{bit_start} {
+    Normalize();
+  }
+
+  /// Pointer to the byte where the first value is stored.
+  constexpr byte_type* data() const noexcept { return data_; }
+
+  /// Bit offset of the first value in the first byte.
+  constexpr size_type bit_start() const noexcept { return bit_start_; }
+
+  /// Return a new span starting at the given position.
+  constexpr BitmapSpan NewStartingAt(size_type bit_start) const noexcept {
+    auto out = *this;
+    out.bit_start_ += bit_start;
+    out.Normalize();
+    return out;
+  }
+
+ private:
+  byte_type* data_;
+  size_type bit_start_ = 0;
+
+  /// Ensure `bit_start` always ends up in [0, 8[, even when it starts 
negative.
+  constexpr void Normalize() {
+    // On two's-complement targets an arithmetic right shift floors, and the 
AND mask
+    // yields the non-negative remainder (e.g. -1 -> byte -1, offset 7), 
unlike `/` and
+    // `%` which truncate toward zero.
+    data_ += bit_start_ >> 3;
+    bit_start_ &= 0b0111;
+  }
+};
+
+using BitmapSpanMut = BitmapSpan<uint8_t>;
+using BitmapSpanConst = BitmapSpan<const uint8_t>;
+
+namespace internal_rle {
+template <typename CRTP>
+class RunToBitmapDecoderMixin {
+ public:
+  /// Advance by as many values as provided or until exhaustion of the decoder.
+  /// Return the number of values skipped.
+  [[nodiscard]] rle_size_t Advance(rle_size_t batch_size) {
+    const auto steps = std::min(batch_size, derived()->remaining());
+    derived()->AdvanceUnsafe(steps);
+    ARROW_DCHECK_GE(derived()->remaining(), 0);
+    return steps;
+  }
+
+  /// Get the next value and return false if there are no more.
+  [[nodiscard]] constexpr bool Get(BitmapSpanMut out) {
+    return derived()->GetBatch(out, 1) == 1;
+  }
+
+  /// Get a batch of values return the number of decoded elements.
+  ///
+  /// May write fewer elements to the output than requested if there are not
+  /// enough values left.
+  [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size) {
+    const auto out_bit_offset = out.bit_start();
+    ARROW_DCHECK_GE(out_bit_offset, 0);
+    ARROW_DCHECK_LT(out_bit_offset, 8);
+
+    if (ARROW_PREDICT_FALSE(derived()->remaining() == 0 || batch_size == 0)) {
+      return 0;
+    }
+
+    // HEADER: Writing inside the first byte if caller gives a non-aligned 
input
+    if (ARROW_PREDICT_FALSE(out_bit_offset != 0)) {
+      const auto n_vals = derived()->GetBatchFirstByte(out, batch_size);
+      // If we exhausted the values in this decoder, or we filled what was 
required,
+      // then the following recursive call will return 0.
+      // If in the opposite case, then we will continue on a byte-aligned 
boundary.
+      return n_vals + GetBatch(out.NewStartingAt(n_vals), batch_size - n_vals);
+    }
+
+    // Writing full bytes
+    const auto n_vals = derived()->GetBatchFast(out, batch_size);
+
+    // TRAILER: Writing inside the last byte if caller asked for non multiple 
of 8 values
+    const auto n_last_vals = std::min(batch_size - n_vals, 
derived()->remaining());
+    if (ARROW_PREDICT_FALSE(n_last_vals > 0)) {
+      ARROW_DCHECK_LT(n_last_vals, 8);
+      out = out.NewStartingAt(n_vals);
+      return n_vals + derived()->GetBatchFirstByte(out, n_last_vals);
+    }
+
+    return n_vals;
+  }
+
+ protected:
+  [[nodiscard]] rle_size_t GetBatchFromByte(BitmapSpanMut out, rle_size_t 
batch_size,
+                                            uint8_t src) {
+    const auto out_bit_offset = out.bit_start();
+    ARROW_DCHECK_GE(out_bit_offset, 0);
+    ARROW_DCHECK_LT(out_bit_offset, 8);
+    ARROW_DCHECK_GT(derived()->remaining(), 0);
+    ARROW_DCHECK_GE(batch_size, 0);
+
+    // Empty bits in first byte that we can fill
+    const auto empty_bits = rle_size_t{8} - out_bit_offset;
+    // Number of bits in first byte that we want to fill
+    const auto desired_bits = std::min(empty_bits, batch_size);
+    // Try to advance, and get number of bits we had remaining
+    const auto n_bits = Advance(desired_bits);
+    // Copy relevant bits from the value pattern to the output.
+    *out.data() = bit_util::CopyBits<uint8_t, /* kAllowFullCopy= */ false>({
+        .src = src,
+        .dst = *out.data(),
+        .start = static_cast<uint8_t>(out_bit_offset),
+        .end = static_cast<uint8_t>(out_bit_offset + n_bits),
+    });
+
+    return n_bits;
+  }
+
+ private:
+  CRTP* derived() { return static_cast<CRTP*>(this); }
+  CRTP const* derived() const { return static_cast<CRTP const*>(this); }
+};
+}  // namespace internal_rle
+
+class RleRunToBitmapDecoder
+    : public internal_rle::RunToBitmapDecoderMixin<RleRunToBitmapDecoder> {
+ public:
+  /// The type of run that can be decoded.
+  using RunType = RleRun;
+
+  constexpr RleRunToBitmapDecoder() noexcept = default;
+
+  explicit RleRunToBitmapDecoder(const RunType& run) noexcept { Reset(run); }
+
+  void Reset(const RunType& run) noexcept {
+    values_left_ = run.values_count();
+    if (run.value_little_endian() == 0) {
+      value_pattern_ = uint8_t{0};
+    } else {
+      value_pattern_ = uint8_t{0xFF};
+    }
+  }
+
+  /// Return the number of values that can be advanced.
+  rle_size_t remaining() const { return values_left_; }
+
+  /// Return the repeated value of this decoder.
+  constexpr bool value() const { return value_pattern_ != 0; }
+
+ private:
+  friend class internal_rle::RunToBitmapDecoderMixin<RleRunToBitmapDecoder>;
+
+  /// The byte pattern for 8 values (full ones or full zeros).
+  uint8_t value_pattern_ = {};
+  /// Number of values left to decode.
+  rle_size_t values_left_ = 0;
+
+  void AdvanceUnsafe(rle_size_t batch_size) { values_left_ -= batch_size; }
+
+  /// Get batch values to fill the first incomplete byte of the output.
+  [[nodiscard]] rle_size_t GetBatchFirstByte(BitmapSpanMut out, rle_size_t 
batch_size) {
+    return GetBatchFromByte(out, batch_size, value_pattern_);
+  }
+
+  /// Get batch in full bytes using memset.
+  [[nodiscard]] rle_size_t GetBatchFast(BitmapSpanMut out, rle_size_t 
batch_size) {
+    ARROW_DCHECK_EQ(out.bit_start(), 0);
+    const auto n_bytes = std::min(batch_size, remaining()) / 8;
+    std::memset(out.data(), value_pattern_, n_bytes);
+    const auto n_vals = 8 * n_bytes;
+    AdvanceUnsafe(n_vals);
+    return n_vals;
+  }
+};
+
+class BitPackedRunToBitmapDecoder
+    : public 
internal_rle::RunToBitmapDecoderMixin<BitPackedRunToBitmapDecoder> {
+ public:
+  /// The type of run that can be decoded.
+  using RunType = BitPackedRun;
+
+  constexpr BitPackedRunToBitmapDecoder() noexcept = default;
+
+  explicit BitPackedRunToBitmapDecoder(const RunType& run) noexcept { 
Reset(run); }
+
+  void Reset(const RunType& run) noexcept {
+    data_ = run.raw_data_ptr();
+    values_count_ = run.values_count();
+    values_read_ = 0;
+    ARROW_DCHECK(run.raw_data_max_size() < 0 ||
+                 bit_util::BytesForBits(values_count_) <= 
run.raw_data_max_size());
+  }
+
+  /// Return the number of values that can be advanced.
+  constexpr rle_size_t remaining() const { return values_count_ - 
values_read_; }
+
+  /// Get a batch of values return the number of decoded elements.
+  /// May write fewer elements to the output than requested if there are not 
enough values
+  /// left.
+  [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size) {
+    // WARN: Slow case where bit offsets of input and output are not aligned.
+    // We loose the ability to do memcpy
+    if (ARROW_PREDICT_FALSE(out.bit_start() != unread_values_bit_offset())) {
+      return GetBatchMisaligned(out, batch_size);
+    }
+    return Base::GetBatch(out, batch_size);
+  }
+
+ private:
+  using Base = 
internal_rle::RunToBitmapDecoderMixin<BitPackedRunToBitmapDecoder>;
+  friend class 
internal_rle::RunToBitmapDecoderMixin<BitPackedRunToBitmapDecoder>;
+
+  /// The pointer to the beginning of the run
+  const uint8_t* data_ = nullptr;
+  /// The total number of values in the run
+  rle_size_t values_count_ = 0;
+  /// The number of values read by the decoder
+  rle_size_t values_read_ = 0;
+
+  /// Start pointer of the unread values (may contain values already read).
+  const uint8_t* unread_values_ptr() const noexcept { return data_ + 
(values_read_ / 8); }
+
+  /// Bit in @ref unread_values_ptr where the unread values start.
+  rle_size_t unread_values_bit_offset() const noexcept { return values_read_ % 
8; }
+
+  void AdvanceUnsafe(rle_size_t batch_size) { values_read_ += batch_size; }
+
+  /// Get batch values to fill the first incomplete byte of the output.
+  [[nodiscard]] rle_size_t GetBatchFirstByte(BitmapSpanMut out, rle_size_t 
batch_size) {
+    return GetBatchFromByte(out, batch_size, *unread_values_ptr());
+  }
+
+  /// Get batch in full bytes using memcpy.
+  [[nodiscard]] rle_size_t GetBatchFast(BitmapSpanMut out, rle_size_t 
batch_size) {
+    ARROW_DCHECK_EQ(out.bit_start(), 0);
+    const auto n_bytes = std::min(batch_size, remaining()) / 8;

Review Comment:
   Should we DCHECK that `unread_values_bit_offset()` is equal to 0 here?
Otherwise you can't just memcpy...



##########
cpp/src/arrow/util/bit_util_test.cc:
##########
@@ -726,6 +726,23 @@ TEST(BitUtil, ByteSwap) {
   EXPECT_EQ(bit_util::ByteSwap(srcd64), expectedd64);
 }
 
+TEST(BitUtil, Get32Bits) {
+  const std::array<uint8_t, 12> src = {0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
+                                       0xde, 0xf0, 0x11, 0x22, 0x33, 0x44};
+
+  // The result's in-memory bytes must reproduce the source bit-stream slice
+  // so each result bit must match the corresponding source bit.
+  for (uint64_t offset = 0; offset <= 24; ++offset) {

Review Comment:
   Offset up to 24 means we'll need at most (24 + 32) / 8 = 7 bytes from `src`, 
but `src` is 12 bytes long... why is that?



##########
cpp/src/arrow/util/rle_bitmap_test.cc:
##########
@@ -0,0 +1,508 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <array>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/rle_bitmap_internal.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+namespace {
+
+/// Read the first `count` bits of `bytes` (LSB first) into a vector of 
booleans.
+std::vector<bool> BitsFromBytes(const std::vector<uint8_t>& bytes, rle_size_t 
count) {
+  std::vector<bool> bits(count);
+  for (rle_size_t i = 0; i < count; ++i) {
+    bits[i] = bit_util::GetBit(bytes.data(), i);
+  }
+  return bits;
+}
+
+/// Check the decoded output in `out` against `expected`.
+/// Bits `out[out_offset..out_offset + count]` must equal `expected[skip..skip 
+ count]`.
+/// The `out_offset` bits before them must still be zero.
+void CheckDecodedBits(const std::vector<uint8_t>& out, const 
std::vector<bool>& expected,
+                      rle_size_t count, rle_size_t out_offset = 0, rle_size_t 
skip = 0) {
+  ARROW_SCOPED_TRACE("out_offset = ", out_offset, ", skip = ", skip);
+  for (rle_size_t i = 0; i < out_offset; ++i) {
+    EXPECT_FALSE(bit_util::GetBit(out.data(), i)) << "clobbered bit " << i;
+  }
+  for (rle_size_t i = 0; i < count; ++i) {
+    EXPECT_EQ(bit_util::GetBit(out.data(), out_offset + i), expected[skip + i])
+        << "at bit " << i;
+  }
+}
+
+/// Skip the first `skip` values with Advance(), then decode the rest of the 
run
+/// into one output bitmap, `chunk` values at a time. Compare against 
`expected`.
+///
+/// `chunk` controls output bit alignment. When `chunk` is not a multiple of 8,
+/// later calls start at a non-zero output bit offset.
+///
+/// `skip` shifts the decoder's read offset relative to the output offset.
+/// A non-zero `skip` makes the two differ, which exercises the bit-unaligned 
read
+/// path of BitPackedRunToBitmapDecoder. With `skip == 0` they stay in sync and
+/// only the aligned path runs.
+template <typename Decoder>
+void CheckChunkedDecode(const typename Decoder::RunType& run,
+                        const std::vector<bool>& expected, rle_size_t chunk = 
1,

Review Comment:
   `chunk_size` rather than `chunk`?



##########
cpp/src/arrow/util/rle_bitmap_internal.h:
##########
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cstring>
+
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+/// A lightweight view over a bitmap.
+template <typename B = uint8_t>
+class BitmapSpan {
+ public:
+  using size_type = rle_size_t;
+  using byte_type = B;
+
+  explicit constexpr BitmapSpan(byte_type* data, size_type bit_start = 0) 
noexcept
+      : data_{data}, bit_start_{bit_start} {
+    Normalize();
+  }
+
+  /// Pointer to the byte where the first value is stored.
+  constexpr byte_type* data() const noexcept { return data_; }
+
+  /// Bit offset of the first value in the first byte.
+  constexpr size_type bit_start() const noexcept { return bit_start_; }
+
+  /// Return a new span starting at the given position.
+  constexpr BitmapSpan NewStartingAt(size_type bit_start) const noexcept {
+    auto out = *this;
+    out.bit_start_ += bit_start;
+    out.Normalize();
+    return out;
+  }
+
+ private:
+  byte_type* data_;
+  size_type bit_start_ = 0;
+
+  /// Ensure `bit_start` always ends up in [0, 8[, even when it starts 
negative.
+  constexpr void Normalize() {
+    // On two's-complement targets an arithmetic right shift floors, and the 
AND mask
+    // yields the non-negative remainder (e.g. -1 -> byte -1, offset 7), 
unlike `/` and
+    // `%` which truncate toward zero.
+    data_ += bit_start_ >> 3;
+    bit_start_ &= 0b0111;
+  }
+};
+
+using BitmapSpanMut = BitmapSpan<uint8_t>;
+using BitmapSpanConst = BitmapSpan<const uint8_t>;
+
+namespace internal_rle {
+template <typename CRTP>
+class RunToBitmapDecoderMixin {
+ public:
+  /// Advance by as many values as provided or until exhaustion of the decoder.
+  /// Return the number of values skipped.
+  [[nodiscard]] rle_size_t Advance(rle_size_t batch_size) {
+    const auto steps = std::min(batch_size, derived()->remaining());
+    derived()->AdvanceUnsafe(steps);
+    ARROW_DCHECK_GE(derived()->remaining(), 0);
+    return steps;
+  }
+
+  /// Get the next value and return false if there are no more.
+  [[nodiscard]] constexpr bool Get(BitmapSpanMut out) {
+    return derived()->GetBatch(out, 1) == 1;
+  }
+
+  /// Get a batch of values return the number of decoded elements.
+  ///
+  /// May write fewer elements to the output than requested if there are not
+  /// enough values left.
+  [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size) {
+    const auto out_bit_offset = out.bit_start();
+    ARROW_DCHECK_GE(out_bit_offset, 0);
+    ARROW_DCHECK_LT(out_bit_offset, 8);
+
+    if (ARROW_PREDICT_FALSE(derived()->remaining() == 0 || batch_size == 0)) {
+      return 0;
+    }
+
+    // HEADER: Writing inside the first byte if caller gives a non-aligned 
input
+    if (ARROW_PREDICT_FALSE(out_bit_offset != 0)) {
+      const auto n_vals = derived()->GetBatchFirstByte(out, batch_size);
+      // If we exhausted the values in this decoder, or we filled what was 
required,
+      // then the following recursive call will return 0.
+      // If in the opposite case, then we will continue on a byte-aligned 
boundary.
+      return n_vals + GetBatch(out.NewStartingAt(n_vals), batch_size - n_vals);
+    }
+
+    // Writing full bytes
+    const auto n_vals = derived()->GetBatchFast(out, batch_size);
+
+    // TRAILER: Writing inside the last byte if caller asked for non multiple 
of 8 values
+    const auto n_last_vals = std::min(batch_size - n_vals, 
derived()->remaining());
+    if (ARROW_PREDICT_FALSE(n_last_vals > 0)) {
+      ARROW_DCHECK_LT(n_last_vals, 8);
+      out = out.NewStartingAt(n_vals);
+      return n_vals + derived()->GetBatchFirstByte(out, n_last_vals);
+    }
+
+    return n_vals;
+  }
+
+ protected:
+  [[nodiscard]] rle_size_t GetBatchFromByte(BitmapSpanMut out, rle_size_t 
batch_size,
+                                            uint8_t src) {
+    const auto out_bit_offset = out.bit_start();
+    ARROW_DCHECK_GE(out_bit_offset, 0);
+    ARROW_DCHECK_LT(out_bit_offset, 8);
+    ARROW_DCHECK_GT(derived()->remaining(), 0);
+    ARROW_DCHECK_GE(batch_size, 0);
+
+    // Empty bits in first byte that we can fill
+    const auto empty_bits = rle_size_t{8} - out_bit_offset;
+    // Number of bits in first byte that we want to fill
+    const auto desired_bits = std::min(empty_bits, batch_size);
+    // Try to advance, and get number of bits we had remaining
+    const auto n_bits = Advance(desired_bits);
+    // Copy relevant bits from the value pattern to the output.
+    *out.data() = bit_util::CopyBits<uint8_t, /* kAllowFullCopy= */ false>({
+        .src = src,
+        .dst = *out.data(),
+        .start = static_cast<uint8_t>(out_bit_offset),
+        .end = static_cast<uint8_t>(out_bit_offset + n_bits),
+    });
+
+    return n_bits;
+  }
+
+ private:
+  CRTP* derived() { return static_cast<CRTP*>(this); }
+  CRTP const* derived() const { return static_cast<CRTP const*>(this); }
+};
+}  // namespace internal_rle
+
+class RleRunToBitmapDecoder
+    : public internal_rle::RunToBitmapDecoderMixin<RleRunToBitmapDecoder> {
+ public:
+  /// The type of run that can be decoded.
+  using RunType = RleRun;
+
+  constexpr RleRunToBitmapDecoder() noexcept = default;
+
+  explicit RleRunToBitmapDecoder(const RunType& run) noexcept { Reset(run); }
+
+  void Reset(const RunType& run) noexcept {
+    values_left_ = run.values_count();
+    if (run.value_little_endian() == 0) {
+      value_pattern_ = uint8_t{0};
+    } else {
+      value_pattern_ = uint8_t{0xFF};
+    }
+  }
+
+  /// Return the number of values that can be advanced.
+  rle_size_t remaining() const { return values_left_; }
+
+  /// Return the repeated value of this decoder.
+  constexpr bool value() const { return value_pattern_ != 0; }
+
+ private:
+  friend class internal_rle::RunToBitmapDecoderMixin<RleRunToBitmapDecoder>;
+
+  /// The byte pattern for 8 values (full ones or full zeros).
+  uint8_t value_pattern_ = {};
+  /// Number of values left to decode.
+  rle_size_t values_left_ = 0;
+
+  void AdvanceUnsafe(rle_size_t batch_size) { values_left_ -= batch_size; }
+
+  /// Get batch values to fill the first incomplete byte of the output.
+  [[nodiscard]] rle_size_t GetBatchFirstByte(BitmapSpanMut out, rle_size_t 
batch_size) {
+    return GetBatchFromByte(out, batch_size, value_pattern_);
+  }
+
+  /// Get batch in full bytes using memset.
+  [[nodiscard]] rle_size_t GetBatchFast(BitmapSpanMut out, rle_size_t 
batch_size) {
+    ARROW_DCHECK_EQ(out.bit_start(), 0);
+    const auto n_bytes = std::min(batch_size, remaining()) / 8;
+    std::memset(out.data(), value_pattern_, n_bytes);
+    const auto n_vals = 8 * n_bytes;
+    AdvanceUnsafe(n_vals);
+    return n_vals;
+  }
+};
+
+class BitPackedRunToBitmapDecoder
+    : public 
internal_rle::RunToBitmapDecoderMixin<BitPackedRunToBitmapDecoder> {
+ public:
+  /// The type of run that can be decoded.
+  using RunType = BitPackedRun;
+
+  constexpr BitPackedRunToBitmapDecoder() noexcept = default;
+
+  explicit BitPackedRunToBitmapDecoder(const RunType& run) noexcept { 
Reset(run); }
+
+  void Reset(const RunType& run) noexcept {
+    data_ = run.raw_data_ptr();
+    values_count_ = run.values_count();
+    values_read_ = 0;
+    ARROW_DCHECK(run.raw_data_max_size() < 0 ||
+                 bit_util::BytesForBits(values_count_) <= 
run.raw_data_max_size());
+  }
+
+  /// Return the number of values that can be advanced.
+  constexpr rle_size_t remaining() const { return values_count_ - 
values_read_; }
+
+  /// Get a batch of values return the number of decoded elements.
+  /// May write fewer elements to the output than requested if there are not 
enough values
+  /// left.
+  [[nodiscard]] rle_size_t GetBatch(BitmapSpanMut out, rle_size_t batch_size) {
+    // WARN: Slow case where bit offsets of input and output are not aligned.
+    // We loose the ability to do memcpy
+    if (ARROW_PREDICT_FALSE(out.bit_start() != unread_values_bit_offset())) {
+      return GetBatchMisaligned(out, batch_size);
+    }
+    return Base::GetBatch(out, batch_size);
+  }
+
+ private:
+  using Base = 
internal_rle::RunToBitmapDecoderMixin<BitPackedRunToBitmapDecoder>;
+  friend class 
internal_rle::RunToBitmapDecoderMixin<BitPackedRunToBitmapDecoder>;
+
+  /// The pointer to the beginning of the run
+  const uint8_t* data_ = nullptr;
+  /// The total number of values in the run
+  rle_size_t values_count_ = 0;
+  /// The number of values read by the decoder
+  rle_size_t values_read_ = 0;
+
+  /// Start pointer of the unread values (may contain values already read).
+  const uint8_t* unread_values_ptr() const noexcept { return data_ + 
(values_read_ / 8); }
+
+  /// Bit in @ref unread_values_ptr where the unread values start.
+  rle_size_t unread_values_bit_offset() const noexcept { return values_read_ % 
8; }
+
+  void AdvanceUnsafe(rle_size_t batch_size) { values_read_ += batch_size; }
+
+  /// Get batch values to fill the first incomplete byte of the output.
+  [[nodiscard]] rle_size_t GetBatchFirstByte(BitmapSpanMut out, rle_size_t 
batch_size) {
+    return GetBatchFromByte(out, batch_size, *unread_values_ptr());
+  }
+
+  /// Get batch in full bytes using memcpy.
+  [[nodiscard]] rle_size_t GetBatchFast(BitmapSpanMut out, rle_size_t 
batch_size) {
+    ARROW_DCHECK_EQ(out.bit_start(), 0);
+    const auto n_bytes = std::min(batch_size, remaining()) / 8;
+    std::memcpy(out.data(), unread_values_ptr(), n_bytes);
+    const auto n_vals = 8 * n_bytes;
+    AdvanceUnsafe(n_vals);
+    return n_vals;
+  }
+
+  /// Correct and slow function for reading a batch one bit at a time.
+  [[nodiscard]] rle_size_t GetBatchSlow(BitmapSpanMut out, rle_size_t 
batch_size) {
+    const rle_size_t to_read = std::min(batch_size, remaining());
+    for (rle_size_t i = 0; i < to_read; ++i) {
+      const bool bit = bit_util::GetBit(data_, values_read_ + i);
+      bit_util::SetBitTo(out.data(), out.bit_start() + i, bit);
+    }
+    AdvanceUnsafe(to_read);
+    return to_read;
+  }
+
+  [[nodiscard]] rle_size_t GetBatchMisaligned(BitmapSpanMut out, rle_size_t 
batch_size) {
+    const auto out_bit_offset = out.bit_start();
+    ARROW_DCHECK_LT(out_bit_offset, 8);
+    ARROW_DCHECK_GE(out_bit_offset, 0);
+
+    if (ARROW_PREDICT_FALSE(remaining() == 0 || batch_size == 0)) {
+      return 0;
+    }
+
+    const rle_size_t to_read = std::min(batch_size, remaining());
+    rle_size_t read = 0;
+
+    // HEADER: copy bits one by one unit the output is byte-aligned

Review Comment:
   ```suggestion
       // HEADER: copy bits one by one until the output is byte-aligned
   ```



##########
cpp/src/arrow/util/bit_util.h:
##########
@@ -176,6 +179,54 @@ static constexpr bool GetBitFromByte(uint8_t byte, uint8_t i) {
   return byte & kBitmask[i];
 }
 
+/// Read 32 bits starting at bit `offset` into a uint32.
+///
+/// The return value in in-memory byte layout matches the source bit-stream
+/// identically on little- and big-endian platforms.
+///
+/// The caller must guarantee that many bytes are readable.
+static inline uint32_t Get32Bits(const uint8_t* bits, uint64_t offset) {
+  uint64_t buffer = {};
+  std::memcpy(&buffer, bits + (offset / 8), BytesForBits(offset % 8 + 32));
+  // Interpret the loaded bytes as little-endian, drop the low `offset % 8` bits
+  // to align LSB-first, then store back in little-endian byte order so the
+  // result's memory representation matches the bit-stream.
+  buffer = FromLittleEndian(buffer);
+  return ToLittleEndian(static_cast<uint32_t>(buffer >> (offset % 8)));
+}
+
+template <typename Uint>
+struct CopyBitsParams {
+  Uint src = {};
+  Uint dst = {};
+  Uint start = {};
+  Uint end = {};
+};
+
+/// Copy a contiguous span of bits from src into dst.
+///
+/// Copy bits [start, end[ from src into the position [start, end[ in dst
+/// and return the result (inputs are unmodified).
+/// Setting ``kAllowFullCopy`` to false is an optimization when the caller can
+/// guarantee that the range of bits to copy does not cover the whole range.
+template <typename Uint, bool kAllowFullCopy = true>
+[[nodiscard]] constexpr Uint CopyBits(const CopyBitsParams<Uint>& params) {
+  constexpr auto kUintSizeBits = static_cast<Uint>(sizeof(Uint) * 8);
+  assert(params.start <= params.end);
+  assert(params.start < kUintSizeBits);
+  assert(params.end <= kUintSizeBits);
+
+  const auto length = static_cast<Uint>(params.end - params.start);
+  if constexpr (kAllowFullCopy) {
+    if (length == kUintSizeBits) {
+      return params.src;
+    }
+  }

Review Comment:
   Let's make sure the caller doesn't violate the contract?
   ```suggestion
     } else {
       assert(length < kUintSizeBits);
     }
   ```



##########
cpp/src/arrow/util/rle_bitmap_test.cc:
##########
@@ -0,0 +1,508 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <array>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/rle_bitmap_internal.h"
+#include "arrow/util/rle_encoding_internal.h"
+
+namespace arrow::util {
+
+namespace {
+
+/// Read the first `count` bits of `bytes` (LSB first) into a vector of booleans.
+std::vector<bool> BitsFromBytes(const std::vector<uint8_t>& bytes, rle_size_t count) {
+  std::vector<bool> bits(count);
+  for (rle_size_t i = 0; i < count; ++i) {
+    bits[i] = bit_util::GetBit(bytes.data(), i);
+  }
+  return bits;
+}
+
+/// Check the decoded output in `out` against `expected`.
+/// Bits `out[out_offset..out_offset + count]` must equal `expected[skip..skip + count]`.
+/// The `out_offset` bits before them must still be zero.
+void CheckDecodedBits(const std::vector<uint8_t>& out, const std::vector<bool>& expected,
+                      rle_size_t count, rle_size_t out_offset = 0, rle_size_t skip = 0) {

Review Comment:
   `skip` could perhaps be named `expected_offset` or `expected_skip` for 
clarity?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to