[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-23 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599667024



##
File path: cpp/src/arrow/compute/api_aggregate.h
##
@@ -306,5 +306,100 @@ Result TDigest(const Datum& value,
   const TDigestOptions& options = 
TDigestOptions::Defaults(),
   ExecContext* ctx = NULLPTR);
 
+namespace internal {
+
+/// Internal use only: streaming group identifier.

Review comment:
   It allowed @nealrichardson to explore the hash aggregate kernels and 
expose a number of issues. We'll probably remove `GroupBy` altogether in 
ARROW-12010




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-23 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599555010



##
File path: cpp/src/arrow/compute/api_aggregate.h
##
@@ -306,5 +306,100 @@ Result TDigest(const Datum& value,
   const TDigestOptions& options = 
TDigestOptions::Defaults(),
   ExecContext* ctx = NULLPTR);
 
+namespace internal {
+
+/// Internal use only: streaming group identifier.
+/// Consumes batches of keys and yields batches of the group ids.
+class ARROW_EXPORT Grouper {
+ public:
+  virtual ~Grouper() = default;
+
+  /// Construct a Grouper which receives the specified key types
+  static Result> Make(const std::vector& 
descrs,
+   ExecContext* ctx = 
default_exec_context());
+
+  /// Consume a batch of keys, producing the corresponding group ids as an 
integer array.
+  virtual Result Consume(const ExecBatch& batch) = 0;
+
+  /// Get current unique keys. May be called multiple times.
+  virtual Result GetUniques() = 0;
+
+  /// Get the current number of groups.

Review comment:
   I'll include a definition of keys, unique keys, and groups in the 
doccomment for Grouper




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599025873



##
File path: cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
##
@@ -300,6 +300,57 @@ BENCHMARK_TEMPLATE(ReferenceSum, 
SumBitmapVectorizeUnroll)
 ->Apply(BenchmarkSetArgs);
 #endif  // ARROW_WITH_BENCHMARKS_REFERENCE
 
+//
+// GroupBy
+//
+
+static void BenchmarkGroupBy(benchmark::State& state,
+ std::vector aggregates,
+ std::vector arguments, std::vector 
keys) {
+  for (auto _ : state) {
+ABORT_NOT_OK(GroupBy(arguments, keys, aggregates).status());
+  }
+}
+
+#define GROUP_BY_BENCHMARK(Name, Impl)   \
+  static void Name(benchmark::State& state) {\
+RegressionArgs args(state, false);   \
+auto rng = random::RandomArrayGenerator(1923);   \
+(Impl)();\
+  }  \
+  BENCHMARK(Name)->Apply([](benchmark::internal::Benchmark* bench) { \
+BenchmarkSetArgsWithSizes(bench, {1 * 1024 * 1024}); \
+  })
+
+GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallStringSet, [&] {

Review comment:
   Ah, needed to rename "sum" to "hash_sum" in there




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599015955



##
File path: python/pyarrow/_compute.pyx
##
@@ -351,6 +388,25 @@ cdef class ScalarAggregateFunction(Function):
 return [wrap_scalar_aggregate_kernel(k) for k in kernels]
 
 
+cdef class HashAggregateFunction(Function):
+cdef:
+const CHashAggregateFunction* func
+
+cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
+Function.init(self, sp_func)
+self.func =  sp_func.get()
+
+@property
+def kernels(self):

Review comment:
   I'll add one in `test_compute.py`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599014820



##
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##
@@ -0,0 +1,728 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include 
+
+#include "arrow/array.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/compute/registry.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bitmap_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_internal.h"
+
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+namespace compute {
+namespace {
+
+struct ScalarVectorToArray {
+  template ::BuilderType,
+typename ScalarType = typename TypeTraits::ScalarType>
+  Status UseBuilder(const AppendScalar& append) {
+BuilderType builder(type_, default_memory_pool());
+for (const auto& s : scalars_) {
+  if (s->is_valid) {
+RETURN_NOT_OK(append(checked_cast(*s), ));
+  } else {
+RETURN_NOT_OK(builder.AppendNull());
+  }
+}
+return builder.FinishInternal(_);
+  }
+
+  struct AppendValue {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  return builder->Append(s.value);
+}
+  };
+
+  struct AppendBuffer {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  const Buffer& buffer = *s.value;
+  return builder->Append(util::string_view{buffer});
+}
+  };
+
+  template 
+  enable_if_primitive_ctype Visit(const T&) {
+return UseBuilder(AppendValue{});
+  }
+
+  template 
+  enable_if_has_string_view Visit(const T&) {
+return UseBuilder(AppendBuffer{});
+  }
+
+  Status Visit(const StructType& type) {
+data_ = ArrayData::Make(type_, static_cast(scalars_.size()),
+{/*null_bitmap=*/nullptr});
+ScalarVector field_scalars(scalars_.size());
+
+for (int field_index = 0; field_index < type.num_fields(); ++field_index) {
+  for (size_t i = 0; i < scalars_.size(); ++i) {
+field_scalars[i] =
+checked_cast(scalars_[i].get())->value[field_index];
+  }
+
+  ARROW_ASSIGN_OR_RAISE(Datum field, 
ScalarVectorToArray{}.Convert(field_scalars));
+  data_->child_data.push_back(field.array());
+}
+return Status::OK();
+  }
+
+  Status Visit(const DataType& type) {
+return Status::NotImplemented("ScalarVectorToArray for type ", type);
+  }
+
+  Result Convert(const ScalarVector& scalars) && {
+if (scalars.size() == 0) {
+  return Status::NotImplemented("ScalarVectorToArray with no scalars");
+}
+scalars_ = std::move(scalars);
+type_ = scalars_[0]->type;
+RETURN_NOT_OK(VisitTypeInline(*type_, this));
+return Datum(std::move(data_));
+  }
+
+  std::shared_ptr type_;
+  ScalarVector scalars_;
+  std::shared_ptr data_;
+};
+
+Result NaiveGroupBy(std::vector arguments, std::vector 
keys,
+   const std::vector& aggregates) 
{
+  ARROW_ASSIGN_OR_RAISE(auto key_batch, ExecBatch::Make(std::move(keys)));
+
+  ARROW_ASSIGN_OR_RAISE(auto grouper,
+internal::Grouper::Make(key_batch.GetDescriptors()));
+
+  ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
+
+  ARROW_ASSIGN_OR_RAISE(
+  auto groupings, 
internal::Grouper::MakeGroupings(*id_batch.array_as(),
+   grouper->num_groups()));
+
+  ArrayVector out_columns;
+
+  for (size_t i = 0; i < arguments.size(); ++i) {
+// 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599014909



##
File path: cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
##
@@ -300,6 +300,57 @@ BENCHMARK_TEMPLATE(ReferenceSum, 
SumBitmapVectorizeUnroll)
 ->Apply(BenchmarkSetArgs);
 #endif  // ARROW_WITH_BENCHMARKS_REFERENCE
 
+//
+// GroupBy
+//
+
+static void BenchmarkGroupBy(benchmark::State& state,
+ std::vector aggregates,
+ std::vector arguments, std::vector 
keys) {
+  for (auto _ : state) {
+ABORT_NOT_OK(GroupBy(arguments, keys, aggregates).status());
+  }
+}
+
+#define GROUP_BY_BENCHMARK(Name, Impl)   \
+  static void Name(benchmark::State& state) {\
+RegressionArgs args(state, false);   \
+auto rng = random::RandomArrayGenerator(1923);   \
+(Impl)();\
+  }  \
+  BENCHMARK(Name)->Apply([](benchmark::internal::Benchmark* bench) { \
+BenchmarkSetArgsWithSizes(bench, {1 * 1024 * 1024}); \
+  })
+
+GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallStringSet, [&] {

Review comment:
   Nope.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599011124



##
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##
@@ -0,0 +1,1050 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_aggregate.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "arrow/buffer_builder.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/exec_internal.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+namespace internal {
+namespace {
+
+struct KeyEncoder {
+  // the first byte of an encoded key is used to indicate nullity
+  static constexpr bool kExtraByteForNull = true;
+
+  virtual ~KeyEncoder() = default;
+
+  virtual void AddLength(const ArrayData&, int32_t* lengths) = 0;
+
+  virtual Status Encode(const ArrayData&, uint8_t** encoded_bytes) = 0;
+
+  virtual Result> Decode(uint8_t** encoded_bytes,
+int32_t length, 
MemoryPool*) = 0;
+
+  // extract the null bitmap from the leading nullity bytes of encoded keys
+  static Status DecodeNulls(MemoryPool* pool, int32_t length, uint8_t** 
encoded_bytes,
+std::shared_ptr* null_buf, int32_t* 
null_count) {
+// first count nulls to determine if a null bitmap is necessary
+*null_count = 0;
+for (int32_t i = 0; i < length; ++i) {
+  *null_count += encoded_bytes[i][0];
+}
+
+if (*null_count > 0) {
+  ARROW_ASSIGN_OR_RAISE(*null_buf, AllocateBitmap(length, pool));
+
+  uint8_t* nulls = (*null_buf)->mutable_data();
+  for (int32_t i = 0; i < length; ++i) {
+BitUtil::SetBitTo(nulls, i, !encoded_bytes[i][0]);
+encoded_bytes[i] += 1;
+  }
+} else {
+  for (int32_t i = 0; i < length; ++i) {
+encoded_bytes[i] += 1;
+  }
+}
+return Status ::OK();
+  }
+};
+
+struct BooleanKeyEncoder : KeyEncoder {
+  static constexpr int kByteWidth = 1;
+
+  void AddLength(const ArrayData& data, int32_t* lengths) override {
+for (int64_t i = 0; i < data.length; ++i) {
+  lengths[i] += kByteWidth + kExtraByteForNull;
+}
+  }
+
+  Status Encode(const ArrayData& data, uint8_t** encoded_bytes) override {
+VisitArrayDataInline(
+data,
+[&](bool value) {
+  auto& encoded_ptr = *encoded_bytes++;
+  *encoded_ptr++ = 0;
+  *encoded_ptr++ = value;
+},
+[&] {
+  auto& encoded_ptr = *encoded_bytes++;
+  *encoded_ptr++ = 1;
+  *encoded_ptr++ = 0;
+});
+return Status::OK();
+  }
+
+  Result> Decode(uint8_t** encoded_bytes, int32_t 
length,
+MemoryPool* pool) override {
+std::shared_ptr null_buf;
+int32_t null_count;
+RETURN_NOT_OK(DecodeNulls(pool, length, encoded_bytes, _buf, 
_count));
+
+ARROW_ASSIGN_OR_RAISE(auto key_buf, AllocateBitmap(length, pool));
+
+uint8_t* raw_output = key_buf->mutable_data();
+for (int32_t i = 0; i < length; ++i) {
+  auto& encoded_ptr = encoded_bytes[i];
+  BitUtil::SetBitTo(raw_output, i, encoded_ptr[0] != 0);
+  encoded_ptr += 1;
+}
+
+return ArrayData::Make(boolean(), length, {std::move(null_buf), 
std::move(key_buf)},
+   null_count);
+  }
+};
+
+struct FixedWidthKeyEncoder : KeyEncoder {
+  explicit FixedWidthKeyEncoder(std::shared_ptr type)
+  : type_(std::move(type)),
+byte_width_(checked_cast(*type_).bit_width() / 
8) {}
+
+  void AddLength(const ArrayData& data, int32_t* lengths) override {
+for (int64_t i = 0; i < data.length; ++i) {
+  lengths[i] += byte_width_ + kExtraByteForNull;
+}
+  }
+
+  Status Encode(const ArrayData& data, uint8_t** encoded_bytes) override {
+ArrayData viewed(fixed_size_binary(byte_width_), 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599010908



##
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##
@@ -0,0 +1,1050 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_aggregate.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "arrow/buffer_builder.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/exec_internal.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+namespace internal {
+namespace {
+
+struct KeyEncoder {
+  // the first byte of an encoded key is used to indicate nullity
+  static constexpr bool kExtraByteForNull = true;
+
+  virtual ~KeyEncoder() = default;
+
+  virtual void AddLength(const ArrayData&, int32_t* lengths) = 0;
+
+  virtual Status Encode(const ArrayData&, uint8_t** encoded_bytes) = 0;
+
+  virtual Result> Decode(uint8_t** encoded_bytes,
+int32_t length, 
MemoryPool*) = 0;
+
+  // extract the null bitmap from the leading nullity bytes of encoded keys
+  static Status DecodeNulls(MemoryPool* pool, int32_t length, uint8_t** 
encoded_bytes,
+std::shared_ptr* null_buf, int32_t* 
null_count) {
+// first count nulls to determine if a null bitmap is necessary
+*null_count = 0;
+for (int32_t i = 0; i < length; ++i) {
+  *null_count += encoded_bytes[i][0];
+}
+
+if (*null_count > 0) {
+  ARROW_ASSIGN_OR_RAISE(*null_buf, AllocateBitmap(length, pool));
+
+  uint8_t* nulls = (*null_buf)->mutable_data();
+  for (int32_t i = 0; i < length; ++i) {
+BitUtil::SetBitTo(nulls, i, !encoded_bytes[i][0]);
+encoded_bytes[i] += 1;
+  }
+} else {
+  for (int32_t i = 0; i < length; ++i) {
+encoded_bytes[i] += 1;
+  }
+}
+return Status ::OK();
+  }
+};
+
+struct BooleanKeyEncoder : KeyEncoder {
+  static constexpr int kByteWidth = 1;
+
+  void AddLength(const ArrayData& data, int32_t* lengths) override {
+for (int64_t i = 0; i < data.length; ++i) {
+  lengths[i] += kByteWidth + kExtraByteForNull;
+}
+  }
+
+  Status Encode(const ArrayData& data, uint8_t** encoded_bytes) override {
+VisitArrayDataInline(
+data,
+[&](bool value) {
+  auto& encoded_ptr = *encoded_bytes++;
+  *encoded_ptr++ = 0;

Review comment:
   alright




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599010694



##
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##
@@ -0,0 +1,728 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include 
+
+#include "arrow/array.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/compute/registry.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bitmap_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_internal.h"
+
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+namespace compute {
+namespace {
+
+struct ScalarVectorToArray {
+  template ::BuilderType,
+typename ScalarType = typename TypeTraits::ScalarType>
+  Status UseBuilder(const AppendScalar& append) {
+BuilderType builder(type_, default_memory_pool());
+for (const auto& s : scalars_) {
+  if (s->is_valid) {
+RETURN_NOT_OK(append(checked_cast(*s), ));
+  } else {
+RETURN_NOT_OK(builder.AppendNull());
+  }
+}
+return builder.FinishInternal(_);
+  }
+
+  struct AppendValue {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  return builder->Append(s.value);
+}
+  };
+
+  struct AppendBuffer {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  const Buffer& buffer = *s.value;
+  return builder->Append(util::string_view{buffer});
+}
+  };
+
+  template 
+  enable_if_primitive_ctype Visit(const T&) {
+return UseBuilder(AppendValue{});
+  }
+
+  template 
+  enable_if_has_string_view Visit(const T&) {
+return UseBuilder(AppendBuffer{});
+  }
+
+  Status Visit(const StructType& type) {
+data_ = ArrayData::Make(type_, static_cast(scalars_.size()),
+{/*null_bitmap=*/nullptr});
+ScalarVector field_scalars(scalars_.size());
+
+for (int field_index = 0; field_index < type.num_fields(); ++field_index) {
+  for (size_t i = 0; i < scalars_.size(); ++i) {
+field_scalars[i] =
+checked_cast(scalars_[i].get())->value[field_index];
+  }
+
+  ARROW_ASSIGN_OR_RAISE(Datum field, 
ScalarVectorToArray{}.Convert(field_scalars));
+  data_->child_data.push_back(field.array());
+}
+return Status::OK();
+  }
+
+  Status Visit(const DataType& type) {
+return Status::NotImplemented("ScalarVectorToArray for type ", type);
+  }
+
+  Result Convert(const ScalarVector& scalars) && {
+if (scalars.size() == 0) {
+  return Status::NotImplemented("ScalarVectorToArray with no scalars");
+}
+scalars_ = std::move(scalars);
+type_ = scalars_[0]->type;
+RETURN_NOT_OK(VisitTypeInline(*type_, this));
+return Datum(std::move(data_));
+  }
+
+  std::shared_ptr type_;
+  ScalarVector scalars_;
+  std::shared_ptr data_;
+};
+
+Result NaiveGroupBy(std::vector arguments, std::vector 
keys,
+   const std::vector& aggregates) 
{
+  ARROW_ASSIGN_OR_RAISE(auto key_batch, ExecBatch::Make(std::move(keys)));
+
+  ARROW_ASSIGN_OR_RAISE(auto grouper,
+internal::Grouper::Make(key_batch.GetDescriptors()));
+
+  ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
+
+  ARROW_ASSIGN_OR_RAISE(
+  auto groupings, 
internal::Grouper::MakeGroupings(*id_batch.array_as(),
+   grouper->num_groups()));
+
+  ArrayVector out_columns;
+
+  for (size_t i = 0; i < arguments.size(); ++i) {
+// 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599010607



##
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##
@@ -0,0 +1,728 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include 
+
+#include "arrow/array.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/compute/registry.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bitmap_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_internal.h"
+
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+namespace compute {
+namespace {
+
+struct ScalarVectorToArray {
+  template ::BuilderType,
+typename ScalarType = typename TypeTraits::ScalarType>
+  Status UseBuilder(const AppendScalar& append) {
+BuilderType builder(type_, default_memory_pool());
+for (const auto& s : scalars_) {
+  if (s->is_valid) {
+RETURN_NOT_OK(append(checked_cast(*s), ));
+  } else {
+RETURN_NOT_OK(builder.AppendNull());
+  }
+}
+return builder.FinishInternal(_);
+  }
+
+  struct AppendValue {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  return builder->Append(s.value);
+}
+  };
+
+  struct AppendBuffer {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  const Buffer& buffer = *s.value;
+  return builder->Append(util::string_view{buffer});
+}
+  };
+
+  template 
+  enable_if_primitive_ctype Visit(const T&) {
+return UseBuilder(AppendValue{});
+  }
+
+  template 
+  enable_if_has_string_view Visit(const T&) {
+return UseBuilder(AppendBuffer{});
+  }
+
+  Status Visit(const StructType& type) {
+data_ = ArrayData::Make(type_, static_cast(scalars_.size()),
+{/*null_bitmap=*/nullptr});
+ScalarVector field_scalars(scalars_.size());
+
+for (int field_index = 0; field_index < type.num_fields(); ++field_index) {
+  for (size_t i = 0; i < scalars_.size(); ++i) {
+field_scalars[i] =
+checked_cast(scalars_[i].get())->value[field_index];
+  }
+
+  ARROW_ASSIGN_OR_RAISE(Datum field, 
ScalarVectorToArray{}.Convert(field_scalars));
+  data_->child_data.push_back(field.array());
+}
+return Status::OK();
+  }
+
+  Status Visit(const DataType& type) {
+return Status::NotImplemented("ScalarVectorToArray for type ", type);
+  }
+
+  Result Convert(const ScalarVector& scalars) && {
+if (scalars.size() == 0) {
+  return Status::NotImplemented("ScalarVectorToArray with no scalars");
+}
+scalars_ = std::move(scalars);
+type_ = scalars_[0]->type;
+RETURN_NOT_OK(VisitTypeInline(*type_, this));
+return Datum(std::move(data_));
+  }
+
+  std::shared_ptr type_;
+  ScalarVector scalars_;
+  std::shared_ptr data_;
+};
+
+Result NaiveGroupBy(std::vector arguments, std::vector 
keys,
+   const std::vector& aggregates) 
{
+  ARROW_ASSIGN_OR_RAISE(auto key_batch, ExecBatch::Make(std::move(keys)));
+
+  ARROW_ASSIGN_OR_RAISE(auto grouper,
+internal::Grouper::Make(key_batch.GetDescriptors()));
+
+  ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
+
+  ARROW_ASSIGN_OR_RAISE(
+  auto groupings, 
internal::Grouper::MakeGroupings(*id_batch.array_as(),
+   grouper->num_groups()));
+
+  ArrayVector out_columns;
+
+  for (size_t i = 0; i < arguments.size(); ++i) {
+// 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599009814



##
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##
@@ -0,0 +1,728 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include 
+
+#include "arrow/array.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/compute/registry.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bitmap_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_internal.h"
+
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+namespace compute {
+namespace {
+
+struct ScalarVectorToArray {
+  template ::BuilderType,
+typename ScalarType = typename TypeTraits::ScalarType>
+  Status UseBuilder(const AppendScalar& append) {
+BuilderType builder(type_, default_memory_pool());
+for (const auto& s : scalars_) {
+  if (s->is_valid) {
+RETURN_NOT_OK(append(checked_cast(*s), ));
+  } else {
+RETURN_NOT_OK(builder.AppendNull());
+  }
+}
+return builder.FinishInternal(_);
+  }
+
+  struct AppendValue {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  return builder->Append(s.value);
+}
+  };
+
+  struct AppendBuffer {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  const Buffer& buffer = *s.value;
+  return builder->Append(util::string_view{buffer});
+}
+  };
+
+  template 
+  enable_if_primitive_ctype Visit(const T&) {
+return UseBuilder(AppendValue{});
+  }
+
+  template 
+  enable_if_has_string_view Visit(const T&) {
+return UseBuilder(AppendBuffer{});
+  }
+
+  Status Visit(const StructType& type) {
+data_ = ArrayData::Make(type_, static_cast(scalars_.size()),
+{/*null_bitmap=*/nullptr});
+ScalarVector field_scalars(scalars_.size());
+
+for (int field_index = 0; field_index < type.num_fields(); ++field_index) {
+  for (size_t i = 0; i < scalars_.size(); ++i) {
+field_scalars[i] =
+checked_cast(scalars_[i].get())->value[field_index];
+  }
+
+  ARROW_ASSIGN_OR_RAISE(Datum field, 
ScalarVectorToArray{}.Convert(field_scalars));
+  data_->child_data.push_back(field.array());
+}
+return Status::OK();
+  }
+
+  Status Visit(const DataType& type) {
+return Status::NotImplemented("ScalarVectorToArray for type ", type);
+  }
+
+  Result Convert(const ScalarVector& scalars) && {
+if (scalars.size() == 0) {
+  return Status::NotImplemented("ScalarVectorToArray with no scalars");
+}
+scalars_ = std::move(scalars);
+type_ = scalars_[0]->type;
+RETURN_NOT_OK(VisitTypeInline(*type_, this));
+return Datum(std::move(data_));
+  }
+
+  std::shared_ptr type_;
+  ScalarVector scalars_;
+  std::shared_ptr data_;
+};
+
+Result NaiveGroupBy(std::vector arguments, std::vector 
keys,
+   const std::vector& aggregates) 
{
+  ARROW_ASSIGN_OR_RAISE(auto key_batch, ExecBatch::Make(std::move(keys)));
+
+  ARROW_ASSIGN_OR_RAISE(auto grouper,
+internal::Grouper::Make(key_batch.GetDescriptors()));
+
+  ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
+
+  ARROW_ASSIGN_OR_RAISE(
+  auto groupings, 
internal::Grouper::MakeGroupings(*id_batch.array_as(),
+   grouper->num_groups()));
+
+  ArrayVector out_columns;
+
+  for (size_t i = 0; i < arguments.size(); ++i) {
+// 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599009321



##
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##
@@ -0,0 +1,728 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include 
+
+#include "arrow/array.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/compute/registry.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bitmap_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_internal.h"
+
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+namespace compute {
+namespace {
+
+struct ScalarVectorToArray {
+  template ::BuilderType,
+typename ScalarType = typename TypeTraits::ScalarType>
+  Status UseBuilder(const AppendScalar& append) {
+BuilderType builder(type_, default_memory_pool());
+for (const auto& s : scalars_) {
+  if (s->is_valid) {
+RETURN_NOT_OK(append(checked_cast(*s), ));
+  } else {
+RETURN_NOT_OK(builder.AppendNull());
+  }
+}
+return builder.FinishInternal(_);
+  }
+
+  struct AppendValue {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  return builder->Append(s.value);
+}
+  };
+
+  struct AppendBuffer {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  const Buffer& buffer = *s.value;
+  return builder->Append(util::string_view{buffer});
+}
+  };
+
+  template 
+  enable_if_primitive_ctype Visit(const T&) {
+return UseBuilder(AppendValue{});
+  }
+
+  template 
+  enable_if_has_string_view Visit(const T&) {
+return UseBuilder(AppendBuffer{});
+  }
+
+  Status Visit(const StructType& type) {
+data_ = ArrayData::Make(type_, static_cast(scalars_.size()),
+{/*null_bitmap=*/nullptr});
+ScalarVector field_scalars(scalars_.size());
+
+for (int field_index = 0; field_index < type.num_fields(); ++field_index) {
+  for (size_t i = 0; i < scalars_.size(); ++i) {
+field_scalars[i] =
+checked_cast(scalars_[i].get())->value[field_index];
+  }
+
+  ARROW_ASSIGN_OR_RAISE(Datum field, 
ScalarVectorToArray{}.Convert(field_scalars));
+  data_->child_data.push_back(field.array());
+}
+return Status::OK();
+  }
+
+  Status Visit(const DataType& type) {
+return Status::NotImplemented("ScalarVectorToArray for type ", type);
+  }
+
+  Result Convert(const ScalarVector& scalars) && {
+if (scalars.size() == 0) {
+  return Status::NotImplemented("ScalarVectorToArray with no scalars");
+}
+scalars_ = std::move(scalars);
+type_ = scalars_[0]->type;
+RETURN_NOT_OK(VisitTypeInline(*type_, this));
+return Datum(std::move(data_));
+  }
+
+  std::shared_ptr type_;
+  ScalarVector scalars_;
+  std::shared_ptr data_;
+};
+
+Result NaiveGroupBy(std::vector arguments, std::vector 
keys,
+   const std::vector& aggregates) 
{
+  ARROW_ASSIGN_OR_RAISE(auto key_batch, ExecBatch::Make(std::move(keys)));
+
+  ARROW_ASSIGN_OR_RAISE(auto grouper,
+internal::Grouper::Make(key_batch.GetDescriptors()));
+
+  ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
+
+  ARROW_ASSIGN_OR_RAISE(
+  auto groupings, 
internal::Grouper::MakeGroupings(*id_batch.array_as(),
+   grouper->num_groups()));
+
+  ArrayVector out_columns;
+
+  for (size_t i = 0; i < arguments.size(); ++i) {
+// 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599008214



##
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##
@@ -0,0 +1,728 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include 
+
+#include "arrow/array.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/compute/registry.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bitmap_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_internal.h"
+
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+namespace compute {
+namespace {
+
+struct ScalarVectorToArray {
+  template ::BuilderType,
+typename ScalarType = typename TypeTraits::ScalarType>
+  Status UseBuilder(const AppendScalar& append) {
+BuilderType builder(type_, default_memory_pool());
+for (const auto& s : scalars_) {
+  if (s->is_valid) {
+RETURN_NOT_OK(append(checked_cast(*s), ));
+  } else {
+RETURN_NOT_OK(builder.AppendNull());
+  }
+}
+return builder.FinishInternal(_);
+  }
+
+  struct AppendValue {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  return builder->Append(s.value);
+}
+  };
+
+  struct AppendBuffer {
+template 
+Status operator()(const ScalarType& s, BuilderType* builder) const {
+  const Buffer& buffer = *s.value;
+  return builder->Append(util::string_view{buffer});
+}
+  };
+
+  template 
+  enable_if_primitive_ctype Visit(const T&) {
+return UseBuilder(AppendValue{});
+  }
+
+  template 
+  enable_if_has_string_view Visit(const T&) {
+return UseBuilder(AppendBuffer{});
+  }
+
+  Status Visit(const StructType& type) {
+data_ = ArrayData::Make(type_, static_cast(scalars_.size()),
+{/*null_bitmap=*/nullptr});
+ScalarVector field_scalars(scalars_.size());
+
+for (int field_index = 0; field_index < type.num_fields(); ++field_index) {
+  for (size_t i = 0; i < scalars_.size(); ++i) {
+field_scalars[i] =
+checked_cast(scalars_[i].get())->value[field_index];
+  }
+
+  ARROW_ASSIGN_OR_RAISE(Datum field, 
ScalarVectorToArray{}.Convert(field_scalars));
+  data_->child_data.push_back(field.array());
+}
+return Status::OK();
+  }
+
+  Status Visit(const DataType& type) {
+return Status::NotImplemented("ScalarVectorToArray for type ", type);
+  }
+
+  Result Convert(const ScalarVector& scalars) && {
+if (scalars.size() == 0) {
+  return Status::NotImplemented("ScalarVectorToArray with no scalars");
+}
+scalars_ = std::move(scalars);
+type_ = scalars_[0]->type;
+RETURN_NOT_OK(VisitTypeInline(*type_, this));
+return Datum(std::move(data_));
+  }
+
+  std::shared_ptr type_;
+  ScalarVector scalars_;
+  std::shared_ptr data_;
+};
+
+Result NaiveGroupBy(std::vector arguments, std::vector 
keys,
+   const std::vector& aggregates) 
{
+  ARROW_ASSIGN_OR_RAISE(auto key_batch, ExecBatch::Make(std::move(keys)));
+
+  ARROW_ASSIGN_OR_RAISE(auto grouper,
+internal::Grouper::Make(key_batch.GetDescriptors()));
+
+  ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
+
+  ARROW_ASSIGN_OR_RAISE(
+  auto groupings, 
internal::Grouper::MakeGroupings(*id_batch.array_as(),
+   grouper->num_groups()));
+
+  ArrayVector out_columns;
+
+  for (size_t i = 0; i < arguments.size(); ++i) {
+// 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599008010



##
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##
@@ -0,0 +1,669 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include 
+
+#include "arrow/array.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/compute/registry.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bitmap_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_internal.h"
+
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+using internal::BitmapReader;
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+namespace compute {
+namespace {
+
+struct ScalarVectorToArray {

Review comment:
   Will do




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599007804



##
File path: cpp/src/arrow/compute/api_aggregate.h
##
@@ -306,5 +306,100 @@ Result TDigest(const Datum& value,
   const TDigestOptions& options = 
TDigestOptions::Defaults(),
   ExecContext* ctx = NULLPTR);
 
+namespace internal {
+
+/// Internal use only: streaming group identifier.
+/// Consumes batches of keys and yields batches of the group ids.
+class ARROW_EXPORT Grouper {
+ public:
+  virtual ~Grouper() = default;
+
+  /// Construct a Grouper which receives the specified key types
+  static Result> Make(const std::vector& 
descrs,
+   ExecContext* ctx = 
default_exec_context());
+
+  /// Consume a batch of keys, producing the corresponding group ids as an 
integer array.
+  virtual Result Consume(const ExecBatch& batch) = 0;
+
+  /// Get current unique keys. May be called multiple times.
+  virtual Result GetUniques() = 0;
+
+  /// Get the current number of groups.
+  virtual uint32_t num_groups() const = 0;
+
+  /// \brief Assemble lists of indices of identical elements.
+  ///
+  /// \param[in] ids An unsigned, all-valid integral array which will be
+  ///used as grouping criteria.
+  /// \param[in] num_groups An upper bound for the elements of ids

Review comment:
   Ah, that's a typo. There should be an 8 instead of a 7 below




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599007104



##
File path: cpp/src/arrow/compute/kernel.h
##
@@ -680,26 +682,71 @@ using ScalarAggregateFinalize = 
std::function;
 /// * finalize: produces the end result of the aggregation using the
 ///   KernelState in the KernelContext.
 struct ScalarAggregateKernel : public Kernel {
-  ScalarAggregateKernel() {}
+  ScalarAggregateKernel() = default;
 
   ScalarAggregateKernel(std::shared_ptr sig, KernelInit init,
 ScalarAggregateConsume consume, ScalarAggregateMerge 
merge,
 ScalarAggregateFinalize finalize)
-  : Kernel(std::move(sig), init),
+  : Kernel(std::move(sig), std::move(init)),
 consume(std::move(consume)),
 merge(std::move(merge)),
 finalize(std::move(finalize)) {}
 
   ScalarAggregateKernel(std::vector in_types, OutputType out_type,
 KernelInit init, ScalarAggregateConsume consume,
 ScalarAggregateMerge merge, ScalarAggregateFinalize 
finalize)
-  : ScalarAggregateKernel(KernelSignature::Make(std::move(in_types), 
out_type), init,
-  consume, merge, finalize) {}
+  : ScalarAggregateKernel(
+KernelSignature::Make(std::move(in_types), std::move(out_type)),
+std::move(init), std::move(consume), std::move(merge), 
std::move(finalize)) {}
 
   ScalarAggregateConsume consume;
   ScalarAggregateMerge merge;
   ScalarAggregateFinalize finalize;
 };
 
+// --
+// HashAggregateKernel (for HashAggregateFunction)
+
+using HashAggregateConsume = std::function;
+
+using HashAggregateMerge =
+std::function;
+
+// Finalize returns Datum to permit multiple return values
+using HashAggregateFinalize = std::function;
+
+/// \brief Kernel data structure for implementations of
+/// HashAggregateFunction. The four necessary components of an aggregation
+/// kernel are the init, consume, merge, and finalize functions.
+///
+/// * init: creates a new KernelState for a kernel.
+/// * consume: processes an ExecBatch (which includes the argument as well
+///   as an array of group identifiers) and updates the KernelState found in 
the
+///   KernelContext.
+/// * finalize: produces the end result of the aggregation using the

Review comment:
   Huh, missed that one. Will do




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599006347



##
File path: cpp/src/arrow/compute/api_aggregate.h
##
@@ -306,5 +306,100 @@ Result TDigest(const Datum& value,
   const TDigestOptions& options = 
TDigestOptions::Defaults(),
   ExecContext* ctx = NULLPTR);
 
+namespace internal {
+
+/// Internal use only: streaming group identifier.
+/// Consumes batches of keys and yields batches of the group ids.
+class ARROW_EXPORT Grouper {
+ public:
+  virtual ~Grouper() = default;
+
+  /// Construct a Grouper which receives the specified key types
+  static Result> Make(const std::vector& 
descrs,
+   ExecContext* ctx = 
default_exec_context());
+
+  /// Consume a batch of keys, producing the corresponding group ids as an 
integer array.
+  virtual Result Consume(const ExecBatch& batch) = 0;
+
+  /// Get current unique keys. May be called multiple times.
+  virtual Result GetUniques() = 0;
+
+  /// Get the current number of groups.

Review comment:
   The number of key(column)s is fixed throughout the lifetime of the 
Grouper. The number of groups is incremented each time a unique row of keys is 
encountered.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599003922



##
File path: cpp/src/arrow/compute/api_aggregate.h
##
@@ -306,5 +306,100 @@ Result TDigest(const Datum& value,
   const TDigestOptions& options = 
TDigestOptions::Defaults(),
   ExecContext* ctx = NULLPTR);
 
+namespace internal {
+
+/// Internal use only: streaming group identifier.
+/// Consumes batches of keys and yields batches of the group ids.
+class ARROW_EXPORT Grouper {
+ public:
+  virtual ~Grouper() = default;
+
+  /// Construct a Grouper which receives the specified key types
+  static Result> Make(const std::vector& 
descrs,
+   ExecContext* ctx = 
default_exec_context());
+
+  /// Consume a batch of keys, producing the corresponding group ids as an 
integer array.

Review comment:
   Alright. Eventually we'll support multiple integer types (increasing the 
width as needed)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599003271



##
File path: cpp/src/arrow/compute/api_aggregate.h
##
@@ -306,5 +306,100 @@ Result TDigest(const Datum& value,
   const TDigestOptions& options = 
TDigestOptions::Defaults(),
   ExecContext* ctx = NULLPTR);
 
+namespace internal {
+
+/// Internal use only: streaming group identifier.

Review comment:
   These are made available for testing from R, which could not access an 
_internal header (since it wouldn't be installed)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-22 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r599001850



##
File path: cpp/src/arrow/buffer_builder.h
##
@@ -202,6 +208,11 @@ class TypedBufferBuilder<
   MemoryPool* pool = default_memory_pool())
   : bytes_builder_(std::move(buffer), pool) {}
 
+  explicit TypedBufferBuilder(BufferBuilder builder)
+  : bytes_builder_(std::move(builder)) {}
+
+  BufferBuilder& bytes_builder() { return bytes_builder_; }

Review comment:
   SGTM




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-17 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r596407348



##
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##
@@ -229,6 +604,710 @@ std::unique_ptr AllInit(KernelContext*, 
const KernelInitArgs& args)
   return ::arrow::internal::make_unique();
 }
 
+struct GroupByImpl : public ScalarAggregator {
+  using AddLengthImpl = std::function&, 
int32_t*)>;
+
+  struct GetAddLengthImpl {
+static constexpr int32_t null_extra_byte = 1;
+
+static void AddFixedLength(int32_t fixed_length, int64_t num_repeats,
+   int32_t* lengths) {
+  for (int64_t i = 0; i < num_repeats; ++i) {
+lengths[i] += fixed_length + null_extra_byte;
+  }
+}
+
+static void AddVarLength(const std::shared_ptr& data, int32_t* 
lengths) {
+  using offset_type = typename StringType::offset_type;
+  constexpr int32_t length_extra_bytes = sizeof(offset_type);
+  auto offset = data->offset;
+  const auto offsets = data->GetValues(1);
+  if (data->MayHaveNulls()) {
+const uint8_t* nulls = data->buffers[0]->data();
+
+for (int64_t i = 0; i < data->length; ++i) {
+  bool is_null = !BitUtil::GetBit(nulls, offset + i);
+  if (is_null) {
+lengths[i] += null_extra_byte + length_extra_bytes;
+  } else {
+lengths[i] += null_extra_byte + length_extra_bytes + 
offsets[offset + i + 1] -
+  offsets[offset + i];
+  }
+}
+  } else {
+for (int64_t i = 0; i < data->length; ++i) {
+  lengths[i] += null_extra_byte + length_extra_bytes + offsets[offset 
+ i + 1] -
+offsets[offset + i];
+}
+  }
+}
+
+template 
+Status Visit(const T& input_type) {
+  int32_t num_bytes = (bit_width(input_type.id()) + 7) / 8;
+  add_length_impl = [num_bytes](const std::shared_ptr& data,
+int32_t* lengths) {
+AddFixedLength(num_bytes, data->length, lengths);
+  };
+  return Status::OK();
+}
+
+Status Visit(const StringType&) {
+  add_length_impl = [](const std::shared_ptr& data, int32_t* 
lengths) {
+AddVarLength(data, lengths);
+  };
+  return Status::OK();
+}
+
+Status Visit(const BinaryType&) {
+  add_length_impl = [](const std::shared_ptr& data, int32_t* 
lengths) {
+AddVarLength(data, lengths);
+  };
+  return Status::OK();
+}
+
+Status Visit(const FixedSizeBinaryType& type) {
+  int32_t num_bytes = type.byte_width();
+  add_length_impl = [num_bytes](const std::shared_ptr& data,
+int32_t* lengths) {
+AddFixedLength(num_bytes, data->length, lengths);
+  };
+  return Status::OK();
+}
+
+AddLengthImpl add_length_impl;
+  };
+
+  using EncodeNextImpl =
+  std::function&, uint8_t**)>;
+
+  struct GetEncodeNextImpl {
+template 
+static void EncodeSmallFixed(const std::shared_ptr& data,
+ uint8_t** encoded_bytes) {
+  auto raw_input = data->buffers[1]->data();
+  auto offset = data->offset;
+  if (data->MayHaveNulls()) {
+const uint8_t* nulls = data->buffers[0]->data();
+for (int64_t i = 0; i < data->length; ++i) {
+  auto& encoded_ptr = encoded_bytes[i];
+  bool is_null = !BitUtil::GetBit(nulls, offset + i);
+  encoded_ptr[0] = is_null ? 1 : 0;
+  encoded_ptr += 1;
+  uint64_t null_multiplier = is_null ? 0 : 1;
+  if (NumBits == 1) {
+encoded_ptr[0] = static_cast(
+null_multiplier * (BitUtil::GetBit(raw_input, offset + i) ? 1 
: 0));
+encoded_ptr += 1;
+  }
+  if (NumBits == 8) {
+encoded_ptr[0] =
+static_cast(null_multiplier * reinterpret_cast(
+   raw_input)[offset + 
i]);
+encoded_ptr += 1;
+  }
+  if (NumBits == 16) {
+reinterpret_cast(encoded_ptr)[0] =
+static_cast(null_multiplier * reinterpret_cast(
+raw_input)[offset 
+ i]);
+encoded_ptr += 2;
+  }
+  if (NumBits == 32) {
+reinterpret_cast(encoded_ptr)[0] =
+static_cast(null_multiplier * reinterpret_cast(
+raw_input)[offset 
+ i]);
+encoded_ptr += 4;
+  }
+  if (NumBits == 64) {
+reinterpret_cast(encoded_ptr)[0] =
+static_cast(null_multiplier * reinterpret_cast(
+raw_input)[offset 
+ i]);
+encoded_ptr += 8;
+  }
+}
+  } else {
+for (int64_t i = 0; i < data->length; 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-17 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r596406429



##
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##
@@ -229,6 +604,710 @@ std::unique_ptr AllInit(KernelContext*, 
const KernelInitArgs& args)
   return ::arrow::internal::make_unique();
 }
 
+struct GroupByImpl : public ScalarAggregator {
+  using AddLengthImpl = std::function&, 
int32_t*)>;
+
+  struct GetAddLengthImpl {
+static constexpr int32_t null_extra_byte = 1;
+
+static void AddFixedLength(int32_t fixed_length, int64_t num_repeats,
+   int32_t* lengths) {
+  for (int64_t i = 0; i < num_repeats; ++i) {
+lengths[i] += fixed_length + null_extra_byte;
+  }
+}
+
+static void AddVarLength(const std::shared_ptr& data, int32_t* 
lengths) {
+  using offset_type = typename StringType::offset_type;
+  constexpr int32_t length_extra_bytes = sizeof(offset_type);
+  auto offset = data->offset;
+  const auto offsets = data->GetValues(1);
+  if (data->MayHaveNulls()) {
+const uint8_t* nulls = data->buffers[0]->data();
+
+for (int64_t i = 0; i < data->length; ++i) {
+  bool is_null = !BitUtil::GetBit(nulls, offset + i);
+  if (is_null) {
+lengths[i] += null_extra_byte + length_extra_bytes;
+  } else {
+lengths[i] += null_extra_byte + length_extra_bytes + 
offsets[offset + i + 1] -
+  offsets[offset + i];
+  }
+}
+  } else {
+for (int64_t i = 0; i < data->length; ++i) {
+  lengths[i] += null_extra_byte + length_extra_bytes + offsets[offset 
+ i + 1] -
+offsets[offset + i];
+}
+  }
+}
+
+template 
+Status Visit(const T& input_type) {
+  int32_t num_bytes = (bit_width(input_type.id()) + 7) / 8;
+  add_length_impl = [num_bytes](const std::shared_ptr& data,
+int32_t* lengths) {
+AddFixedLength(num_bytes, data->length, lengths);
+  };
+  return Status::OK();
+}
+
+Status Visit(const StringType&) {
+  add_length_impl = [](const std::shared_ptr& data, int32_t* 
lengths) {
+AddVarLength(data, lengths);
+  };
+  return Status::OK();
+}
+
+Status Visit(const BinaryType&) {
+  add_length_impl = [](const std::shared_ptr& data, int32_t* 
lengths) {
+AddVarLength(data, lengths);
+  };
+  return Status::OK();
+}
+
+Status Visit(const FixedSizeBinaryType& type) {
+  int32_t num_bytes = type.byte_width();
+  add_length_impl = [num_bytes](const std::shared_ptr& data,
+int32_t* lengths) {
+AddFixedLength(num_bytes, data->length, lengths);
+  };
+  return Status::OK();
+}
+
+AddLengthImpl add_length_impl;
+  };
+
+  using EncodeNextImpl =
+  std::function&, uint8_t**)>;
+
+  struct GetEncodeNextImpl {
+template 
+static void EncodeSmallFixed(const std::shared_ptr& data,
+ uint8_t** encoded_bytes) {
+  auto raw_input = data->buffers[1]->data();
+  auto offset = data->offset;
+  if (data->MayHaveNulls()) {
+const uint8_t* nulls = data->buffers[0]->data();
+for (int64_t i = 0; i < data->length; ++i) {
+  auto& encoded_ptr = encoded_bytes[i];
+  bool is_null = !BitUtil::GetBit(nulls, offset + i);
+  encoded_ptr[0] = is_null ? 1 : 0;
+  encoded_ptr += 1;
+  uint64_t null_multiplier = is_null ? 0 : 1;
+  if (NumBits == 1) {
+encoded_ptr[0] = static_cast(
+null_multiplier * (BitUtil::GetBit(raw_input, offset + i) ? 1 
: 0));
+encoded_ptr += 1;
+  }
+  if (NumBits == 8) {
+encoded_ptr[0] =
+static_cast(null_multiplier * reinterpret_cast(
+   raw_input)[offset + 
i]);
+encoded_ptr += 1;
+  }
+  if (NumBits == 16) {
+reinterpret_cast(encoded_ptr)[0] =
+static_cast(null_multiplier * reinterpret_cast(
+raw_input)[offset 
+ i]);
+encoded_ptr += 2;
+  }
+  if (NumBits == 32) {
+reinterpret_cast(encoded_ptr)[0] =
+static_cast(null_multiplier * reinterpret_cast(
+raw_input)[offset 
+ i]);
+encoded_ptr += 4;
+  }
+  if (NumBits == 64) {
+reinterpret_cast(encoded_ptr)[0] =
+static_cast(null_multiplier * reinterpret_cast(
+raw_input)[offset 
+ i]);
+encoded_ptr += 8;
+  }
+}
+  } else {
+for (int64_t i = 0; i < data->length; 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592482656



##
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##
@@ -91,6 +95,377 @@ struct CountImpl : public ScalarAggregator {
   int64_t nulls = 0;
 };
 
+struct GroupedAggregator {
+  virtual ~GroupedAggregator() = default;
+
+  virtual void Consume(KernelContext*, const Datum& aggregand,
+   const uint32_t* group_ids) = 0;
+
+  virtual void Finalize(KernelContext* ctx, Datum* out) = 0;
+
+  virtual void Resize(KernelContext* ctx, int64_t new_num_groups) = 0;
+
+  virtual int64_t num_groups() const = 0;
+
+  void MaybeResize(KernelContext* ctx, int64_t length, const uint32_t* 
group_ids) {
+if (length == 0) return;
+
+// maybe a batch of group_ids should include the min/max group id
+int64_t max_group = *std::max_element(group_ids, group_ids + length);
+auto old_size = num_groups();
+
+if (max_group >= old_size) {
+  auto new_size = BufferBuilder::GrowByFactor(old_size, max_group + 1);
+  Resize(ctx, new_size);
+}
+  }
+
+  virtual std::shared_ptr out_type() const = 0;
+};
+
+struct GroupedCountImpl : public GroupedAggregator {
+  static std::unique_ptr Make(KernelContext* ctx,
+const 
std::shared_ptr&,
+const FunctionOptions* 
options) {
+auto out = ::arrow::internal::make_unique();
+out->options_ = checked_cast(*options);
+ctx->SetStatus(ctx->Allocate(0).Value(>counts_));
+return out;
+  }
+
+  void Resize(KernelContext* ctx, int64_t new_num_groups) override {
+auto old_size = num_groups();
+KERNEL_RETURN_IF_ERROR(ctx, counts_->TypedResize(new_num_groups));
+auto new_size = num_groups();
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+for (auto i = old_size; i < new_size; ++i) {
+  raw_counts[i] = 0;
+}
+  }
+
+  void Consume(KernelContext* ctx, const Datum& aggregand,
+   const uint32_t* group_ids) override {
+MaybeResize(ctx, aggregand.length(), group_ids);
+if (ctx->HasError()) return;
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+
+const auto& input = aggregand.array();
+
+if (options_.count_mode == CountOptions::COUNT_NULL) {
+  for (int64_t i = 0, input_i = input->offset; i < input->length; ++i, 
++input_i) {
+auto g = group_ids[i];
+raw_counts[g] += !BitUtil::GetBit(input->buffers[0]->data(), input_i);
+  }
+  return;
+}
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_counts[g] += 1;
+  }
+});
+  }
+
+  void Finalize(KernelContext* ctx, Datum* out) override {
+auto length = num_groups();
+*out = std::make_shared(length, std::move(counts_));
+  }
+
+  int64_t num_groups() const override { return counts_->size() / 
sizeof(int64_t); }
+
+  std::shared_ptr out_type() const override { return int64(); }
+
+  CountOptions options_;
+  std::shared_ptr counts_;
+};
+
+struct GroupedSumImpl : public GroupedAggregator {
+  // NB: whether we are accumulating into double, int64_t, or uint64_t
+  // we always have 64 bits per group in the sums buffer.
+  static constexpr size_t kSumSize = sizeof(int64_t);
+
+  using ConsumeImpl = std::function&,
+ const uint32_t*, Buffer*, Buffer*)>;
+
+  struct GetConsumeImpl {
+template ::Type>
+Status Visit(const T&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = reinterpret_cast::CType*>(
+input->buffers[1]->data());
+auto raw_sums = reinterpret_cast::CType*>(
+sums->mutable_data());
+auto raw_counts = reinterpret_cast(counts->mutable_data());
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_sums[g] += raw_input[input_i];
+raw_counts[g] += 1;
+  }
+});
+  };
+  out_type = TypeTraits::type_singleton();
+  return Status::OK();
+}
+
+Status Visit(const BooleanType&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = input->buffers[1]->data();
+auto 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592473683



##
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##
@@ -229,6 +604,710 @@ std::unique_ptr AllInit(KernelContext*, 
const KernelInitArgs& args)
   return ::arrow::internal::make_unique();
 }
 
+struct GroupByImpl : public ScalarAggregator {
+  using AddLengthImpl = std::function&, 
int32_t*)>;
+
+  struct GetAddLengthImpl {
+static constexpr int32_t null_extra_byte = 1;
+
+static void AddFixedLength(int32_t fixed_length, int64_t num_repeats,
+   int32_t* lengths) {
+  for (int64_t i = 0; i < num_repeats; ++i) {
+lengths[i] += fixed_length + null_extra_byte;
+  }
+}
+
+static void AddVarLength(const std::shared_ptr& data, int32_t* 
lengths) {
+  using offset_type = typename StringType::offset_type;
+  constexpr int32_t length_extra_bytes = sizeof(offset_type);
+  auto offset = data->offset;
+  const auto offsets = data->GetValues(1);
+  if (data->MayHaveNulls()) {
+const uint8_t* nulls = data->buffers[0]->data();
+
+for (int64_t i = 0; i < data->length; ++i) {
+  bool is_null = !BitUtil::GetBit(nulls, offset + i);
+  if (is_null) {
+lengths[i] += null_extra_byte + length_extra_bytes;
+  } else {
+lengths[i] += null_extra_byte + length_extra_bytes + 
offsets[offset + i + 1] -
+  offsets[offset + i];
+  }
+}
+  } else {
+for (int64_t i = 0; i < data->length; ++i) {
+  lengths[i] += null_extra_byte + length_extra_bytes + offsets[offset 
+ i + 1] -
+offsets[offset + i];
+}
+  }
+}
+
+template 
+Status Visit(const T& input_type) {
+  int32_t num_bytes = (bit_width(input_type.id()) + 7) / 8;
+  add_length_impl = [num_bytes](const std::shared_ptr& data,
+int32_t* lengths) {
+AddFixedLength(num_bytes, data->length, lengths);
+  };
+  return Status::OK();
+}
+
+Status Visit(const StringType&) {
+  add_length_impl = [](const std::shared_ptr& data, int32_t* 
lengths) {
+AddVarLength(data, lengths);
+  };
+  return Status::OK();
+}
+
+Status Visit(const BinaryType&) {
+  add_length_impl = [](const std::shared_ptr& data, int32_t* 
lengths) {
+AddVarLength(data, lengths);
+  };
+  return Status::OK();
+}
+
+Status Visit(const FixedSizeBinaryType& type) {
+  int32_t num_bytes = type.byte_width();
+  add_length_impl = [num_bytes](const std::shared_ptr& data,
+int32_t* lengths) {
+AddFixedLength(num_bytes, data->length, lengths);
+  };
+  return Status::OK();
+}
+
+AddLengthImpl add_length_impl;
+  };
+
+  using EncodeNextImpl =
+  std::function&, uint8_t**)>;
+
+  struct GetEncodeNextImpl {
+template 
+static void EncodeSmallFixed(const std::shared_ptr& data,
+ uint8_t** encoded_bytes) {
+  auto raw_input = data->buffers[1]->data();
+  auto offset = data->offset;
+  if (data->MayHaveNulls()) {
+const uint8_t* nulls = data->buffers[0]->data();
+for (int64_t i = 0; i < data->length; ++i) {
+  auto& encoded_ptr = encoded_bytes[i];
+  bool is_null = !BitUtil::GetBit(nulls, offset + i);
+  encoded_ptr[0] = is_null ? 1 : 0;
+  encoded_ptr += 1;
+  uint64_t null_multiplier = is_null ? 0 : 1;
+  if (NumBits == 1) {
+encoded_ptr[0] = static_cast(
+null_multiplier * (BitUtil::GetBit(raw_input, offset + i) ? 1 
: 0));
+encoded_ptr += 1;
+  }
+  if (NumBits == 8) {
+encoded_ptr[0] =
+static_cast(null_multiplier * reinterpret_cast(
+   raw_input)[offset + 
i]);
+encoded_ptr += 1;
+  }
+  if (NumBits == 16) {
+reinterpret_cast(encoded_ptr)[0] =
+static_cast(null_multiplier * reinterpret_cast(
+raw_input)[offset 
+ i]);
+encoded_ptr += 2;
+  }
+  if (NumBits == 32) {
+reinterpret_cast(encoded_ptr)[0] =
+static_cast(null_multiplier * reinterpret_cast(
+raw_input)[offset 
+ i]);
+encoded_ptr += 4;
+  }
+  if (NumBits == 64) {
+reinterpret_cast(encoded_ptr)[0] =
+static_cast(null_multiplier * reinterpret_cast(
+raw_input)[offset 
+ i]);
+encoded_ptr += 8;
+  }
+}
+  } else {
+for (int64_t i = 0; i < data->length; 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592471600



##
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##
@@ -91,6 +95,377 @@ struct CountImpl : public ScalarAggregator {
   int64_t nulls = 0;
 };
 
+struct GroupedAggregator {
+  virtual ~GroupedAggregator() = default;
+
+  virtual void Consume(KernelContext*, const Datum& aggregand,
+   const uint32_t* group_ids) = 0;
+
+  virtual void Finalize(KernelContext* ctx, Datum* out) = 0;
+
+  virtual void Resize(KernelContext* ctx, int64_t new_num_groups) = 0;
+
+  virtual int64_t num_groups() const = 0;
+
+  void MaybeResize(KernelContext* ctx, int64_t length, const uint32_t* 
group_ids) {
+if (length == 0) return;
+
+// maybe a batch of group_ids should include the min/max group id
+int64_t max_group = *std::max_element(group_ids, group_ids + length);
+auto old_size = num_groups();
+
+if (max_group >= old_size) {
+  auto new_size = BufferBuilder::GrowByFactor(old_size, max_group + 1);
+  Resize(ctx, new_size);
+}
+  }
+
+  virtual std::shared_ptr out_type() const = 0;
+};
+
+struct GroupedCountImpl : public GroupedAggregator {
+  static std::unique_ptr Make(KernelContext* ctx,
+const 
std::shared_ptr&,
+const FunctionOptions* 
options) {
+auto out = ::arrow::internal::make_unique();
+out->options_ = checked_cast(*options);
+ctx->SetStatus(ctx->Allocate(0).Value(>counts_));
+return out;
+  }
+
+  void Resize(KernelContext* ctx, int64_t new_num_groups) override {
+auto old_size = num_groups();
+KERNEL_RETURN_IF_ERROR(ctx, counts_->TypedResize(new_num_groups));
+auto new_size = num_groups();
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+for (auto i = old_size; i < new_size; ++i) {
+  raw_counts[i] = 0;
+}
+  }
+
+  void Consume(KernelContext* ctx, const Datum& aggregand,
+   const uint32_t* group_ids) override {
+MaybeResize(ctx, aggregand.length(), group_ids);
+if (ctx->HasError()) return;
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+
+const auto& input = aggregand.array();
+
+if (options_.count_mode == CountOptions::COUNT_NULL) {
+  for (int64_t i = 0, input_i = input->offset; i < input->length; ++i, 
++input_i) {
+auto g = group_ids[i];
+raw_counts[g] += !BitUtil::GetBit(input->buffers[0]->data(), input_i);
+  }
+  return;
+}
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_counts[g] += 1;
+  }
+});
+  }
+
+  void Finalize(KernelContext* ctx, Datum* out) override {
+auto length = num_groups();
+*out = std::make_shared(length, std::move(counts_));
+  }
+
+  int64_t num_groups() const override { return counts_->size() / 
sizeof(int64_t); }

Review comment:
   I'll refactor to use BufferBuilder.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592463457



##
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##
@@ -91,6 +95,377 @@ struct CountImpl : public ScalarAggregator {
   int64_t nulls = 0;
 };
 
+struct GroupedAggregator {
+  virtual ~GroupedAggregator() = default;
+
+  virtual void Consume(KernelContext*, const Datum& aggregand,
+   const uint32_t* group_ids) = 0;
+
+  virtual void Finalize(KernelContext* ctx, Datum* out) = 0;
+
+  virtual void Resize(KernelContext* ctx, int64_t new_num_groups) = 0;
+
+  virtual int64_t num_groups() const = 0;
+
+  void MaybeResize(KernelContext* ctx, int64_t length, const uint32_t* 
group_ids) {
+if (length == 0) return;
+
+// maybe a batch of group_ids should include the min/max group id
+int64_t max_group = *std::max_element(group_ids, group_ids + length);
+auto old_size = num_groups();
+
+if (max_group >= old_size) {
+  auto new_size = BufferBuilder::GrowByFactor(old_size, max_group + 1);
+  Resize(ctx, new_size);
+}
+  }
+
+  virtual std::shared_ptr out_type() const = 0;
+};
+
+struct GroupedCountImpl : public GroupedAggregator {
+  static std::unique_ptr Make(KernelContext* ctx,
+const 
std::shared_ptr&,
+const FunctionOptions* 
options) {
+auto out = ::arrow::internal::make_unique();
+out->options_ = checked_cast(*options);
+ctx->SetStatus(ctx->Allocate(0).Value(>counts_));
+return out;
+  }
+
+  void Resize(KernelContext* ctx, int64_t new_num_groups) override {
+auto old_size = num_groups();
+KERNEL_RETURN_IF_ERROR(ctx, counts_->TypedResize(new_num_groups));
+auto new_size = num_groups();
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+for (auto i = old_size; i < new_size; ++i) {
+  raw_counts[i] = 0;
+}
+  }
+
+  void Consume(KernelContext* ctx, const Datum& aggregand,
+   const uint32_t* group_ids) override {
+MaybeResize(ctx, aggregand.length(), group_ids);
+if (ctx->HasError()) return;
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+
+const auto& input = aggregand.array();
+
+if (options_.count_mode == CountOptions::COUNT_NULL) {
+  for (int64_t i = 0, input_i = input->offset; i < input->length; ++i, 
++input_i) {
+auto g = group_ids[i];
+raw_counts[g] += !BitUtil::GetBit(input->buffers[0]->data(), input_i);
+  }
+  return;
+}
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_counts[g] += 1;
+  }
+});
+  }
+
+  void Finalize(KernelContext* ctx, Datum* out) override {
+auto length = num_groups();
+*out = std::make_shared(length, std::move(counts_));
+  }
+
+  int64_t num_groups() const override { return counts_->size() / 
sizeof(int64_t); }
+
+  std::shared_ptr out_type() const override { return int64(); }
+
+  CountOptions options_;
+  std::shared_ptr counts_;
+};
+
+struct GroupedSumImpl : public GroupedAggregator {
+  // NB: whether we are accumulating into double, int64_t, or uint64_t
+  // we always have 64 bits per group in the sums buffer.
+  static constexpr size_t kSumSize = sizeof(int64_t);
+
+  using ConsumeImpl = std::function&,
+ const uint32_t*, Buffer*, Buffer*)>;
+
+  struct GetConsumeImpl {
+template ::Type>
+Status Visit(const T&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = reinterpret_cast::CType*>(
+input->buffers[1]->data());
+auto raw_sums = reinterpret_cast::CType*>(
+sums->mutable_data());
+auto raw_counts = reinterpret_cast(counts->mutable_data());
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_sums[g] += raw_input[input_i];
+raw_counts[g] += 1;
+  }
+});
+  };
+  out_type = TypeTraits::type_singleton();
+  return Status::OK();
+}
+
+Status Visit(const BooleanType&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = input->buffers[1]->data();
+auto 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592456204



##
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##
@@ -91,6 +95,377 @@ struct CountImpl : public ScalarAggregator {
   int64_t nulls = 0;
 };
 
+struct GroupedAggregator {
+  virtual ~GroupedAggregator() = default;
+
+  virtual void Consume(KernelContext*, const Datum& aggregand,
+   const uint32_t* group_ids) = 0;
+
+  virtual void Finalize(KernelContext* ctx, Datum* out) = 0;
+
+  virtual void Resize(KernelContext* ctx, int64_t new_num_groups) = 0;
+
+  virtual int64_t num_groups() const = 0;
+
+  void MaybeResize(KernelContext* ctx, int64_t length, const uint32_t* 
group_ids) {
+if (length == 0) return;
+
+// maybe a batch of group_ids should include the min/max group id
+int64_t max_group = *std::max_element(group_ids, group_ids + length);
+auto old_size = num_groups();
+
+if (max_group >= old_size) {
+  auto new_size = BufferBuilder::GrowByFactor(old_size, max_group + 1);
+  Resize(ctx, new_size);
+}
+  }
+
+  virtual std::shared_ptr out_type() const = 0;
+};
+
+struct GroupedCountImpl : public GroupedAggregator {
+  static std::unique_ptr Make(KernelContext* ctx,
+const 
std::shared_ptr&,
+const FunctionOptions* 
options) {
+auto out = ::arrow::internal::make_unique();
+out->options_ = checked_cast(*options);
+ctx->SetStatus(ctx->Allocate(0).Value(>counts_));
+return out;
+  }
+
+  void Resize(KernelContext* ctx, int64_t new_num_groups) override {
+auto old_size = num_groups();
+KERNEL_RETURN_IF_ERROR(ctx, counts_->TypedResize(new_num_groups));
+auto new_size = num_groups();
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+for (auto i = old_size; i < new_size; ++i) {
+  raw_counts[i] = 0;
+}
+  }
+
+  void Consume(KernelContext* ctx, const Datum& aggregand,
+   const uint32_t* group_ids) override {
+MaybeResize(ctx, aggregand.length(), group_ids);
+if (ctx->HasError()) return;
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+
+const auto& input = aggregand.array();
+
+if (options_.count_mode == CountOptions::COUNT_NULL) {
+  for (int64_t i = 0, input_i = input->offset; i < input->length; ++i, 
++input_i) {
+auto g = group_ids[i];
+raw_counts[g] += !BitUtil::GetBit(input->buffers[0]->data(), input_i);
+  }
+  return;
+}
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_counts[g] += 1;
+  }
+});
+  }
+
+  void Finalize(KernelContext* ctx, Datum* out) override {
+auto length = num_groups();
+*out = std::make_shared(length, std::move(counts_));
+  }
+
+  int64_t num_groups() const override { return counts_->size() / 
sizeof(int64_t); }
+
+  std::shared_ptr out_type() const override { return int64(); }
+
+  CountOptions options_;
+  std::shared_ptr counts_;
+};
+
+struct GroupedSumImpl : public GroupedAggregator {
+  // NB: whether we are accumulating into double, int64_t, or uint64_t
+  // we always have 64 bits per group in the sums buffer.
+  static constexpr size_t kSumSize = sizeof(int64_t);
+
+  using ConsumeImpl = std::function&,
+ const uint32_t*, Buffer*, Buffer*)>;
+
+  struct GetConsumeImpl {
+template ::Type>
+Status Visit(const T&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = reinterpret_cast::CType*>(
+input->buffers[1]->data());
+auto raw_sums = reinterpret_cast::CType*>(
+sums->mutable_data());
+auto raw_counts = reinterpret_cast(counts->mutable_data());
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_sums[g] += raw_input[input_i];
+raw_counts[g] += 1;
+  }
+});
+  };
+  out_type = TypeTraits::type_singleton();
+  return Status::OK();
+}
+
+Status Visit(const BooleanType&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = input->buffers[1]->data();
+auto 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592455705



##
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##
@@ -91,6 +95,377 @@ struct CountImpl : public ScalarAggregator {
   int64_t nulls = 0;
 };
 
+struct GroupedAggregator {
+  virtual ~GroupedAggregator() = default;
+
+  virtual void Consume(KernelContext*, const Datum& aggregand,
+   const uint32_t* group_ids) = 0;
+
+  virtual void Finalize(KernelContext* ctx, Datum* out) = 0;
+
+  virtual void Resize(KernelContext* ctx, int64_t new_num_groups) = 0;
+
+  virtual int64_t num_groups() const = 0;
+
+  void MaybeResize(KernelContext* ctx, int64_t length, const uint32_t* 
group_ids) {
+if (length == 0) return;
+
+// maybe a batch of group_ids should include the min/max group id
+int64_t max_group = *std::max_element(group_ids, group_ids + length);
+auto old_size = num_groups();
+
+if (max_group >= old_size) {
+  auto new_size = BufferBuilder::GrowByFactor(old_size, max_group + 1);
+  Resize(ctx, new_size);
+}
+  }
+
+  virtual std::shared_ptr out_type() const = 0;
+};
+
+struct GroupedCountImpl : public GroupedAggregator {
+  static std::unique_ptr Make(KernelContext* ctx,
+const 
std::shared_ptr&,
+const FunctionOptions* 
options) {
+auto out = ::arrow::internal::make_unique();
+out->options_ = checked_cast(*options);
+ctx->SetStatus(ctx->Allocate(0).Value(>counts_));
+return out;
+  }
+
+  void Resize(KernelContext* ctx, int64_t new_num_groups) override {
+auto old_size = num_groups();
+KERNEL_RETURN_IF_ERROR(ctx, counts_->TypedResize(new_num_groups));
+auto new_size = num_groups();
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+for (auto i = old_size; i < new_size; ++i) {
+  raw_counts[i] = 0;
+}
+  }
+
+  void Consume(KernelContext* ctx, const Datum& aggregand,
+   const uint32_t* group_ids) override {
+MaybeResize(ctx, aggregand.length(), group_ids);
+if (ctx->HasError()) return;
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+
+const auto& input = aggregand.array();
+
+if (options_.count_mode == CountOptions::COUNT_NULL) {
+  for (int64_t i = 0, input_i = input->offset; i < input->length; ++i, 
++input_i) {
+auto g = group_ids[i];
+raw_counts[g] += !BitUtil::GetBit(input->buffers[0]->data(), input_i);
+  }
+  return;
+}
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_counts[g] += 1;
+  }
+});
+  }
+
+  void Finalize(KernelContext* ctx, Datum* out) override {
+auto length = num_groups();
+*out = std::make_shared(length, std::move(counts_));
+  }
+
+  int64_t num_groups() const override { return counts_->size() / 
sizeof(int64_t); }
+
+  std::shared_ptr out_type() const override { return int64(); }
+
+  CountOptions options_;
+  std::shared_ptr counts_;
+};
+
+struct GroupedSumImpl : public GroupedAggregator {
+  // NB: whether we are accumulating into double, int64_t, or uint64_t
+  // we always have 64 bits per group in the sums buffer.
+  static constexpr size_t kSumSize = sizeof(int64_t);
+
+  using ConsumeImpl = std::function&,
+ const uint32_t*, Buffer*, Buffer*)>;
+
+  struct GetConsumeImpl {
+template ::Type>
+Status Visit(const T&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = reinterpret_cast::CType*>(
+input->buffers[1]->data());
+auto raw_sums = reinterpret_cast::CType*>(
+sums->mutable_data());
+auto raw_counts = reinterpret_cast(counts->mutable_data());
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_sums[g] += raw_input[input_i];
+raw_counts[g] += 1;
+  }
+});
+  };
+  out_type = TypeTraits::type_singleton();
+  return Status::OK();
+}
+
+Status Visit(const BooleanType&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = input->buffers[1]->data();
+auto 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592455189



##
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##
@@ -91,6 +95,377 @@ struct CountImpl : public ScalarAggregator {
   int64_t nulls = 0;
 };
 
+struct GroupedAggregator {
+  virtual ~GroupedAggregator() = default;
+
+  virtual void Consume(KernelContext*, const Datum& aggregand,
+   const uint32_t* group_ids) = 0;
+
+  virtual void Finalize(KernelContext* ctx, Datum* out) = 0;
+
+  virtual void Resize(KernelContext* ctx, int64_t new_num_groups) = 0;
+
+  virtual int64_t num_groups() const = 0;
+
+  void MaybeResize(KernelContext* ctx, int64_t length, const uint32_t* 
group_ids) {
+if (length == 0) return;
+
+// maybe a batch of group_ids should include the min/max group id
+int64_t max_group = *std::max_element(group_ids, group_ids + length);
+auto old_size = num_groups();
+
+if (max_group >= old_size) {
+  auto new_size = BufferBuilder::GrowByFactor(old_size, max_group + 1);
+  Resize(ctx, new_size);
+}
+  }
+
+  virtual std::shared_ptr out_type() const = 0;
+};
+
+struct GroupedCountImpl : public GroupedAggregator {
+  static std::unique_ptr Make(KernelContext* ctx,
+const 
std::shared_ptr&,
+const FunctionOptions* 
options) {
+auto out = ::arrow::internal::make_unique();
+out->options_ = checked_cast(*options);
+ctx->SetStatus(ctx->Allocate(0).Value(>counts_));
+return out;
+  }
+
+  void Resize(KernelContext* ctx, int64_t new_num_groups) override {
+auto old_size = num_groups();
+KERNEL_RETURN_IF_ERROR(ctx, counts_->TypedResize(new_num_groups));
+auto new_size = num_groups();
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+for (auto i = old_size; i < new_size; ++i) {
+  raw_counts[i] = 0;
+}
+  }
+
+  void Consume(KernelContext* ctx, const Datum& aggregand,
+   const uint32_t* group_ids) override {
+MaybeResize(ctx, aggregand.length(), group_ids);
+if (ctx->HasError()) return;
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+
+const auto& input = aggregand.array();
+
+if (options_.count_mode == CountOptions::COUNT_NULL) {
+  for (int64_t i = 0, input_i = input->offset; i < input->length; ++i, 
++input_i) {
+auto g = group_ids[i];
+raw_counts[g] += !BitUtil::GetBit(input->buffers[0]->data(), input_i);
+  }
+  return;
+}
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_counts[g] += 1;
+  }
+});
+  }
+
+  void Finalize(KernelContext* ctx, Datum* out) override {
+auto length = num_groups();
+*out = std::make_shared(length, std::move(counts_));
+  }
+
+  int64_t num_groups() const override { return counts_->size() / 
sizeof(int64_t); }
+
+  std::shared_ptr out_type() const override { return int64(); }
+
+  CountOptions options_;
+  std::shared_ptr counts_;
+};
+
+struct GroupedSumImpl : public GroupedAggregator {
+  // NB: whether we are accumulating into double, int64_t, or uint64_t
+  // we always have 64 bits per group in the sums buffer.
+  static constexpr size_t kSumSize = sizeof(int64_t);
+
+  using ConsumeImpl = std::function&,
+ const uint32_t*, Buffer*, Buffer*)>;
+
+  struct GetConsumeImpl {
+template ::Type>
+Status Visit(const T&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = reinterpret_cast::CType*>(
+input->buffers[1]->data());
+auto raw_sums = reinterpret_cast::CType*>(
+sums->mutable_data());
+auto raw_counts = reinterpret_cast(counts->mutable_data());
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_sums[g] += raw_input[input_i];
+raw_counts[g] += 1;
+  }
+});
+  };
+  out_type = TypeTraits::type_singleton();
+  return Status::OK();
+}
+
+Status Visit(const BooleanType&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = input->buffers[1]->data();
+auto 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592454275



##
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##
@@ -91,6 +95,377 @@ struct CountImpl : public ScalarAggregator {
   int64_t nulls = 0;
 };
 
+struct GroupedAggregator {
+  virtual ~GroupedAggregator() = default;
+
+  virtual void Consume(KernelContext*, const Datum& aggregand,
+   const uint32_t* group_ids) = 0;
+
+  virtual void Finalize(KernelContext* ctx, Datum* out) = 0;
+
+  virtual void Resize(KernelContext* ctx, int64_t new_num_groups) = 0;
+
+  virtual int64_t num_groups() const = 0;
+
+  void MaybeResize(KernelContext* ctx, int64_t length, const uint32_t* 
group_ids) {
+if (length == 0) return;
+
+// maybe a batch of group_ids should include the min/max group id
+int64_t max_group = *std::max_element(group_ids, group_ids + length);
+auto old_size = num_groups();
+
+if (max_group >= old_size) {
+  auto new_size = BufferBuilder::GrowByFactor(old_size, max_group + 1);
+  Resize(ctx, new_size);
+}
+  }
+
+  virtual std::shared_ptr out_type() const = 0;
+};
+
+struct GroupedCountImpl : public GroupedAggregator {
+  static std::unique_ptr Make(KernelContext* ctx,
+const 
std::shared_ptr&,
+const FunctionOptions* 
options) {
+auto out = ::arrow::internal::make_unique();
+out->options_ = checked_cast(*options);
+ctx->SetStatus(ctx->Allocate(0).Value(>counts_));
+return out;
+  }
+
+  void Resize(KernelContext* ctx, int64_t new_num_groups) override {
+auto old_size = num_groups();
+KERNEL_RETURN_IF_ERROR(ctx, counts_->TypedResize(new_num_groups));
+auto new_size = num_groups();
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+for (auto i = old_size; i < new_size; ++i) {
+  raw_counts[i] = 0;
+}
+  }
+
+  void Consume(KernelContext* ctx, const Datum& aggregand,
+   const uint32_t* group_ids) override {
+MaybeResize(ctx, aggregand.length(), group_ids);
+if (ctx->HasError()) return;
+
+auto raw_counts = reinterpret_cast(counts_->mutable_data());
+
+const auto& input = aggregand.array();
+
+if (options_.count_mode == CountOptions::COUNT_NULL) {
+  for (int64_t i = 0, input_i = input->offset; i < input->length; ++i, 
++input_i) {
+auto g = group_ids[i];
+raw_counts[g] += !BitUtil::GetBit(input->buffers[0]->data(), input_i);
+  }
+  return;
+}
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_counts[g] += 1;
+  }
+});
+  }
+
+  void Finalize(KernelContext* ctx, Datum* out) override {
+auto length = num_groups();
+*out = std::make_shared(length, std::move(counts_));
+  }
+
+  int64_t num_groups() const override { return counts_->size() / 
sizeof(int64_t); }
+
+  std::shared_ptr out_type() const override { return int64(); }
+
+  CountOptions options_;
+  std::shared_ptr counts_;
+};
+
+struct GroupedSumImpl : public GroupedAggregator {
+  // NB: whether we are accumulating into double, int64_t, or uint64_t
+  // we always have 64 bits per group in the sums buffer.
+  static constexpr size_t kSumSize = sizeof(int64_t);
+
+  using ConsumeImpl = std::function&,
+ const uint32_t*, Buffer*, Buffer*)>;
+
+  struct GetConsumeImpl {
+template ::Type>
+Status Visit(const T&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = reinterpret_cast::CType*>(
+input->buffers[1]->data());
+auto raw_sums = reinterpret_cast::CType*>(
+sums->mutable_data());
+auto raw_counts = reinterpret_cast(counts->mutable_data());
+
+arrow::internal::VisitSetBitRunsVoid(
+input->buffers[0], input->offset, input->length,
+[&](int64_t begin, int64_t length) {
+  for (int64_t input_i = begin, i = begin - input->offset;
+   input_i < begin + length; ++input_i, ++i) {
+auto g = group_ids[i];
+raw_sums[g] += raw_input[input_i];
+raw_counts[g] += 1;
+  }
+});
+  };
+  out_type = TypeTraits::type_singleton();
+  return Status::OK();
+}
+
+Status Visit(const BooleanType&) {
+  consume_impl = [](const std::shared_ptr& input,
+const uint32_t* group_ids, Buffer* sums, Buffer* 
counts) {
+auto raw_input = input->buffers[1]->data();
+auto 

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592450949



##
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##
@@ -27,24 +27,531 @@
 #include "arrow/array.h"
 #include "arrow/chunked_array.h"
 #include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
 #include "arrow/compute/kernels/aggregate_internal.h"
 #include "arrow/compute/kernels/test_util.h"
+#include "arrow/compute/registry.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bitmap_reader.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_internal.h"
 
 #include "arrow/testing/gtest_common.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 
+using internal::BitmapReader;
 using internal::checked_cast;
 using internal::checked_pointer_cast;
 
 namespace compute {
 
+// Copy-pasta from partition.cc
+//
+// In the finished product this will only be a test helper for group_by
+// and partition.cc will rely on a no-aggregate call to group_by.
+namespace group_helpers {
+namespace {
+
+// Transform an array of counts to offsets which will divide a ListArray
+// into an equal number of slices with corresponding lengths.
+Result> CountsToOffsets(std::shared_ptr 
counts) {
+  TypedBufferBuilder offset_builder;
+  RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1));
+
+  int32_t current_offset = 0;
+  offset_builder.UnsafeAppend(current_offset);
+
+  for (int64_t i = 0; i < counts->length(); ++i) {
+DCHECK_NE(counts->Value(i), 0);
+current_offset += static_cast(counts->Value(i));
+offset_builder.UnsafeAppend(current_offset);
+  }
+
+  std::shared_ptr offsets;
+  RETURN_NOT_OK(offset_builder.Finish());
+  return offsets;
+}
+
+class StructDictionary {
+ public:
+  struct Encoded {
+std::shared_ptr indices;
+std::shared_ptr dictionary;
+  };
+
+  static Result Encode(const ArrayVector& columns) {
+Encoded out{nullptr, std::make_shared()};
+
+for (const auto& column : columns) {
+  if (column->null_count() != 0) {
+return Status::NotImplemented("Grouping on a field with nulls");
+  }
+
+  RETURN_NOT_OK(out.dictionary->AddOne(column, ));
+}
+
+return out;
+  }
+
+  Result> Decode(std::shared_ptr 
fused_indices,
+  FieldVector fields) {
+std::vector builders(dictionaries_.size());
+for (Int32Builder& b : builders) {
+  RETURN_NOT_OK(b.Resize(fused_indices->length()));
+}
+
+std::vector codes(dictionaries_.size());
+for (int64_t i = 0; i < fused_indices->length(); ++i) {
+  Expand(fused_indices->Value(i), codes.data());
+
+  auto builder_it = builders.begin();
+  for (int32_t index : codes) {
+builder_it++->UnsafeAppend(index);
+  }
+}
+
+ArrayVector columns(dictionaries_.size());
+for (size_t i = 0; i < dictionaries_.size(); ++i) {
+  std::shared_ptr indices;
+  RETURN_NOT_OK(builders[i].FinishInternal());
+
+  ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[i], 
indices));
+
+  if (fields[i]->type()->id() == Type::DICTIONARY) {
+RETURN_NOT_OK(RestoreDictionaryEncoding(
+checked_pointer_cast(fields[i]->type()), ));
+  }
+
+  columns[i] = column.make_array();
+}
+
+return StructArray::Make(std::move(columns), std::move(fields));
+  }
+
+ private:
+  Status AddOne(Datum column, std::shared_ptr* fused_indices) {
+if (column.type()->id() != Type::DICTIONARY) {
+  ARROW_ASSIGN_OR_RAISE(column, 
compute::DictionaryEncode(std::move(column)));
+}
+
+auto dict_column = column.array_as();
+dictionaries_.push_back(dict_column->dictionary());
+ARROW_ASSIGN_OR_RAISE(auto indices, compute::Cast(*dict_column->indices(), 
int32()));
+
+if (*fused_indices == nullptr) {
+  *fused_indices = checked_pointer_cast(std::move(indices));
+  return IncreaseSize();
+}
+
+// It's useful to think about the case where each of dictionaries_ has 
size 10.
+// In this case the decimal digit in the ones place is the code in 
dictionaries_[0],
+// the tens place corresponds to the code in dictionaries_[1], etc.
+// The incumbent indices must be shifted to the hundreds place so as not 
to collide.
+ARROW_ASSIGN_OR_RAISE(Datum new_fused_indices,
+  compute::Multiply(indices, MakeScalar(size_)));
+
+ARROW_ASSIGN_OR_RAISE(new_fused_indices,
+  compute::Add(new_fused_indices, *fused_indices));
+
+*fused_indices = 
checked_pointer_cast(new_fused_indices.make_array());
+return IncreaseSize();
+  }
+
+  // expand a fused code into component dict codes, order is in order of 
addition
+  void Expand(int32_t fused_code, int32_t* codes) {
+

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592450678



##
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##
@@ -27,24 +27,531 @@
 #include "arrow/array.h"
 #include "arrow/chunked_array.h"
 #include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
 #include "arrow/compute/kernels/aggregate_internal.h"
 #include "arrow/compute/kernels/test_util.h"
+#include "arrow/compute/registry.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bitmap_reader.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_internal.h"
 
 #include "arrow/testing/gtest_common.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 
+using internal::BitmapReader;
 using internal::checked_cast;
 using internal::checked_pointer_cast;
 
 namespace compute {
 
+// Copy-pasta from partition.cc
+//
+// In the finished product this will only be a test helper for group_by
+// and partition.cc will rely on a no-aggregate call to group_by.
+namespace group_helpers {
+namespace {
+
+// Transform an array of counts to offsets which will divide a ListArray
+// into an equal number of slices with corresponding lengths.
+Result> CountsToOffsets(std::shared_ptr 
counts) {
+  TypedBufferBuilder offset_builder;
+  RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1));
+
+  int32_t current_offset = 0;
+  offset_builder.UnsafeAppend(current_offset);
+
+  for (int64_t i = 0; i < counts->length(); ++i) {
+DCHECK_NE(counts->Value(i), 0);
+current_offset += static_cast(counts->Value(i));
+offset_builder.UnsafeAppend(current_offset);
+  }
+
+  std::shared_ptr offsets;
+  RETURN_NOT_OK(offset_builder.Finish());
+  return offsets;
+}
+
+class StructDictionary {
+ public:
+  struct Encoded {
+std::shared_ptr indices;
+std::shared_ptr dictionary;
+  };
+
+  static Result Encode(const ArrayVector& columns) {
+Encoded out{nullptr, std::make_shared()};
+
+for (const auto& column : columns) {
+  if (column->null_count() != 0) {
+return Status::NotImplemented("Grouping on a field with nulls");
+  }
+
+  RETURN_NOT_OK(out.dictionary->AddOne(column, ));
+}
+
+return out;
+  }
+
+  Result> Decode(std::shared_ptr 
fused_indices,
+  FieldVector fields) {
+std::vector builders(dictionaries_.size());
+for (Int32Builder& b : builders) {
+  RETURN_NOT_OK(b.Resize(fused_indices->length()));
+}
+
+std::vector codes(dictionaries_.size());
+for (int64_t i = 0; i < fused_indices->length(); ++i) {
+  Expand(fused_indices->Value(i), codes.data());
+
+  auto builder_it = builders.begin();
+  for (int32_t index : codes) {
+builder_it++->UnsafeAppend(index);
+  }
+}
+
+ArrayVector columns(dictionaries_.size());
+for (size_t i = 0; i < dictionaries_.size(); ++i) {
+  std::shared_ptr indices;
+  RETURN_NOT_OK(builders[i].FinishInternal());
+
+  ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[i], 
indices));
+
+  if (fields[i]->type()->id() == Type::DICTIONARY) {
+RETURN_NOT_OK(RestoreDictionaryEncoding(
+checked_pointer_cast(fields[i]->type()), ));
+  }
+
+  columns[i] = column.make_array();
+}
+
+return StructArray::Make(std::move(columns), std::move(fields));
+  }
+
+ private:
+  Status AddOne(Datum column, std::shared_ptr* fused_indices) {
+if (column.type()->id() != Type::DICTIONARY) {
+  ARROW_ASSIGN_OR_RAISE(column, 
compute::DictionaryEncode(std::move(column)));
+}
+
+auto dict_column = column.array_as();
+dictionaries_.push_back(dict_column->dictionary());
+ARROW_ASSIGN_OR_RAISE(auto indices, compute::Cast(*dict_column->indices(), 
int32()));
+
+if (*fused_indices == nullptr) {
+  *fused_indices = checked_pointer_cast(std::move(indices));
+  return IncreaseSize();
+}
+
+// It's useful to think about the case where each of dictionaries_ has 
size 10.
+// In this case the decimal digit in the ones place is the code in 
dictionaries_[0],
+// the tens place corresponds to the code in dictionaries_[1], etc.
+// The incumbent indices must be shifted to the hundreds place so as not 
to collide.
+ARROW_ASSIGN_OR_RAISE(Datum new_fused_indices,
+  compute::Multiply(indices, MakeScalar(size_)));
+
+ARROW_ASSIGN_OR_RAISE(new_fused_indices,
+  compute::Add(new_fused_indices, *fused_indices));
+
+*fused_indices = 
checked_pointer_cast(new_fused_indices.make_array());
+return IncreaseSize();
+  }
+
+  // expand a fused code into component dict codes, order is in order of 
addition
+  void Expand(int32_t fused_code, int32_t* codes) {
+

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592450325



##
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##
@@ -27,24 +27,531 @@
 #include "arrow/array.h"
 #include "arrow/chunked_array.h"
 #include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
 #include "arrow/compute/kernels/aggregate_internal.h"
 #include "arrow/compute/kernels/test_util.h"
+#include "arrow/compute/registry.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bitmap_reader.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/int_util_internal.h"
 
 #include "arrow/testing/gtest_common.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 
+using internal::BitmapReader;
 using internal::checked_cast;
 using internal::checked_pointer_cast;
 
 namespace compute {
 
+// Copy-pasta from partition.cc
+//
+// In the finished product this will only be a test helper for group_by
+// and partition.cc will rely on a no-aggregate call to group_by.
+namespace group_helpers {
+namespace {
+
+// Transform an array of counts to offsets which will divide a ListArray
+// into an equal number of slices with corresponding lengths.
+Result> CountsToOffsets(std::shared_ptr 
counts) {
+  TypedBufferBuilder offset_builder;
+  RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1));
+
+  int32_t current_offset = 0;
+  offset_builder.UnsafeAppend(current_offset);
+
+  for (int64_t i = 0; i < counts->length(); ++i) {
+DCHECK_NE(counts->Value(i), 0);
+current_offset += static_cast(counts->Value(i));
+offset_builder.UnsafeAppend(current_offset);
+  }
+
+  std::shared_ptr offsets;
+  RETURN_NOT_OK(offset_builder.Finish());
+  return offsets;
+}
+
+class StructDictionary {
+ public:
+  struct Encoded {
+std::shared_ptr indices;
+std::shared_ptr dictionary;
+  };
+
+  static Result Encode(const ArrayVector& columns) {
+Encoded out{nullptr, std::make_shared()};
+
+for (const auto& column : columns) {
+  if (column->null_count() != 0) {
+return Status::NotImplemented("Grouping on a field with nulls");
+  }
+
+  RETURN_NOT_OK(out.dictionary->AddOne(column, ));
+}
+
+return out;
+  }
+
+  Result> Decode(std::shared_ptr 
fused_indices,
+  FieldVector fields) {
+std::vector builders(dictionaries_.size());
+for (Int32Builder& b : builders) {
+  RETURN_NOT_OK(b.Resize(fused_indices->length()));
+}
+
+std::vector codes(dictionaries_.size());
+for (int64_t i = 0; i < fused_indices->length(); ++i) {
+  Expand(fused_indices->Value(i), codes.data());
+
+  auto builder_it = builders.begin();
+  for (int32_t index : codes) {
+builder_it++->UnsafeAppend(index);
+  }
+}
+
+ArrayVector columns(dictionaries_.size());
+for (size_t i = 0; i < dictionaries_.size(); ++i) {
+  std::shared_ptr indices;
+  RETURN_NOT_OK(builders[i].FinishInternal());
+
+  ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[i], 
indices));
+
+  if (fields[i]->type()->id() == Type::DICTIONARY) {
+RETURN_NOT_OK(RestoreDictionaryEncoding(
+checked_pointer_cast(fields[i]->type()), ));
+  }
+
+  columns[i] = column.make_array();
+}
+
+return StructArray::Make(std::move(columns), std::move(fields));
+  }
+
+ private:
+  Status AddOne(Datum column, std::shared_ptr* fused_indices) {
+if (column.type()->id() != Type::DICTIONARY) {
+  ARROW_ASSIGN_OR_RAISE(column, 
compute::DictionaryEncode(std::move(column)));
+}
+
+auto dict_column = column.array_as();
+dictionaries_.push_back(dict_column->dictionary());
+ARROW_ASSIGN_OR_RAISE(auto indices, compute::Cast(*dict_column->indices(), 
int32()));
+
+if (*fused_indices == nullptr) {
+  *fused_indices = checked_pointer_cast(std::move(indices));
+  return IncreaseSize();
+}
+
+// It's useful to think about the case where each of dictionaries_ has 
size 10.
+// In this case the decimal digit in the ones place is the code in 
dictionaries_[0],
+// the tens place corresponds to the code in dictionaries_[1], etc.
+// The incumbent indices must be shifted to the hundreds place so as not 
to collide.
+ARROW_ASSIGN_OR_RAISE(Datum new_fused_indices,
+  compute::Multiply(indices, MakeScalar(size_)));
+
+ARROW_ASSIGN_OR_RAISE(new_fused_indices,
+  compute::Add(new_fused_indices, *fused_indices));
+
+*fused_indices = 
checked_pointer_cast(new_fused_indices.make_array());
+return IncreaseSize();
+  }
+
+  // expand a fused code into component dict codes, order is in order of 
addition
+  void Expand(int32_t fused_code, int32_t* codes) {
+

[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592445217



##
File path: cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
##
@@ -300,6 +300,70 @@ BENCHMARK_TEMPLATE(ReferenceSum, 
SumBitmapVectorizeUnroll)
 ->Apply(BenchmarkSetArgs);
 #endif  // ARROW_WITH_BENCHMARKS_REFERENCE
 
+//
+// GroupBy
+//
+
+static void BenchmarkGroupBy(benchmark::State& state,
+ std::vector aggregates,
+ std::vector aggregands, std::vector 
keys) {
+  std::vector arguments;
+  for (const Datum& aggregand : aggregands) {
+arguments.push_back(aggregand);
+  }
+
+  for (const Datum& key : keys) {
+arguments.push_back(key);
+  }
+
+  GroupByOptions options;
+  options.aggregates = aggregates;
+  options.key_names.resize(keys.size(), "ignored");
+
+  for (auto _ : state) {
+ABORT_NOT_OK(CallFunction("group_by", arguments, ).status());
+  }
+}
+
+#define GROUP_BY_BENCHMARK(Name, Impl)   \
+  static void Name(benchmark::State& state) {\
+RegressionArgs args(state, false);   \
+auto rng = random::RandomArrayGenerator(1923);   \
+(Impl)();\
+  }  \
+  BENCHMARK(Name)->Apply([](benchmark::internal::Benchmark* bench) { \
+BenchmarkSetArgsWithSizes(bench, {1 * 1024 * 1024}); \
+  })
+
+GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallStringSet, [&] {
+  auto summand = rng.Float64(args.size,
+ /*min=*/0.0,
+ /*max=*/1.0e14,
+ /*null_probability=*/args.null_proportion,
+ /*nan_probability=*/args.null_proportion / 10);
+
+  auto key = rng.StringWithRepeats(args.size,
+   /*unique=*/16,
+   /*min_length=*/3,
+   /*max_length=*/32);
+
+  BenchmarkGroupBy(state, {{"sum", NULLPTR, "summed f64"}}, {summand}, {key});

Review comment:
   Well it doesn't work. I'll remove the second string





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r59262



##
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##
@@ -27,24 +27,531 @@
 #include "arrow/array.h"

Review comment:
   Agreed. I'll move it to a separate test.cc





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592443814



##
File path: cpp/src/arrow/compute/exec.cc
##
@@ -838,6 +838,13 @@ class ScalarAggExecutor : public 
KernelExecutorImpl {
 
  private:
   Status Consume(const ExecBatch& batch) {
+if (kernel_->nomerge) {
+  kernel_->consume(kernel_ctx_, batch);
+  ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
+  return Status::OK();
+}

Review comment:
   This is a confusing aspect of the KernelExecutor contract which should 
be documented: `KernelExecutor::Init` initializes the executor but assumes that 
the kernel's `init` has already been called in the provided `KernelContext`. 
This is to facilitate the case where `init` may be expensive and does not need 
to be called again for each execution of the kernel, for example the same 
lookup table can be re-used for all scanned batches in a dataset filter.
   
   I'll add a comment clarifying this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592443814



##
File path: cpp/src/arrow/compute/exec.cc
##
@@ -838,6 +838,13 @@ class ScalarAggExecutor : public 
KernelExecutorImpl {
 
  private:
   Status Consume(const ExecBatch& batch) {
+if (kernel_->nomerge) {
+  kernel_->consume(kernel_ctx_, batch);
+  ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
+  return Status::OK();
+}

Review comment:
   This is a confusing aspect of the KernelExecutor contract which should 
be documented: `KernelExecutor::Init` initializes the executor but assumes that 
the kernel's `init` has already been called in the provided `KernelContext`. 
This is to facilitate the case where `init` may be expensive and does not need 
to be called again for each execution of the kernel, for example the same 
lookup table can be re-used for all scanned batches in a dataset filter.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] bkietz commented on a change in pull request #9621: ARROW-11591: [C++][Compute] Grouped aggregation

2021-03-11 Thread GitBox


bkietz commented on a change in pull request #9621:
URL: https://github.com/apache/arrow/pull/9621#discussion_r592436887



##
File path: cpp/src/arrow/compute/api_aggregate.h
##
@@ -306,5 +326,34 @@ Result TDigest(const Datum& value,
   const TDigestOptions& options = 
TDigestOptions::Defaults(),
   ExecContext* ctx = NULLPTR);
 
+/// \brief Calculate multiple aggregations grouped on multiple keys
+///
+/// \param[in] aggregands datums to which aggregations will be applied
+/// \param[in] keys datums which will be used to group the aggregations
+/// \param[in] options GroupByOptions, encapsulating the names and options of 
aggregate
+///functions to be applied and the field names for results in the 
output.
+/// \return a StructArray with len(aggregands) + len(keys) fields. The first
+/// len(aggregands) fields are the results of the aggregations for the 
group
+/// specified by keys in the final len(keys) fields.
+///
+/// For example:
+///   GroupByOptions options = {
+/// .aggregates = {
+///   {"sum", nullptr, "sum result"},
+///   {"mean", nullptr, "mean result"},
+/// },
+/// .key_names = {"str key", "date key"},
+///   };
+/// assert(*GroupBy({[2, 5, 8], [1.5, 2.0, 3.0]},
+/// {["a", "b", "a"], [today, today, today]},
+/// options).Equals([
+///   {"sum result": 10, "mean result": 2.25, "str key": "a", "date key": 
today},
+///   {"sum result": 5,  "mean result": 2.0,  "str key": "b", "date key": 
today},
+/// ]))

Review comment:
   Since the group id lists are temporary (except in the rare case where we 
need to partition batches for writing), we will be computing and discarding 
them on the fly rather than materializing an O(N) set of them.
   
   I'll be removing this compute function; as mentioned above it's not 
necessary for group by to live in the function registry.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org