pitrou commented on code in PR #12460: URL: https://github.com/apache/arrow/pull/12460#discussion_r848649199
########## cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc: ########## @@ -0,0 +1,257 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <memory> +#include <string> +#include <vector> + +#include <gtest/gtest.h> + +#include "arrow/array.h" +#include "arrow/chunked_array.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/util.h" +#include "arrow/type.h" + +#include "arrow/compute/api.h" +#include "arrow/compute/kernels/test_util.h" + +namespace arrow { +namespace compute { + +template <typename T, typename OptionsType> +class TestCumulativeOp : public ::testing::Test { Review Comment: We've been inconsistent in this, but to minimize compile times it would be better to use [value parameterization](https://github.com/google/googletest/blob/main/docs/advanced.md#value-parameterized-tests) (or even manual parameterization using for loops) rather than type parameterization. 
For example, the "values" for value parameterization could be instances of a struct: ```c++ struct CumulativeSumParam { std::shared_ptr<DataType> type; std::string json_input; std::string json_start; std::string json_expected; }; ``` ########## cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc: ########## @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/array/array_base.h" +#include "arrow/compute/api_scalar.h" +#include "arrow/compute/api_vector.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/kernels/base_arithmetic_internal.h" +#include "arrow/compute/kernels/codegen_internal.h" +#include "arrow/compute/kernels/common.h" +#include "arrow/result.h" +#include "arrow/util/bit_util.h" +#include "arrow/visit_type_inline.h" + +namespace arrow { +namespace compute { +namespace internal { +namespace { +template <typename OptionsType> +struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> { + using State = CumulativeOptionsWrapper<OptionsType>; + + explicit CumulativeOptionsWrapper(OptionsType options) + : OptionsWrapper<OptionsType>(std::move(options)) {} + + static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx, + const KernelInitArgs& args) { + auto options = static_cast<const OptionsType*>(args.options); + if (!options) { + return Status::Invalid( + "Attempted to initialize KernelState from null FunctionOptions"); + } + + const auto& start = options->start; + if (!start || !start->is_valid) { + return Status::Invalid("Cumulative `start` option must be non-null and valid"); + } + + // Ensure `start` option matches input type + if (!start->type->Equals(args.inputs[0].type)) { + ARROW_ASSIGN_OR_RAISE(auto casted_start, + Cast(Datum(start), args.inputs[0].type, CastOptions::Safe(), + ctx->exec_context())); + auto new_options = OptionsType(casted_start.scalar(), options->skip_nulls); + return ::arrow::internal::make_unique<State>(new_options); + } + return ::arrow::internal::make_unique<State>(*options); + } +}; + +// The driver kernel for all cumulative compute functions. Op is a compute kernel +// representing any binary associative operation (add, product, min, max, etc.) and +// OptionsType the options type corresponding to Op. ArgType and OutType are the input +// and output types, which will normally be the same (e.g. 
the cumulative sum of an array +// of Int64Type will result in an array of Int64Type). +template <typename OutType, typename ArgType, typename Op, typename OptionsType> +struct CumulativeGeneric { + using OutValue = typename GetOutputType<OutType>::T; + using ArgValue = typename GetViewType<ArgType>::T; + + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx); + auto start = UnboxScalar<OutType>::Unbox(*(options.start)); + auto skip_nulls = options.skip_nulls; + + int64_t base_output_offset = 0; + bool encountered_null = false; + ArrayData* out_arr = out->mutable_array(); + + switch (batch[0].kind()) { + case Datum::ARRAY: { + auto st = Call(ctx, base_output_offset, *batch[0].array(), out_arr, &start, + skip_nulls, &encountered_null); + out_arr->SetNullCount(arrow::kUnknownNullCount); + return st; + } + case Datum::CHUNKED_ARRAY: { + const auto& input = batch[0].chunked_array(); + + for (const auto& chunk : input->chunks()) { + RETURN_NOT_OK(Call(ctx, base_output_offset, *chunk->data(), out_arr, &start, + skip_nulls, &encountered_null)); + base_output_offset += chunk->length(); + } + out_arr->SetNullCount(arrow::kUnknownNullCount); + return Status::OK(); + } + default: + return Status::NotImplemented( + "Unsupported input type for function 'cumulative_<operator>': ", + batch[0].ToString()); + } + } + + static Status Call(KernelContext* ctx, int64_t base_output_offset, + const ArrayData& input, ArrayData* output, ArgValue* accumulator, + bool skip_nulls, bool* encountered_null) { + Status st = Status::OK(); + ArgValue accumulator_tmp = *accumulator; + bool encountered_null_tmp = *encountered_null; + + auto out_bitmap = output->GetMutableValues<uint8_t>(0); + auto out_data = output->GetMutableValues<OutValue>(1) + base_output_offset; + int64_t curr = base_output_offset; + + auto null_func = [&]() { + *out_data++ = OutValue{}; + encountered_null_tmp = true; + 
arrow::bit_util::SetBitTo(out_bitmap, curr, false); + ++curr; + }; + + if (skip_nulls) { Review Comment: ```suggestion if (skip_nulls || input.GetNullCount() == 0) { ``` ########## cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc: ########## @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/array/array_base.h" +#include "arrow/compute/api_scalar.h" +#include "arrow/compute/api_vector.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/kernels/base_arithmetic_internal.h" +#include "arrow/compute/kernels/codegen_internal.h" +#include "arrow/compute/kernels/common.h" +#include "arrow/result.h" +#include "arrow/util/bit_util.h" +#include "arrow/visit_type_inline.h" + +namespace arrow { +namespace compute { +namespace internal { +namespace { +template <typename OptionsType> +struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> { + using State = CumulativeOptionsWrapper<OptionsType>; + + explicit CumulativeOptionsWrapper(OptionsType options) : OptionsWrapper<OptionsType>(std::move(options)) {} + + static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx, + const KernelInitArgs& args) { + auto options = static_cast<const OptionsType*>(args.options); + if (!options) { + return Status::Invalid( + "Attempted to initialize KernelState from null FunctionOptions"); + } + + const auto& start = options->start; + if (!start || !start->is_valid) { + return Status::Invalid("Cumulative `start` option must be non-null and valid"); + } + + // Ensure `start` option matches input type + if (!start->type->Equals(args.inputs[0].type)) { + ARROW_ASSIGN_OR_RAISE(auto casted_start, Cast(Datum(start), args.inputs[0].type)); + auto new_options = OptionsType(casted_start.scalar(), options->skip_nulls); + return ::arrow::internal::make_unique<State>(new_options); + } + return ::arrow::internal::make_unique<State>(*options); + } +}; + +// The driver kernel for all cumulative compute functions. Op is a compute kernel +// representing any binary associative operation (add, product, min, max, etc.) and +// OptionsType the options type corresponding to Op. ArgType and OutType are the input +// and output types, which will normally be the same (e.g. the cumulative sum of an array +// of Int64Type will result in an array of Int64Type). 
+template <typename OutType, typename ArgType, typename Op, typename OptionsType> +struct CumulativeGeneric { + using OutValue = typename GetOutputType<OutType>::T; + using ArgValue = typename GetViewType<ArgType>::T; + + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx); + auto start = UnboxScalar<OutType>::Unbox(*(options.start)); + auto skip_nulls = options.skip_nulls; + + int64_t base_output_offset = 0; + bool encountered_null = false; + ArrayData* out_arr = out->mutable_array(); Review Comment: Perhaps, though for consistency it would be nicer to return the trivial result for Scalar input. ########## python/pyarrow/tests/test_compute.py: ########## @@ -2503,6 +2504,63 @@ def test_min_max_element_wise(): assert result == pa.array([1, 2, None]) +@pytest.mark.parametrize('start', (1.1, 10.5, -10.5)) +@pytest.mark.parametrize('skip_nulls', (True, False)) +def test_cumulative_sum(start, skip_nulls): + # Exact tests (e.g., integral types) + start_int = int(start) + starts = [start_int, pa.scalar(start_int, type=pa.int8()), + pa.scalar(start_int, type=pa.int64())] + for strt in starts: + arrays = [ + pa.array([1, 2, 3]), + pa.array([0, None, 20, 30]), + pa.chunked_array([[0, None], [20, 30]]) + ] + expected_arrays = [ + pa.array([1, 3, 6]), + pa.array([0, None, 20, 50]) + if skip_nulls else pa.array([0, None, None, None]), + pa.chunked_array([[0, None, 20, 50]]) + if skip_nulls else pa.chunked_array([[0, None, None, None]]) + ] + for i, arr in enumerate(arrays): + result = pc.cumulative_sum(arr, start=strt, skip_nulls=skip_nulls) + # Add `start` offset to expected array before comparing + expected = pc.add(expected_arrays[i], strt) + assert result.equals(expected) + + # Approximate and NaN tests (e.g., floating-point types) + # NOTE: Conversion from Arrow to NumPy replaces null slots with NaNs + # which prevent fully validating both states of `skip_nulls` for + # floating-point values. 
Ideally, equality comparisons make use of Arrow's + # `equals()` functions but an approximate version is not exposed in Python. + starts = [start, pa.scalar(start, type=pa.float32()), + pa.scalar(start, type=pa.float64())] + for strt in starts: + arrays = [ + pa.array([1.1, 2.2, 3.3]), + pa.array([1, np.nan, 2, -3, 4, 5]), + pa.array([1, np.nan, None, 3, None, 5]) + ] + expected_arrays = [ + np.array([1.1, 3.3, 6.6]), + np.array([1, np.nan, np.nan, np.nan, np.nan, np.nan]), + np.array([1, np.nan, None, np.nan, None, np.nan]) + if skip_nulls else np.array([1, np.nan, None, None, None, None]) + ] + for i, arr in enumerate(arrays): + result = pc.cumulative_sum(arr, start=strt, skip_nulls=skip_nulls) + # Add `start` offset to expected array before comparing + expected = pc.add(expected_arrays[i], strt) + np.testing.assert_array_almost_equal(result.to_numpy( + zero_copy_only=False), expected.to_numpy(zero_copy_only=False)) + + for strt in ['a', pa.scalar('arrow'), 1.1]: Review Comment: Should also test a null scalar? ########## cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc: ########## @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/array/array_base.h" +#include "arrow/compute/api_scalar.h" +#include "arrow/compute/api_vector.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/kernels/base_arithmetic_internal.h" +#include "arrow/compute/kernels/codegen_internal.h" +#include "arrow/compute/kernels/common.h" +#include "arrow/result.h" +#include "arrow/util/bit_util.h" +#include "arrow/visit_type_inline.h" + +namespace arrow { +namespace compute { +namespace internal { +namespace { +template <typename OptionsType> +struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> { + using State = CumulativeOptionsWrapper<OptionsType>; + + explicit CumulativeOptionsWrapper(OptionsType options) + : OptionsWrapper<OptionsType>(std::move(options)) {} + + static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx, + const KernelInitArgs& args) { + auto options = static_cast<const OptionsType*>(args.options); + if (!options) { + return Status::Invalid( + "Attempted to initialize KernelState from null FunctionOptions"); + } + + const auto& start = options->start; + if (!start || !start->is_valid) { + return Status::Invalid("Cumulative `start` option must be non-null and valid"); + } Review Comment: Should a nullptr `start` simply mean 0 (the common case)? ########## docs/source/python/api/compute.rst: ########## @@ -479,6 +494,7 @@ Structural Transforms .. autosummary:: :toctree: ../generated/ + cumulative_sum Review Comment: Why is this added here? ########## cpp/src/arrow/compute/api_vector.cc: ########## @@ -325,6 +342,15 @@ Result<std::shared_ptr<Array>> DropNull(const Array& values, ExecContext* ctx) { return out.make_array(); } +// ---------------------------------------------------------------------- +// Cumulative functions + +Result<Datum> CumulativeSum(const Datum& values, const CumulativeSumOptions& options, + ExecContext* ctx) { + auto func_name = (options.check_overflow) ? 
"cumulative_sum_checked" : "cumulative_sum"; + return CallFunction(func_name, {Datum(values)}, &options, ctx); Review Comment: I find this API design a bit weird. What happens if I call "cumulative_sum_checked" but pass `check_overflow` set to false in the options struct (or conversely, call "cumulative_sum" with `check_overflow` set to true)? ########## cpp/src/arrow/compute/kernels/codegen_internal.h: ########## @@ -1134,6 +1134,37 @@ ArrayKernelExec GeneratePhysicalInteger(detail::GetTypeId get_id) { } } +template <template <typename...> class KernelGenerator, typename Op, typename... Args> +ArrayKernelExec ArithmeticExecFromOp(detail::GetTypeId get_id) { Review Comment: I'm not sure it's a good idea to add this helper function inside the already crowded `codegen_internal.h`. Since it's arithmetic-specific, put it in `base_arithmetic_internal.h` perhaps? ########## cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc: ########## @@ -0,0 +1,257 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <memory> +#include <string> +#include <vector> + +#include <gtest/gtest.h> + +#include "arrow/array.h" +#include "arrow/chunked_array.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/util.h" +#include "arrow/type.h" + +#include "arrow/compute/api.h" +#include "arrow/compute/kernels/test_util.h" + +namespace arrow { +namespace compute { + +template <typename T, typename OptionsType> +class TestCumulativeOp : public ::testing::Test { + public: + using ArrowType = T; + using ScalarType = typename TypeTraits<T>::ScalarType; + using CType = typename TypeTraits<T>::CType; + + protected: + std::shared_ptr<DataType> type_singleton() { return default_type_instance<T>(); } + + std::shared_ptr<Array> array(const std::string& values) { + return ArrayFromJSON(type_singleton(), values); + } + + std::shared_ptr<ChunkedArray> chunked_array(const std::vector<std::string>& values) { + return ChunkedArrayFromJSON(type_singleton(), values); + } + + template <typename V = T> + enable_if_t<!is_floating_type<V>::value, void> Assert( + const std::string func, const std::shared_ptr<Array>& input, + const std::shared_ptr<Array>& expected, const OptionsType& options) { + ASSERT_OK_AND_ASSIGN(auto result, CallFunction(func, {Datum(input)}, &options)); + + AssertArraysEqual(*expected, *result.make_array(), false, EqualOptions::Defaults()); + } + + template <typename V = T> + enable_if_t<!is_floating_type<V>::value, void> Assert( + const std::string func, const std::shared_ptr<ChunkedArray>& input, + const std::shared_ptr<ChunkedArray>& expected, const OptionsType& options) { + ASSERT_OK_AND_ASSIGN(auto result, + CallFunction(func, {Datum(input)}, &options, nullptr)); + + ChunkedArray actual(result.chunks(), this->type_singleton()); + AssertChunkedEqual(*expected, actual); + } + + template <typename V = T> + enable_if_floating_point<V> Assert(const std::string func, Review Comment: You shouldn't have to special-case floating point, just use 
`AssertArraysApproxEqual` always. ########## docs/source/python/api/compute.rst: ########## @@ -45,6 +45,21 @@ Aggregations tdigest variance +Cumulative Functions +-------------------- + +Cumulative functions are vector functions that perform a running total on its +input and outputs an array containing the corresponding intermediate running values. Review Comment: ```suggestion Cumulative functions are vector functions that perform a running total on their input and output an array containing the corresponding intermediate running values. ``` ########## cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc: ########## @@ -0,0 +1,257 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <memory> +#include <string> +#include <vector> + +#include <gtest/gtest.h> + +#include "arrow/array.h" +#include "arrow/chunked_array.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/util.h" +#include "arrow/type.h" + +#include "arrow/compute/api.h" +#include "arrow/compute/kernels/test_util.h" + +namespace arrow { +namespace compute { + +template <typename T, typename OptionsType> +class TestCumulativeOp : public ::testing::Test { + public: + using ArrowType = T; + using ScalarType = typename TypeTraits<T>::ScalarType; + using CType = typename TypeTraits<T>::CType; + + protected: + std::shared_ptr<DataType> type_singleton() { return default_type_instance<T>(); } + + std::shared_ptr<Array> array(const std::string& values) { + return ArrayFromJSON(type_singleton(), values); + } + + std::shared_ptr<ChunkedArray> chunked_array(const std::vector<std::string>& values) { + return ChunkedArrayFromJSON(type_singleton(), values); + } + + template <typename V = T> + enable_if_t<!is_floating_type<V>::value, void> Assert( + const std::string func, const std::shared_ptr<Array>& input, + const std::shared_ptr<Array>& expected, const OptionsType& options) { + ASSERT_OK_AND_ASSIGN(auto result, CallFunction(func, {Datum(input)}, &options)); + + AssertArraysEqual(*expected, *result.make_array(), false, EqualOptions::Defaults()); + } + + template <typename V = T> + enable_if_t<!is_floating_type<V>::value, void> Assert( + const std::string func, const std::shared_ptr<ChunkedArray>& input, + const std::shared_ptr<ChunkedArray>& expected, const OptionsType& options) { + ASSERT_OK_AND_ASSIGN(auto result, + CallFunction(func, {Datum(input)}, &options, nullptr)); + + ChunkedArray actual(result.chunks(), this->type_singleton()); + AssertChunkedEqual(*expected, actual); + } + + template <typename V = T> + enable_if_floating_point<V> Assert(const std::string func, + const std::shared_ptr<Array>& input, + const std::shared_ptr<Array>& expected, + const 
OptionsType& options) { + ASSERT_OK_AND_ASSIGN(auto result, CallFunction(func, {Datum(input)}, &options)); + + AssertArraysApproxEqual(*expected, *result.make_array(), false, + EqualOptions::Defaults()); + } + + template <typename V = T> + enable_if_floating_point<V> Assert(const std::string func, + const std::shared_ptr<ChunkedArray>& input, + const std::shared_ptr<ChunkedArray>& expected, + const OptionsType& options) { + ASSERT_OK_AND_ASSIGN(auto result, + CallFunction(func, {Datum(input)}, &options, nullptr)); + + ChunkedArray actual(result.chunks(), this->type_singleton()); + AssertChunkedApproxEquivalent(*expected, actual, EqualOptions::Defaults()); + } +}; + +template <typename T> +class TestCumulativeSum : public TestCumulativeOp<T, CumulativeSumOptions> { + public: + using OptionsType = CumulativeSumOptions; + using ArrowType = typename TestCumulativeOp<T, OptionsType>::ArrowType; + using ScalarType = typename TestCumulativeOp<T, OptionsType>::ScalarType; + using CType = typename TestCumulativeOp<T, OptionsType>::CType; + + protected: + template <typename U = T> + enable_if_parameter_free<U, OptionsType> generate_options(CType start = 0, + bool skip_nulls = false, + bool check_overflow = false) { + return OptionsType(std::make_shared<ScalarType>(start), skip_nulls, check_overflow); + } + + template <typename U = T> + enable_if_t<is_time_type<U>::value || is_timestamp_type<U>::value, OptionsType> + generate_options(CType start = 0, bool skip_nulls = false, + bool check_overflow = false) { + TimeUnit::type unit; + switch (ArrowType::type_id) { + case Type::TIME64: + unit = TimeUnit::NANO; + break; + default: + unit = TimeUnit::SECOND; + break; + } + return OptionsType(std::make_shared<ScalarType>(start, unit), skip_nulls, + check_overflow); + } + + void Assert(const std::string& values, const std::string& expected, + const OptionsType& options) { + auto values_arr = TestCumulativeOp<T, OptionsType>::array(values); + auto expected_arr = TestCumulativeOp<T, 
OptionsType>::array(expected); + auto func_name = options.check_overflow ? "cumulative_sum_checked" : "cumulative_sum"; + TestCumulativeOp<T, OptionsType>::Assert(func_name, values_arr, expected_arr, + options); + } + + void Assert(const std::vector<std::string>& values, + const std::vector<std::string>& expected, const OptionsType& options) { + auto values_arr = TestCumulativeOp<T, OptionsType>::chunked_array(values); + auto expected_arr = TestCumulativeOp<T, OptionsType>::chunked_array(expected); + auto func_name = options.check_overflow ? "cumulative_sum_checked" : "cumulative_sum"; + TestCumulativeOp<T, OptionsType>::Assert(func_name, values_arr, expected_arr, + options); + } +}; + +TYPED_TEST_SUITE(TestCumulativeSum, NumericArrowTypes); + +TYPED_TEST(TestCumulativeSum, NoStartNoSkipNoNulls) { + CumulativeSumOptions options = this->generate_options(); + auto empty = "[]"; + auto values = "[1, 2, 3, 4, 5, 6]"; + auto expected = "[1, 3, 6, 10, 15, 21]"; + std::vector<std::string> chunked_values = {"[1, 2, 3]", "[4, 5, 6]"}; + std::vector<std::string> chunked_expected = {"[1, 3, 6, 10, 15, 21]"}; Review Comment: Chunked test cases should also include chunked arrays with zero chunks, or zero-sized chunks, for completeness. (perhaps this can be automated?) ########## python/pyarrow/_compute.pyx: ########## @@ -1736,6 +1736,34 @@ class PartitionNthOptions(_PartitionNthOptions): self._set_options(pivot, null_placement) +cdef class _CumulativeSumOptions(FunctionOptions): + def _set_options(self, start, skip_nulls): + if not isinstance(start, Scalar): + try: + start = lib.scalar(start) + except Exception: + _raise_invalid_function_option( + start, "`start` type for CumulativeSumOptions", TypeError) + + self.wrapped.reset(new CCumulativeSumOptions((<Scalar> start).unwrap(), skip_nulls)) + + +class CumulativeSumOptions(_CumulativeSumOptions): + """ + Options for `cumulative_sum` function. 
+ + Parameters + ---------- + start : Scalar, default 0.0 + Starting value for sum computation + skip_nulls : bool, default False + When false, propagates the first null encountered. Review Comment: ```suggestion When false, the first encountered null is propagated. ``` ########## python/pyarrow/tests/test_compute.py: ########## @@ -2503,6 +2504,63 @@ def test_min_max_element_wise(): assert result == pa.array([1, 2, None]) +@pytest.mark.parametrize('start', (1.1, 10.5, -10.5)) +@pytest.mark.parametrize('skip_nulls', (True, False)) +def test_cumulative_sum(start, skip_nulls): + # Exact tests (e.g., integral types) + start_int = int(start) + starts = [start_int, pa.scalar(start_int, type=pa.int8()), + pa.scalar(start_int, type=pa.int64())] + for strt in starts: + arrays = [ + pa.array([1, 2, 3]), + pa.array([0, None, 20, 30]), + pa.chunked_array([[0, None], [20, 30]]) + ] + expected_arrays = [ + pa.array([1, 3, 6]), + pa.array([0, None, 20, 50]) + if skip_nulls else pa.array([0, None, None, None]), + pa.chunked_array([[0, None, 20, 50]]) + if skip_nulls else pa.chunked_array([[0, None, None, None]]) + ] + for i, arr in enumerate(arrays): + result = pc.cumulative_sum(arr, start=strt, skip_nulls=skip_nulls) + # Add `start` offset to expected array before comparing + expected = pc.add(expected_arrays[i], strt) + assert result.equals(expected) + + # Approximate and NaN tests (e.g., floating-point types) + # NOTE: Conversion from Arrow to NumPy replaces null slots with NaNs + # which prevent fully validating both states of `skip_nulls` for + # floating-point values. Ideally, equality comparisons make use of Arrow's + # `equals()` functions but an approximate version is not exposed in Python. 
+ starts = [start, pa.scalar(start, type=pa.float32()), + pa.scalar(start, type=pa.float64())] + for strt in starts: + arrays = [ + pa.array([1.1, 2.2, 3.3]), + pa.array([1, np.nan, 2, -3, 4, 5]), + pa.array([1, np.nan, None, 3, None, 5]) + ] + expected_arrays = [ + np.array([1.1, 3.3, 6.6]), + np.array([1, np.nan, np.nan, np.nan, np.nan, np.nan]), + np.array([1, np.nan, None, np.nan, None, np.nan]) + if skip_nulls else np.array([1, np.nan, None, None, None, None]) + ] + for i, arr in enumerate(arrays): + result = pc.cumulative_sum(arr, start=strt, skip_nulls=skip_nulls) + # Add `start` offset to expected array before comparing + expected = pc.add(expected_arrays[i], strt) + np.testing.assert_array_almost_equal(result.to_numpy( + zero_copy_only=False), expected.to_numpy(zero_copy_only=False)) + + for strt in ['a', pa.scalar('arrow'), 1.1]: Review Comment: Why does 1.1 fail by the way? ########## docs/source/python/api/compute.rst: ########## @@ -45,6 +45,21 @@ Aggregations tdigest variance +Cumulative Functions +-------------------- + +Cumulative functions are vector functions that perform a running total on its +input and outputs an array containing the corresponding intermediate running values. +By default these functions do not detect overflow. Most functions are also +available in an overflow-checking variant, suffixed ``_checked``, which +throws an ``ArrowInvalid`` exception when overflow is detected. Review Comment: ```suggestion By default these functions do not detect overflow. They are also available in an overflow-checking variant, suffixed ``_checked``, which throws an ``ArrowInvalid`` exception when overflow is detected. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
