icexelloss commented on code in PR #34311:
URL: https://github.com/apache/arrow/pull/34311#discussion_r1127039705
##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,330 @@ namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto g_group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseRowSegmenter : public RowSegmenter {
+  explicit BaseRowSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+Segment MakeSegment(int64_t batch_length, int64_t offset, int64_t length, bool extends) {
+  return Segment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length) - offset;
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+struct NoKeysSegmenter : public BaseRowSegmenter {
+  static std::unique_ptr<RowSegmenter> Make() {
+    return std::make_unique<NoKeysSegmenter>();
+  }
+
+  NoKeysSegmenter() : BaseRowSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  Result<Segment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+};
+
+struct SimpleKeySegmenter : public BaseRowSegmenter {
+  static Result<std::unique_ptr<RowSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeySegmenter>(key_type);
+  }
+
+  explicit SimpleKeySegmenter(TypeHolder key_type)
+      : BaseRowSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeySegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  // Checks whether the given grouping data extends the current segment, i.e., is equal
+  // to previously seen grouping data, which is updated with each invocation.
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<Segment> GetNextSegment(const Scalar& scalar, int64_t offset, int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, offset, length, extends);
+  }
+
+  Result<Segment> GetNextSegment(const DataType& array_type, const uint8_t* array_bytes,

Review Comment:
   Can we add a DCHECK here that offset / length don't go out of bounds? We are doing pointer math here, so we should be "very" careful.
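   For illustration only, the kind of bounds checks I have in mind might look roughly like the sketch below (my sketch, not code from this PR; `num_rows` is a hypothetical name for the number of rows backing `array_bytes`):

   ```cpp
   #include <cstdint>

   #include "arrow/util/logging.h"  // DCHECK_* macros

   // Sketch of the requested checks, to run before any pointer math on array_bytes.
   inline void DCheckSegmentBounds(int64_t offset, int64_t length, int64_t num_rows,
                                   int64_t byte_width) {
     DCHECK_GT(byte_width, 0);              // fixed-width keys only
     DCHECK_GE(offset, 0);                  // segment start is inside the array
     DCHECK_GE(length, 0);                  // segment length is non-negative
     DCHECK_LE(offset + length, num_rows);  // byte cursor never runs past the buffer
   }
   ```

   Something along the lines of `DCheckSegmentBounds(offset, length, num_rows, array_type.byte_width())` at the top of this method (and of `GetMatchLength`) would make the invariant explicit while costing nothing in release builds.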