AntoinePrv commented on code in PR #47294:
URL: https://github.com/apache/arrow/pull/47294#discussion_r2337142516


##########
cpp/src/arrow/util/rle_encoding_internal.h:
##########
@@ -299,385 +552,988 @@ class RleEncoder {
   uint8_t* literal_indicator_byte_;
 };
 
+/*************************
+ *  RleBitPackedDecoder  *
+ *************************/
+
+template <typename T>
+RleBitPackedDecoder<T>::RleBitPackedDecoder(raw_data_const_pointer data,
+                                            raw_data_size_type data_size,
+                                            bit_size_type value_bit_width) 
noexcept {
+  Reset(data, data_size, value_bit_width);
+}
+
+template <typename T>
+void RleBitPackedDecoder<T>::Reset(raw_data_const_pointer data,
+                                   raw_data_size_type data_size,
+                                   bit_size_type value_bit_width) noexcept {
+  ARROW_DCHECK_GE(value_bit_width, 0);
+  ARROW_DCHECK_LE(value_bit_width, 64);
+  parser_.Reset(data, data_size, value_bit_width);
+  decoder_ = {};
+}
+
+template <typename T>
+auto RleBitPackedDecoder<T>::RunRemaining() const -> values_count_type {
+  return std::visit([](auto const& dec) { return dec.Remaining(); }, decoder_);
+}
+
+template <typename T>
+bool RleBitPackedDecoder<T>::Exhausted() const {
+  return (RunRemaining() == 0) && parser_.Exhausted();
+}
+
 template <typename T>
-inline bool RleDecoder::Get(T* val) {
+bool RleBitPackedDecoder<T>::ParseAndResetDecoder() {
+  auto dyn_run = parser_.Next();
+  if (!dyn_run.has_value()) {
+    return false;
+  }
+
+  if (auto* rle_run = std::get_if<BitPackedRun>(dyn_run.operator->())) {
+    decoder_ = {BitPackedDecoder<value_type>(*rle_run)};
+    return true;
+  }
+
+  auto* bit_packed_run = std::get_if<RleRun>(dyn_run.operator->());
+  ARROW_DCHECK(bit_packed_run);  // Only two possibilities in the variant
+  decoder_ = {RleDecoder<value_type>(*bit_packed_run)};
+  return true;
+}
+
+template <typename T>
+auto RleBitPackedDecoder<T>::RunGetBatch(value_type* out, values_count_type 
batch_size)
+    -> values_count_type {
+  return std::visit([&](auto& dec) { return dec.GetBatch(out, batch_size); }, 
decoder_);
+}
+
+template <typename T>
+bool RleBitPackedDecoder<T>::Get(value_type* val) {
   return GetBatch(val, 1) == 1;
 }
 
+namespace internal {
+
+/// A ``Parse`` handler that calls a single lambda.
+///
+/// This lambda would typically take the input run as ``auto run`` (i.e. the 
lambda is
+/// templated) and deduce other types from it.
+template <typename Lambda>
+struct LambdaHandler {
+  Lambda handlder_;
+
+  auto OnBitPackedRun(BitPackedRun run) { return handlder_(std::move(run)); }
+
+  auto OnRleRun(RleRun run) { return handlder_(std::move(run)); }
+};
+
+template <typename Lambda>
+LambdaHandler(Lambda) -> LambdaHandler<Lambda>;
+
+template <typename value_type, typename Run>
+struct decoder_for;
+
+template <typename value_type>
+struct decoder_for<value_type, BitPackedRun> {
+  using type = BitPackedDecoder<value_type>;
+};
+
+template <typename value_type>
+struct decoder_for<value_type, RleRun> {
+  using type = RleDecoder<value_type>;
+};
+
+template <typename value_type, typename Run>
+using decoder_for_t = typename decoder_for<value_type, Run>::type;
+
+}  // namespace internal
+
 template <typename T>
-inline int RleDecoder::GetBatch(T* values, int batch_size) {
-  ARROW_DCHECK_GE(bit_width_, 0);
-  int values_read = 0;
-
-  auto* out = values;
-
-  while (values_read < batch_size) {
-    int remaining = batch_size - values_read;
-
-    if (repeat_count_ > 0) {  // Repeated value case.
-      int repeat_batch = std::min(remaining, repeat_count_);
-      std::fill(out, out + repeat_batch, static_cast<T>(current_value_));
-
-      repeat_count_ -= repeat_batch;
-      values_read += repeat_batch;
-      out += repeat_batch;
-    } else if (literal_count_ > 0) {
-      int literal_batch = std::min(remaining, literal_count_);
-      int actual_read = bit_reader_.GetBatch(bit_width_, out, literal_batch);
-      if (actual_read != literal_batch) {
-        return values_read;
-      }
+auto RleBitPackedDecoder<T>::GetBatch(value_type* out, values_count_type 
batch_size)
+    -> values_count_type {
+  using ControlFlow = RleBitPackedParser::ControlFlow;
 
-      literal_count_ -= literal_batch;
-      values_read += literal_batch;
-      out += literal_batch;
-    } else {
-      if (!NextCounts<T>()) return values_read;
+  values_count_type values_read = 0;
+
+  // Remaining from a previous call that would have left some unread data from 
a run.
+  if (ARROW_PREDICT_FALSE(RunRemaining() > 0)) {
+    auto const read = RunGetBatch(out, batch_size);
+    values_read += read;
+    out += read;
+
+    // Either we fulfilled all the batch to be read or we finished remaining 
run.
+    if (ARROW_PREDICT_FALSE(values_read == batch_size)) {
+      return values_read;
     }
+    ARROW_DCHECK(RunRemaining() == 0);
   }
 
+  auto handler = internal::LambdaHandler{
+      [&](auto run) {
+        ARROW_DCHECK_LT(values_read, batch_size);
+        internal::decoder_for_t<value_type, decltype(run)> decoder(run);
+        auto const read = decoder.GetBatch(out, batch_size - values_read);
+        ARROW_DCHECK_LE(read, batch_size - values_read);
+        values_read += read;
+        out += read;
+
+        // Stop reading and store remaining decoder
+        if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) {
+          decoder_ = std::move(decoder);
+          return ControlFlow::Break;
+        }
+
+        return ControlFlow::Continue;
+      },
+  };
+
+  parser_.Parse(handler);
+
   return values_read;
 }
 
-template <typename T, typename RunType, typename Converter>
-inline int RleDecoder::GetSpaced(Converter converter, int batch_size, int 
null_count,
-                                 const uint8_t* valid_bits, int64_t 
valid_bits_offset,
-                                 T* out) {
-  if (ARROW_PREDICT_FALSE(null_count == batch_size)) {
-    converter.FillZero(out, out + batch_size);
-    return batch_size;
+namespace internal {
+
+/// Utility class to safely handle values and null count without too 
error-prone
+/// verbosity.
+class BatchCounter {
+ public:
+  using size_type = int32_t;
+
+  [[nodiscard]] static constexpr BatchCounter FromBatchSizeAndNulls(
+      size_type batch_size, size_type null_count) {
+    ARROW_DCHECK_LE(null_count, batch_size);
+    return {batch_size - null_count, null_count};
   }
 
-  ARROW_DCHECK_GE(bit_width_, 0);
-  int values_read = 0;
-  int values_remaining = batch_size - null_count;
+  constexpr BatchCounter(size_type values_count, size_type null_count) noexcept
+      : values_count_(values_count), null_count_(null_count) {}
 
-  // Assume no bits to start.
-  arrow::internal::BitRunReader bit_reader(valid_bits, valid_bits_offset,
-                                           /*length=*/batch_size);
-  arrow::internal::BitRun valid_run = bit_reader.NextRun();
-  while (values_read < batch_size) {
-    if (ARROW_PREDICT_FALSE(valid_run.length == 0)) {
-      valid_run = bit_reader.NextRun();
+  [[nodiscard]] constexpr size_type ValuesCount() const noexcept { return 
values_count_; }
+
+  [[nodiscard]] constexpr size_type ValuesRead() const noexcept { return 
values_read_; }
+
+  [[nodiscard]] constexpr size_type ValuesRemaining() const noexcept {
+    ARROW_DCHECK_LE(values_read_, values_count_);
+    return values_count_ - values_read_;
+  }
+
+  constexpr void AccrueReadValues(size_type to_read) noexcept {
+    ARROW_DCHECK_LE(to_read, ValuesRemaining());
+    values_read_ += to_read;
+  }
+
+  [[nodiscard]] constexpr size_type NullCount() const noexcept { return 
null_count_; }
+
+  [[nodiscard]] constexpr size_type NullRead() const noexcept { return 
null_read_; }
+
+  [[nodiscard]] constexpr size_type NullRemaining() const noexcept {
+    ARROW_DCHECK_LE(null_read_, null_count_);
+    return null_count_ - null_read_;
+  }
+
+  constexpr void AccrueReadNulls(size_type to_read) noexcept {
+    ARROW_DCHECK_LE(to_read, NullRemaining());
+    null_read_ += to_read;
+  }
+
+  [[nodiscard]] constexpr size_type TotalRemaining() const noexcept {
+    return ValuesRemaining() + NullRemaining();
+  }
+
+  [[nodiscard]] constexpr size_type TotalRead() const noexcept {
+    return values_read_ + null_read_;
+  }
+
+  [[nodiscard]] constexpr bool IsFullyNull() const noexcept {
+    return ValuesRemaining() == 0;
+  }
+
+  [[nodiscard]] constexpr bool IsDone() const noexcept { return 
TotalRemaining() == 0; }
+
+ private:
+  size_type values_count_ = 0;
+  size_type values_read_ = 0;
+  size_type null_count_ = 0;
+  size_type null_read_ = 0;
+};
+
+// The maximal unsigned size that a variable can fit.
+template <typename T>
+constexpr auto max_size_for_v =
+    static_cast<std::make_unsigned_t<T>>(std::numeric_limits<T>::max());
+
+/// Overload for GetSpaced for a single run in a RleDecoder
+template <typename Converter, typename BitRunReader, typename BitRun,
+          typename values_count_type, typename value_type>
+auto RunGetSpaced(Converter& converter, typename Converter::out_type* out,
+                  values_count_type batch_size, values_count_type null_count,
+                  BitRunReader&& validity_reader, BitRun&& validity_run,

Review Comment:
   Replaced with pointers



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to