rtpsw commented on code in PR #14352:
URL: https://github.com/apache/arrow/pull/14352#discussion_r1027274736


##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -44,7 +45,407 @@ namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 
0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * 
data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 
0) {
+  int64_t absolute_byte_offset = (data.offset + offset) * 
data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& 
key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got 
",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", 
*key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status CheckForGetNextSegment(const ExecSpan& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+Status CheckForGetNextSegment(const ExecBatch& batch, int64_t offset,
+                              const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct StatelessGroupingSegmenter : public GroupingSegmenter {
+  Status Reset() override { return Status::OK(); }
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t 
length) {
+  return GroupingSegment{offset, length, offset + length >= batch_length};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t 
length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), 
remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, 
array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      return MakeSegment(chunked_array->length(), offset, total_match_length);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t 
offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t 
offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public StatelessGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      std::vector<TypeHolder> key_types) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_types);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(std::vector<TypeHolder> key_types)
+      : key_types_(std::move(key_types)) {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type 
", type);
+    }
+    return Status::OK();
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }

Review Comment:
   In principle, it is OK to segment with a null value in one of the fields 
used for segmenting. However, if we want this done in this PR rather than in 
the future (WDYT?), then it won't be enough to fix just here - all 
`GetNextSegment` overloads would need to be fixed to support null values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to