save-buffer commented on code in PR #13657:
URL: https://github.com/apache/arrow/pull/13657#discussion_r944919307
##########
cpp/src/arrow/util/rle_util.cc:
##########
@@ -0,0 +1,24 @@
+#include "arrow/util/rle_util.h"
+#include <algorithm>
+#include "arrow/builder.h"
+
+namespace arrow {
+namespace rle_util {
+
+int64_t FindPhysicalOffset(const int64_t* accumulated_run_lengths,
+ int64_t physical_length, int64_t logical_offset) {
+ auto it = std::upper_bound(accumulated_run_lengths,
+ accumulated_run_lengths + physical_length,
logical_offset);
+ return std::distance(accumulated_run_lengths, it);
+}
+
+void AddArtificialOffsetInChildArray(ArrayData* array, int64_t offset) {
Review Comment:
You could make this return `Status` and use `RETURN_NOT_OK` internally. I know
this is a testing function, but I'd say it's probably better for the test to
wrap the call in `ARROW_CHECK_OK`.
##########
cpp/src/arrow/compute/kernels/vector_selection.cc:
##########
@@ -841,6 +907,222 @@ Status PrimitiveFilter(KernelContext* ctx, const
ExecSpan& batch, ExecResult* ou
return Status::OK();
}
+/// \brief The Filter implementation for primitive (fixed-width) types does not
+/// use the logical Arrow type but rather the physical C type. This way we only
+/// generate one take function for each byte width. We use the same
+/// implementation here for boolean and fixed-byte-size inputs with some
+/// template specialization.
+template <typename ArrowType>
+class RLEPrimitiveFilterImpl {
+ public:
+ using T = typename std::conditional<std::is_same<ArrowType,
BooleanType>::value,
+ uint8_t, typename
ArrowType::c_type>::type;
+
+ RLEPrimitiveFilterImpl(const ArraySpan& values, const ArraySpan& filter,
+ FilterOptions::NullSelectionBehavior null_selection,
+ ArrayData* out_arr)
+ : values_{values},
+ values_is_valid_(values.child_data[0].buffers[0].data),
+ values_data_(reinterpret_cast<const
T*>(values.child_data[0].buffers[1].data)),
+ filter_{filter},
+ filter_is_valid_(filter.child_data[0].buffers[0].data),
+ filter_data_(filter.child_data[0].buffers[1].data),
+ null_selection_(null_selection),
+ out_logical_length_(out_arr->length) {
+ if (out_arr->child_data[0]->buffers[0] != nullptr) {
+ // May not be allocated if neither filter nor values contains nulls
+ out_is_valid_ = out_arr->child_data[0]->buffers[0]->mutable_data();
+ }
+ assert(out_arr->offset == 0);
+ out_position_ = 0;
+ out_run_length_ = out_arr->GetMutableValues<int64_t>(0, 0);
+ out_data_ =
reinterpret_cast<T*>(out_arr->child_data[0]->buffers[1]->mutable_data());
+ }
+
+ void Exec() {
Review Comment:
I'd have thought that you could just run the existing Filter function on the
run-lengths array and the values array and be done. I think the reason you had
to rewrite it is that an RLE array's `length` field is the logical length rather
than the number of runs — is that correct?
##########
cpp/src/arrow/compute/kernels/vector_selection.cc:
##########
@@ -841,6 +907,222 @@ Status PrimitiveFilter(KernelContext* ctx, const
ExecSpan& batch, ExecResult* ou
return Status::OK();
}
+/// \brief The Filter implementation for primitive (fixed-width) types does not
+/// use the logical Arrow type but rather the physical C type. This way we only
+/// generate one take function for each byte width. We use the same
+/// implementation here for boolean and fixed-byte-size inputs with some
+/// template specialization.
+template <typename ArrowType>
+class RLEPrimitiveFilterImpl {
+ public:
+ using T = typename std::conditional<std::is_same<ArrowType,
BooleanType>::value,
+ uint8_t, typename
ArrowType::c_type>::type;
+
+ RLEPrimitiveFilterImpl(const ArraySpan& values, const ArraySpan& filter,
+ FilterOptions::NullSelectionBehavior null_selection,
+ ArrayData* out_arr)
+ : values_{values},
+ values_is_valid_(values.child_data[0].buffers[0].data),
+ values_data_(reinterpret_cast<const
T*>(values.child_data[0].buffers[1].data)),
+ filter_{filter},
+ filter_is_valid_(filter.child_data[0].buffers[0].data),
+ filter_data_(filter.child_data[0].buffers[1].data),
+ null_selection_(null_selection),
+ out_logical_length_(out_arr->length) {
+ if (out_arr->child_data[0]->buffers[0] != nullptr) {
+ // May not be allocated if neither filter nor values contains nulls
+ out_is_valid_ = out_arr->child_data[0]->buffers[0]->mutable_data();
+ }
+ assert(out_arr->offset == 0);
+ out_position_ = 0;
+ out_run_length_ = out_arr->GetMutableValues<int64_t>(0, 0);
+ out_data_ =
reinterpret_cast<T*>(out_arr->child_data[0]->buffers[1]->mutable_data());
+ }
+
+ void Exec() {
+ auto WriteNotNull = [&](int64_t in_position, int64_t run_length) {
+ bit_util::SetBit(out_is_valid_, out_position_);
+ out_run_length_[out_position_] = run_length;
+ // Increments out_position_
+ WriteValue(in_position, run_length);
+ };
+
+ auto WriteMaybeNull = [&](int64_t in_position, int64_t run_length) {
+ bit_util::SetBitTo(out_is_valid_, out_position_,
+ bit_util::GetBit(values_is_valid_, in_position));
+ out_run_length_[out_position_] = run_length;
+ // Increments out_position_
+ WriteValue(in_position, run_length);
+ };
+
+ int64_t accumulated_run_length = 0;
+ if (values_is_valid_ == NULLPTR) {
+ if (filter_is_valid_ == NULLPTR) {
+ rle_util::VisitMergedRuns(
+ values_, filter_,
+ [&](int64_t run_length, int64_t value_index, int64_t filter_index)
{
+ if (bit_util::GetBit(filter_data_, filter_index)) {
+ accumulated_run_length += run_length;
+ WriteValue(value_index, accumulated_run_length);
+ }
+ });
+ } else if (null_selection_ == FilterOptions::DROP) {
+ rle_util::VisitMergedRuns(
+ values_, filter_,
+ [&](int64_t run_length, int64_t value_index, int64_t filter_index)
{
+ if (bit_util::GetBit(filter_is_valid_, filter_index) &&
+ bit_util::GetBit(filter_data_, filter_index)) {
+ accumulated_run_length += run_length;
+ WriteValue(value_index, accumulated_run_length);
+ }
+ });
+ } else { // null_selection == FilterOptions::EMIT_NULL
+ rle_util::VisitMergedRuns(
+ values_, filter_,
+ [&](int64_t run_length, int64_t value_index, int64_t filter_index)
{
+ const bool is_valid = bit_util::GetBit(filter_is_valid_,
filter_index);
+ if (is_valid && bit_util::GetBit(filter_data_, filter_index)) {
+ accumulated_run_length += run_length;
+ WriteNotNull(value_index, accumulated_run_length);
+ }
+ if (!is_valid) {
+ accumulated_run_length += run_length;
+ bit_util::ClearBit(out_is_valid_, out_position_);
+ WriteNull(accumulated_run_length);
+ }
+ });
+ }
+ } else { // values_is_valid_ exists. Input may have nulls
+ if (filter_is_valid_ == NULLPTR) {
+ rle_util::VisitMergedRuns(
+ values_, filter_,
+ [&](int64_t run_length, int64_t value_index, int64_t filter_index)
{
+ if (bit_util::GetBit(filter_data_, filter_index)) {
+ accumulated_run_length += run_length;
+ WriteMaybeNull(value_index, accumulated_run_length);
+ }
+ });
+ } else if (null_selection_ == FilterOptions::DROP) {
+ rle_util::VisitMergedRuns(
+ values_, filter_,
+ [&](int64_t run_length, int64_t value_index, int64_t filter_index)
{
+ if (bit_util::GetBit(filter_is_valid_, filter_index) &&
+ bit_util::GetBit(filter_data_, filter_index)) {
+ accumulated_run_length += run_length;
+ WriteMaybeNull(value_index, accumulated_run_length);
+ }
+ });
+ } else { // null_selection == FilterOptions::EMIT_NULL
+ rle_util::VisitMergedRuns(
+ values_, filter_,
+ [&](int64_t run_length, int64_t value_index, int64_t filter_index)
{
+ const bool is_valid = bit_util::GetBit(filter_is_valid_,
filter_index);
+ if (is_valid && bit_util::GetBit(filter_data_, filter_index)) {
+ accumulated_run_length += run_length;
+ WriteMaybeNull(value_index, accumulated_run_length);
+ }
+ if (!is_valid) {
+ accumulated_run_length += run_length;
+ bit_util::ClearBit(out_is_valid_, out_position_);
+ WriteNull(accumulated_run_length);
+ }
+ });
+ }
+ }
+ out_logical_length_ = accumulated_run_length;
+ }
+
+ // Write the next out_position given the selected in_position for the input
+ // data and advance out_position
+ void WriteValue(int64_t in_position, int64_t run_length) {
+ out_run_length_[out_position_] = run_length;
+ out_data_[out_position_++] = values_data_[in_position];
+ }
+
+ void WriteNull(int64_t run_length) {
+ // Zero the memory
+ out_run_length_[out_position_] = run_length;
+ out_data_[out_position_++] = T{};
+ }
+
+ private:
+ const ArraySpan& values_;
+ const uint8_t* values_is_valid_;
+ const T* values_data_;
+ const ArraySpan& filter_;
+ const uint8_t* filter_is_valid_;
+ const uint8_t* filter_data_;
+ FilterOptions::NullSelectionBehavior null_selection_;
+ uint8_t* out_is_valid_;
+ int64_t* out_run_length_;
+ T* out_data_;
+ int64_t& out_logical_length_;
Review Comment:
I personally don't have a strong opinion about this, but I think according to
the style guide this should be a pointer (since it's an output).
##########
cpp/src/arrow/compute/kernels/vector_selection.cc:
##########
@@ -841,6 +907,222 @@ Status PrimitiveFilter(KernelContext* ctx, const
ExecSpan& batch, ExecResult* ou
return Status::OK();
}
+/// \brief The Filter implementation for primitive (fixed-width) types does not
+/// use the logical Arrow type but rather the physical C type. This way we only
+/// generate one take function for each byte width. We use the same
+/// implementation here for boolean and fixed-byte-size inputs with some
+/// template specialization.
+template <typename ArrowType>
+class RLEPrimitiveFilterImpl {
+ public:
+ using T = typename std::conditional<std::is_same<ArrowType,
BooleanType>::value,
+ uint8_t, typename
ArrowType::c_type>::type;
+
+ RLEPrimitiveFilterImpl(const ArraySpan& values, const ArraySpan& filter,
+ FilterOptions::NullSelectionBehavior null_selection,
+ ArrayData* out_arr)
+ : values_{values},
+ values_is_valid_(values.child_data[0].buffers[0].data),
+ values_data_(reinterpret_cast<const
T*>(values.child_data[0].buffers[1].data)),
+ filter_{filter},
+ filter_is_valid_(filter.child_data[0].buffers[0].data),
+ filter_data_(filter.child_data[0].buffers[1].data),
+ null_selection_(null_selection),
+ out_logical_length_(out_arr->length) {
+ if (out_arr->child_data[0]->buffers[0] != nullptr) {
+ // May not be allocated if neither filter nor values contains nulls
+ out_is_valid_ = out_arr->child_data[0]->buffers[0]->mutable_data();
+ }
+ assert(out_arr->offset == 0);
+ out_position_ = 0;
+ out_run_length_ = out_arr->GetMutableValues<int64_t>(0, 0);
+ out_data_ =
reinterpret_cast<T*>(out_arr->child_data[0]->buffers[1]->mutable_data());
+ }
+
+ void Exec() {
+ auto WriteNotNull = [&](int64_t in_position, int64_t run_length) {
+ bit_util::SetBit(out_is_valid_, out_position_);
+ out_run_length_[out_position_] = run_length;
+ // Increments out_position_
+ WriteValue(in_position, run_length);
+ };
+
+ auto WriteMaybeNull = [&](int64_t in_position, int64_t run_length) {
+ bit_util::SetBitTo(out_is_valid_, out_position_,
+ bit_util::GetBit(values_is_valid_, in_position));
+ out_run_length_[out_position_] = run_length;
+ // Increments out_position_
+ WriteValue(in_position, run_length);
+ };
+
+ int64_t accumulated_run_length = 0;
+ if (values_is_valid_ == NULLPTR) {
Review Comment:
Could you break up this `if` into two functions? I'm having trouble
following the control flow with the nested ifs. Maybe something like
`Exec_ValuesNotNull` and `Exec_ValuesMaybeNull`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]