pitrou commented on code in PR #46229:
URL: https://github.com/apache/arrow/pull/46229#discussion_r2207918592


##########
cpp/src/arrow/array/array_binary.cc:
##########
@@ -105,6 +111,392 @@ 
BinaryViewArray::BinaryViewArray(std::shared_ptr<DataType> type, int64_t length,
       ArrayData::Make(std::move(type), length, std::move(buffers), null_count, 
offset));
 }
 
+namespace {
+
+// TODO Should We move this to bitmap_ops.h and Remove from 
compute/kernels/util.s
+Result<std::shared_ptr<Buffer>> GetOrCopyNullBitmapBuffer(const ArrayData& 
in_array,
+                                                          MemoryPool* pool) {
+  if (in_array.buffers[0]->data() == nullptr) {
+    return nullptr;
+  } else if (in_array.offset == 0) {
+    return in_array.buffers[0];
+  } else if (in_array.offset % 8 == 0) {
+    return SliceBuffer(in_array.buffers[0], /*offset=*/in_array.offset / 8);
+  } else {
+    // If a non-zero offset, we need to shift the bitmap
+    return internal::CopyBitmap(pool, in_array.buffers[0]->data(), 
in_array.offset,
+                                in_array.length);
+  }
+}
+
+struct Interval {
+  int64_t start;
+  int64_t end;
+  int32_t offset = -1;
+};
+
+struct IntervalComparator {
+  bool operator()(const Interval& left, const Interval& right) const {
+    return left.start < right.start;
+  }
+};
+
+// inspired from boost::icl::interval_set
+class IntervalMerger {
+ public:
+  using IntervalSet = std::set<Interval, IntervalComparator>;
+  using Iterator = std::set<Interval, IntervalComparator>::iterator;
+
+  void AddInterval(const Interval& interval) {
+    auto [it, is_inserted] = interval_set.insert(interval);
+    if (is_inserted) {
+      JointLeft(it);
+      JoinRight(it);
+    } else {
+      if (it->end < interval.end) {
+        const_cast<int64_t&>(it->end) = interval.end;
+        JoinRight(it);
+      }
+    }
+  }
+
+  int64_t CalculateOffsetAndTotalSize() {
+    int64_t total_size = 0;
+    for (auto& it : interval_set) {
+      const_cast<int32_t&>(it.offset) = static_cast<int32_t>(total_size);
+      total_size += it.end - it.start;
+    }
+    return total_size;
+  }
+
+  // This method should be called After CalculateOffsetAndTotalSize
+  int32_t GetRelativeOffset(int32_t view_offset) const {
+    // end and offset is included in comparison
+    auto it = interval_set.lower_bound({view_offset, -1, -1});
+    if (it == interval_set.end()) {
+      --it;
+      // offset from the start of interval
+      auto offset_from_span = view_offset - it->start;
+      return static_cast<int32_t>(offset_from_span) + it->offset;
+    } else if (it->start == view_offset) {
+      // this is the case where view_offset refers to the beginning of interval
+      return it->offset;
+    } else {
+      --it;
+      // offset from the start of interval
+      auto offset_from_span = view_offset - it->start;
+      return static_cast<int32_t>(offset_from_span) + it->offset;
+    }
+  }
+
+  IntervalSet::const_iterator begin() const { return interval_set.cbegin(); }
+
+  IntervalSet::const_iterator end() const { return interval_set.cend(); }
+
+ private:
+  void JointLeft(Iterator& it) {
+    if (it == interval_set.begin()) {
+      return;
+    } else {
+      auto prev_it = std::prev(it);
+      if (Joinable(prev_it, it)) {
+        MergeIntoLeftAndAdvanceRight(prev_it, it);
+        it = prev_it;
+        return;
+      }
+    }
+  }
+
+  void JoinRight(Iterator& it) {
+    auto begin_iterator = std::next(it);
+    auto end_iterator = begin_iterator;
+    while (end_iterator != interval_set.end() && Joinable(it, end_iterator)) {
+      const_cast<int64_t&>(it->end) = std::max(it->end, end_iterator->end);
+      ++end_iterator;
+    }
+    interval_set.erase(begin_iterator, end_iterator);
+  }
+
+  // Update left with a new end value
+  // Advance right to the next iterator
+  void MergeIntoLeftAndAdvanceRight(Iterator& left, Iterator& right) {
+    Interval interval{right->start, right->end};
+    right = interval_set.erase(right);
+    const_cast<int64_t&>(left->end) = std::max(left->end, interval.end);
+  }
+
+  bool Joinable(Iterator left, Iterator right) {
+    return std::max(left->start, right->start) <= std::min(left->end, 
right->end);
+  }
+
+  IntervalSet interval_set;
+};
+
+class CompactArrayImpl {
+ public:
+  CompactArrayImpl(const std::shared_ptr<ArrayData>& src_array_data,
+                   double occupancy_threshold, MemoryPool* memory_pool)
+      : src_array_data_(src_array_data),
+        src_buffers_(src_array_data->buffers),
+        occupancy_threshold_(occupancy_threshold),
+        memory_pool_(memory_pool) {}
+
+  Result<std::shared_ptr<Array>> Compact() {
+    // Check occupancy_threshold Parameter Validity
+    if (ARROW_PREDICT_FALSE(ValidateOccupancyThreshold(occupancy_threshold_))) 
{
+      return Status::Invalid(
+          "occupancy_threshold must be between 0 and 1. Current value:",
+          occupancy_threshold_);
+    }
+
+    auto num_src_buffers = src_buffers_.size();
+
+    if (ARROW_PREDICT_FALSE(num_src_buffers < 2)) {
+      return Status::Invalid("The number of buffers in ArrayData is less than 
2.");
+    } else if (num_src_buffers == 2) {
+      // Only the bitmap and view buffers are available.
+      // Should we copy the view buffer to reduce size if an offset is set?
+      dst_buffers_.insert(dst_buffers_.end(), src_buffers_.begin(), 
src_buffers_.end());
+      return MakeArray(ArrayData::Make(
+          src_array_data_->type, src_array_data_->length, 
std::move(dst_buffers_),
+          src_array_data_->null_count, src_array_data_->offset));
+    } else {
+      ARROW_RETURN_NOT_OK(AddBitmapBuffer());
+
+      ARROW_RETURN_NOT_OK(AddViewBuffer());
+
+      // Handle DataBuffer
+      auto buffer_infos = GenerateBufferInfos();
+
+      // Relocating buffers whose occupancy is non-zero and below the 
threshold.
+      ARROW_RETURN_NOT_OK(CompactDataBufferBasedBufferInfo(buffer_infos));
+
+      AdjustViewElementsBufferIndexAndOffset(buffer_infos);
+
+      return MakeArray(ArrayData::Make(src_array_data_->type, 
src_array_data_->length,
+                                       std::move(dst_buffers_)));
+    }
+  }
+
+ private:
+  struct BufferInfo {
+    // It is possible that total occupancy of a data buffer
+    // becomes higher than MAX_INT32_T.
+    int64_t total_size_occupied = 0;
+    int32_t new_index = -1;
+    // offset in new buffer
+    // it is used when buffer is compacted
+    int32_t base_new_offset = -1;
+    IntervalMerger interval_merger;
+    // True if occupancy is non-zero and
+    // less than or equal to the threshold.
+    bool should_be_relocated = false;
+  };
+
+  struct BufferIndexAndOffsetMapper {
+    // The buffer will be copied.
+    // Returns the index and offset in the new buffer.
+    std::pair<int32_t, int32_t> MergeBufferAndGetPosition(int64_t size) {
+      int32_t buffer_offset;
+      if (current_index == -1) {
+        buffer_offset = 0;
+        buffer_sizes.push_back(size);
+        indexes.push_back(-1);
+        current_index = static_cast<int32_t>(buffer_sizes.size()) - 1;
+      } else if (buffer_sizes[current_index] + size >
+                 std::numeric_limits<int32_t>::max()) {
+        buffer_sizes.push_back(size);
+        indexes.push_back(-1);
+        current_index = static_cast<int32_t>(buffer_sizes.size()) - 1;
+        buffer_offset = 0;
+      } else {
+        buffer_offset = static_cast<int32_t>(buffer_sizes[current_index]);
+        buffer_sizes[current_index] += size;
+      }
+      return std::make_pair(current_index, buffer_offset);
+    }
+
+    // The buffer will not be copied.
+    int32_t AppendBufferAndGetPosition(int64_t size, int32_t index) {
+      buffer_sizes.push_back(size);
+      indexes.push_back(index);
+      return static_cast<int32_t>(buffer_sizes.size()) - 1;
+    }
+
+    std::vector<int64_t> buffer_sizes{};
+
+    // Index from previous if it's not merged
+    // The value whether is -1 for  a merged buffer or
+    //  non-negative for  a non-merged-buffer.
+    std::vector<int32_t> indexes{};
+    // Uses for merging buffer to proper location
+    int32_t current_index = -1;
+  };
+
+  bool ValidateOccupancyThreshold(double occupancy_threshold) {
+    return std::signbit(occupancy_threshold) || 
std::isnan(occupancy_threshold) ||
+           occupancy_threshold > 1;

Review Comment:
   No, it just seems that this code is very convoluted for what it's trying to 
do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to