lidavidm commented on a change in pull request #11452:
URL: https://github.com/apache/arrow/pull/11452#discussion_r745780242
##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1658,6 +1685,181 @@ struct GroupedMinMaxImpl final : public GroupedAggregator {
ScalarAggregateOptions options_;
};
+// For binary-like types
+// In principle, FixedSizeBinary could use base implementation
+template <typename Type>
+struct GroupedMinMaxImpl<Type,
+                         enable_if_t<is_base_binary_type<Type>::value ||
+                                     std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    options_ = *checked_cast<const ScalarAggregateOptions*>(options);
+    // type_ initialized by MinMaxInit
+    has_values_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    has_nulls_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    DCHECK_GE(added_groups, 0);
+    num_groups_ = new_num_groups;
+    mins_.resize(new_num_groups);
+    maxes_.resize(new_num_groups);
+    RETURN_NOT_OK(has_values_.Append(added_groups, false));
+    RETURN_NOT_OK(has_nulls_.Append(added_groups, false));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t g, util::string_view val) {
+          if (!mins_[g] || val < util::string_view(*mins_[g])) {
+            if (!mins_[g]) {
+              ARROW_ASSIGN_OR_RAISE(
+                  mins_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool()));
+            }
+            RETURN_NOT_OK(mins_[g]->Resize(val.size(), /*shrink_to_fit=*/false));
+            std::memcpy(mins_[g]->mutable_data(), val.data(), val.size());
+          }
+          if (!maxes_[g] || val > util::string_view(*maxes_[g])) {
+            if (!maxes_[g]) {
+              ARROW_ASSIGN_OR_RAISE(
+                  maxes_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool()));
+            }
+            RETURN_NOT_OK(maxes_[g]->Resize(val.size(), /*shrink_to_fit=*/false));
+            std::memcpy(maxes_[g]->mutable_data(), val.data(), val.size());
+          }
+          BitUtil::SetBit(has_values_.mutable_data(), g);
+          return Status::OK();
+        },
+        [&](uint32_t g) {
+          BitUtil::SetBit(has_nulls_.mutable_data(), g);
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedMinMaxImpl*>(&raw_other);
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < group_id_mapping.length;
+         ++other_g, ++g) {
+      if (!mins_[*g] ||
+          (mins_[*g] && other->mins_[other_g] &&
+           util::string_view(*mins_[*g]) > util::string_view(*other->mins_[other_g]))) {
+        mins_[*g] = std::move(other->mins_[other_g]);
+      }
+      if (!maxes_[*g] ||
+          (maxes_[*g] && other->maxes_[other_g] &&
+           util::string_view(*maxes_[*g]) < util::string_view(*other->maxes_[other_g]))) {
+        maxes_[*g] = std::move(other->maxes_[other_g]);
+      }
+
+      if (BitUtil::GetBit(other->has_values_.data(), other_g)) {
+        BitUtil::SetBit(has_values_.mutable_data(), *g);
+      }
+      if (BitUtil::GetBit(other->has_nulls_.data(), other_g)) {
+        BitUtil::SetBit(has_nulls_.mutable_data(), *g);
+      }
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    // aggregation for group is valid if there was at least one value in that group
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap, has_values_.Finish());
+
+    if (!options_.skip_nulls) {
+      // ... and there were no nulls in that group
+      ARROW_ASSIGN_OR_RAISE(auto has_nulls, has_nulls_.Finish());
+      arrow::internal::BitmapAndNot(null_bitmap->data(), 0, has_nulls->data(), 0,
+                                    num_groups_, 0, null_bitmap->mutable_data());
+    }
+
+    auto mins = ArrayData::Make(type_, num_groups_, {null_bitmap, nullptr});
+    auto maxes = ArrayData::Make(type_, num_groups_, {std::move(null_bitmap), nullptr});
+    RETURN_NOT_OK(MakeOffsetsValues(mins.get(), mins_));
+    RETURN_NOT_OK(MakeOffsetsValues(maxes.get(), maxes_));
+    return ArrayData::Make(out_type(), num_groups_, {nullptr},
+                           {std::move(mins), std::move(maxes)});
+  }
+
+  template <typename T = Type>
+  enable_if_base_binary<T, Status> MakeOffsetsValues(
+      ArrayData* array, const std::vector<std::shared_ptr<ResizableBuffer>>& values) {
+    using offset_type = typename T::offset_type;
+    ARROW_ASSIGN_OR_RAISE(
+        auto raw_offsets,
+        AllocateBuffer((1 + values.size()) * sizeof(offset_type), ctx_->memory_pool()));
+    offset_type* offsets = reinterpret_cast<offset_type*>(raw_offsets->mutable_data());
+    offsets[0] = 0;
+    offsets++;
+    const uint8_t* null_bitmap = array->buffers[0]->data();
+    offset_type total_length = 0;
+    for (size_t i = 0; i < values.size(); i++) {
+      if (BitUtil::GetBit(null_bitmap, i)) {
+        if (values[i]->size() > std::numeric_limits<offset_type>::max() ||
+            arrow::internal::AddWithOverflow(
+                total_length, static_cast<offset_type>(values[i]->size()),
+                &total_length)) {
+          return Status::Invalid("Result is too large to fit in ", *array->type,
+                                 " cast to large_ variant of type");
+        }
+      }
+      offsets[i] = total_length;
+    }
+    ARROW_ASSIGN_OR_RAISE(auto data, AllocateBuffer(total_length, ctx_->memory_pool()));
+    int64_t offset = 0;
+    for (size_t i = 0; i < values.size(); i++) {
+      if (BitUtil::GetBit(null_bitmap, i)) {
+        const auto& value = values[i];
+        std::memcpy(data->mutable_data() + offset, value->data(), value->size());
+        offset += value->size();
+      }
+    }
+    array->buffers[1] = std::move(raw_offsets);
+    array->buffers.push_back(std::move(data));
+    return Status::OK();
+  }
+
+  template <typename T = Type>
+  enable_if_same<T, FixedSizeBinaryType, Status> MakeOffsetsValues(
+      ArrayData* array, const std::vector<std::shared_ptr<ResizableBuffer>>& values) {
+    const uint8_t* null_bitmap = array->buffers[0]->data();
+    const int32_t slot_width =
+        checked_cast<const FixedSizeBinaryType&>(*array->type).byte_width();
+    int64_t total_length = values.size() * slot_width;
+    ARROW_ASSIGN_OR_RAISE(auto data, AllocateBuffer(total_length, ctx_->memory_pool()));
+    int64_t offset = 0;
+    for (size_t i = 0; i < values.size(); i++) {
+      if (BitUtil::GetBit(null_bitmap, i)) {
+        const auto& value = values[i];
+        std::memcpy(data->mutable_data() + offset, value->data(), slot_width);
+      } else {
+        std::memset(data->mutable_data() + offset, 0x00, slot_width);
+      }
+      offset += slot_width;
+    }
+    array->buffers[1] = std::move(data);
+    return Status::OK();
+  }
+
+  std::shared_ptr<DataType> out_type() const override {
+    return struct_({field("min", type_), field("max", type_)});
+  }
+
+  ExecContext* ctx_;
+  int64_t num_groups_;
+  std::vector<std::shared_ptr<ResizableBuffer>> mins_, maxes_;
Review comment:
Updated the docs, and benchmarked `optional<string>` - it's about the
same speed and the code is cleaner so I went with this. A quick profile shows
most of the time spent in string comparisons.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]