js8544 commented on code in PR #36891:
URL: https://github.com/apache/arrow/pull/36891#discussion_r1308417006
##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
@@ -819,6 +832,378 @@ const FunctionDoc map_lookup_doc{
"MapLookupOptions",
/*options_required=*/true};
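+// Kernel state holding the resolved output list type and the common input type.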
+struct AdjoinAsListState : public KernelState {
+ explicit AdjoinAsListState(std::shared_ptr<DataType> list_type,
+ std::shared_ptr<DataType> input_type)
+ : list_type(std::move(list_type)), input_type(std::move(input_type)) {}
+
+ static Result<std::unique_ptr<KernelState>> Init(KernelContext* ctx,
+                                                   const KernelInitArgs& args) {
+ auto options = static_cast<const AdjoinAsListOptions*>(args.options);
+ if (!options) {
+ return Status::Invalid(
+ "Attempted to initialize KernelState from null FunctionOptions");
+ }
+
+ // Make sure input args have the same type
+ if (args.inputs.empty()) {
+      return Status::Invalid("AdjoinAsList requires at least one input argument");
+ }
+
+ auto input_type = args.inputs[0];
+ if (std::any_of(args.inputs.begin() + 1, args.inputs.end(),
+                  [&input_type](const auto& arg) { return arg != input_type; })) {
+ return Status::Invalid(
+ "AdjoinAsList requires all input arguments to have the same type");
+ }
+
+ switch (options->list_type) {
+ case AdjoinAsListOptions::LIST:
+        return std::make_unique<AdjoinAsListState>(list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+ case AdjoinAsListOptions::LARGE_LIST:
+        return std::make_unique<AdjoinAsListState>(large_list(input_type.GetSharedPtr()),
+                                                   input_type.GetSharedPtr());
+ case AdjoinAsListOptions::FIXED_SIZE_LIST:
+ return std::make_unique<AdjoinAsListState>(
+ fixed_size_list(input_type.GetSharedPtr(),
+ static_cast<int32_t>(args.inputs.size())),
+ input_type.GetSharedPtr());
+ default:
+ return Status::Invalid(
+ "AdjoinAsList requires list_type to be LIST, "
+ "LARGE_LIST or FIXED_SIZE_LIST");
+ }
+ }
+
+ std::shared_ptr<DataType> list_type;
+ std::shared_ptr<DataType> input_type;
+};
+
+Result<TypeHolder> ResolveAdjoinAsListOutput(KernelContext* ctx,
+                                             const std::vector<TypeHolder>& types) {
+  auto list_type = static_cast<const AdjoinAsListState*>(ctx->state())->list_type;
+ return TypeHolder(list_type);
+}
+
+template <typename OutputType>
+struct AdjoinAsListImpl {
+ const std::shared_ptr<DataType>& list_type;
+ const std::shared_ptr<DataType>& input_type;
+
+ AdjoinAsListImpl(const std::shared_ptr<DataType>& list_type,
+ const std::shared_ptr<DataType>& input_type)
+ : list_type(list_type), input_type(input_type) {}
+
+ // ReserveData for binary builders
+ template <typename InputType, typename Builder>
+ Status ReserveBinaryData(const ExecSpan& batch, Builder* builder) {
+ static_assert(is_base_binary_type<InputType>::value ||
+ is_fixed_size_binary_type<InputType>::value);
+ int64_t total_bytes = 0;
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if constexpr (std::is_same_v<InputType, FixedSizeBinaryType>) {
+ total_bytes += arr.buffers[1].size;
+ } else {
+ total_bytes += arr.buffers[2].size;
+ }
+ } else {
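+        // A scalar input contributes its value once per output row.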
+        total_bytes +=
+            static_cast<const BaseBinaryScalar&>(*input.scalar).value->size() *
+            batch.length;
+ }
+ }
+ return builder->ReserveData(total_bytes);
+ }
+
+ // Construct offset buffer for variable-size list builders
+ Result<std::shared_ptr<Buffer>> MakeOffsetsBuffer(const ExecSpan& batch) {
+ TypedBufferBuilder<typename OutputType::offset_type> offset_builder;
+ RETURN_NOT_OK(offset_builder.Reserve(batch.length + 1));
+ typename OutputType::offset_type cur_offset = 0;
+ offset_builder.UnsafeAppend(cur_offset);
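+    // Each output list holds exactly one element per input, so for N inputs
+    // the offsets are simply [0, N, 2 * N, ..., N * batch.length].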
+ for (int i = 0; i < batch.length; ++i) {
+ cur_offset += batch.num_values();
+ offset_builder.UnsafeAppend(cur_offset);
+ }
+ return offset_builder.Finish(/*shrink_to_fit=*/false);
+ }
+
+  Status Visit(const NullType& null_type, KernelContext* ctx, const ExecSpan& batch,
+ ExecResult* out) {
+ auto length = batch.length * batch.num_values();
+ auto out_data = *out->array_data_mutable();
+    out_data->child_data.emplace_back(ArrayData::Make(null(), length,
+                                                      {nullptr}, length));
+ out_data->type = list_type;
+ if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+ ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+ }
+ return Status::OK();
+ }
+
+  Status Visit(const BooleanType& boolean_type, KernelContext* ctx, const ExecSpan& batch,
+ ExecResult* out) {
+ using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = std::make_shared<BooleanBuilder>(ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, list_type);
+
+ RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+ RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+ for (int i = 0; i < batch.length; ++i) {
+ RETURN_NOT_OK(list_builder.Append());
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if (arr.IsValid(i)) {
+            builder->UnsafeAppend(bit_util::GetBit(arr.buffers[1].data,
+                                                   arr.offset + i));
+ } else {
+ builder->UnsafeAppendNull();
+ }
+ } else {
+          builder->UnsafeAppend(UnboxScalar<BooleanType>::Unbox(*input.scalar));
+ }
+ }
+ }
+ return list_builder.FinishInternal(out->array_data_mutable());
+ }
+
+ // Numeric and temporal types
+ template <typename InputType>
+  std::enable_if_t<has_c_type<InputType>::value || is_temporal_type<InputType>::value,
+                   Status>
+ Visit(const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+ ExecResult* out) {
+ using BuilderType = typename TypeTraits<InputType>::BuilderType;
+ using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, list_type);
+
+ RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+ RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+ for (int i = 0; i < batch.length; ++i) {
+ RETURN_NOT_OK(list_builder.Append());
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if (arr.IsValid(i)) {
+            builder->UnsafeAppend(arr.GetValues<typename InputType::c_type>(1)[i]);
+ } else {
+ builder->UnsafeAppendNull();
+ }
+ } else {
+ builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+ }
+ }
+ }
+ return list_builder.FinishInternal(out->array_data_mutable());
+ }
+
+ // Varlen binary types
+ template <typename InputType>
+ std::enable_if_t<is_base_binary_type<InputType>::value, Status> Visit(
+ const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+ ExecResult* out) {
+ using BuilderType = typename TypeTraits<InputType>::BuilderType;
+ using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+ using OffsetType = typename TypeTraits<InputType>::OffsetType::c_type;
+ auto builder = std::make_shared<BuilderType>();
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, list_type);
+
+ RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+ RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+ RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
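+    // Values are appended as string_views into the input buffers; the builder
+    // copies the bytes, so the views need not outlive this loop.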
+ for (int i = 0; i < batch.length; ++i) {
+ RETURN_NOT_OK(list_builder.Append());
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if (arr.IsValid(i)) {
+ auto cur_offset = arr.GetValues<OffsetType>(1)[i];
+ auto next_offset = arr.GetValues<OffsetType>(1)[i + 1];
+            std::string_view view(arr.buffers[2].data_as<char>() + cur_offset,
+                                  next_offset - cur_offset);
+ builder->UnsafeAppend(view);
+ } else {
+ builder->UnsafeAppendNull();
+ }
+ } else {
+ builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+ }
+ }
+ }
+ return list_builder.FinishInternal(out->array_data_mutable());
+ }
+
+ // Fixed-size binary types, including decimals
+ template <typename InputType>
+ std::enable_if_t<is_fixed_size_binary_type<InputType>::value, Status> Visit(
+ const InputType& input_type, KernelContext* ctx, const ExecSpan& batch,
+ ExecResult* out) {
+ using BuilderType = typename TypeTraits<InputType>::BuilderType;
+ using ListBuilderType = typename TypeTraits<OutputType>::BuilderType;
+    auto builder = std::make_shared<BuilderType>(input_type.GetSharedPtr(),
+                                                 ctx->exec_context()->memory_pool());
+    ListBuilderType list_builder(ctx->exec_context()->memory_pool(), builder, list_type);
+
+ RETURN_NOT_OK(builder->Reserve(batch.num_values() * batch.length));
+ RETURN_NOT_OK(ReserveBinaryData<InputType>(batch, builder.get()));
+
+ RETURN_NOT_OK(list_builder.Reserve(batch.length));
+
+ for (int i = 0; i < batch.length; ++i) {
+ RETURN_NOT_OK(list_builder.Append());
+ for (const auto& input : batch.values) {
+ if (input.is_array()) {
+ const auto& arr = input.array;
+ if (arr.IsValid(i)) {
+            std::string_view view(arr.buffers[1].data_as<char>() +
+                                      (i + arr.offset) * input_type.byte_width(),
+                                  input_type.byte_width());
+ builder->UnsafeAppend(view);
+ } else {
+ builder->UnsafeAppendNull();
+ }
+ } else {
+ builder->UnsafeAppend(UnboxScalar<InputType>::Unbox(*input.scalar));
+ }
+ }
+ }
+ return list_builder.FinishInternal(out->array_data_mutable());
+ }
+
+  // Deal with nested/union types with a naive approach: first concatenate the
+  // inputs, then shuffle the result using Take
+  Status Visit(const DataType& input_type, KernelContext* ctx, const ExecSpan& batch,
+ ExecResult* out) {
+ std::vector<std::shared_ptr<ArrayData>> inputs;
+ inputs.reserve(batch.num_values());
+ // Starting index of each input in the concatenated array
+ std::vector<int64_t> input_start_index;
+ input_start_index.reserve(batch.num_values());
+ int64_t cur_index = 0;
+ for (const auto& input : batch.values) {
+ input_start_index.push_back(cur_index);
+ if (input.is_array()) {
+ inputs.emplace_back(input.array.ToArrayData());
+ cur_index += input.array.length;
+ } else {
+ ARROW_ASSIGN_OR_RAISE(auto arr_from_scalar,
+ MakeArrayFromScalar(*input.scalar, 1));
+ inputs.emplace_back(std::move(arr_from_scalar)->data());
+ cur_index += 1;
+ }
+ }
+ ARROW_ASSIGN_OR_RAISE(auto concatenated_inputs, Concatenate(inputs));
+ // Build child index for take
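+    // For example, with three array inputs of length 2 the indices are
+    // [0, 2, 4, 1, 3, 5]: row i gathers element i from each input in order.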
+ Int64Builder child_indices_builder;
+    RETURN_NOT_OK(child_indices_builder.Reserve(batch.num_values() * batch.length));
+ for (int i = 0; i < batch.length; ++i) {
+ for (int j = 0; j < batch.num_values(); ++j) {
+ if (batch.values[j].is_array()) {
+ child_indices_builder.UnsafeAppend(input_start_index[j] + i);
+ } else {
+ child_indices_builder.UnsafeAppend(input_start_index[j]);
+ }
+ }
+ }
+ std::shared_ptr<ArrayData> child_indices;
+ RETURN_NOT_OK(child_indices_builder.FinishInternal(&child_indices));
+    ARROW_ASSIGN_OR_RAISE(auto shuffled_data,
+                          Take(*concatenated_inputs, *child_indices,
+                               TakeOptions::NoBoundsCheck(), ctx->exec_context()));
+ auto out_data = *out->array_data_mutable();
+ out_data->child_data.emplace_back(std::move(shuffled_data).array());
+
+ out_data->type = list_type;
+
+ if constexpr (!is_fixed_size_list_type<OutputType>::value) {
+ ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], MakeOffsetsBuffer(batch));
+ }
+ return Status::OK();
+ }
+};
+
+template <template <typename OutputType> typename AdjoinAsListImpl, typename InputType>
+Status AdjoinAsListExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
+ const auto& state = static_cast<const AdjoinAsListState*>(ctx->state());
+ const auto& list_type = state->list_type;
+ const auto& input_type = state->input_type;
+
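+  // Dispatch on the runtime list type; the input type is bound at compile time.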
+ switch (list_type->id()) {
+ case Type::LIST: {
+ return AdjoinAsListImpl<ListType>(list_type, input_type)
+ .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+ }
+ case Type::LARGE_LIST: {
+ return AdjoinAsListImpl<LargeListType>(list_type, input_type)
+ .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+ }
+ case Type::FIXED_SIZE_LIST: {
+ return AdjoinAsListImpl<FixedSizeListType>(list_type, input_type)
+ .Visit(checked_cast<const InputType&>(*input_type), ctx, batch, out);
+ }
+ default:
+ return Status::Invalid(
+ "AdjoinAsList requires list_type to be LIST, "
+ "LARGE_LIST or FIXED_SIZE_LIST");
+ }
+}
+
+// A visitor that binds each visited type id to its type-specific kernel at compile time
+struct AdjoinAsListKernelGenerator {
+ ScalarKernel kernel;
+
+ AdjoinAsListKernelGenerator() {
+ kernel.null_handling = NullHandling::OUTPUT_NOT_NULL;
+ kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
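+    // The output list array has no top-level nulls, and the kernel allocates
+    // its own output instead of writing into preallocated buffers.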
+ kernel.init = AdjoinAsListState::Init;
+ }
+
+ template <typename ArrowType>
+ Status Visit(const ArrowType* type) {
+    kernel.signature = KernelSignature::Make({InputType(ArrowType::type_id)},
+                                             OutputType(ResolveAdjoinAsListOutput), true);
+ kernel.exec = AdjoinAsListExec<AdjoinAsListImpl, ArrowType>;
+ return Status::OK();
+ }
+};
+
+void AddAdjoinAsListKernels(ScalarFunction* func) {
+ AdjoinAsListKernelGenerator generator;
+ // non-parametric types
+  for (const auto& tys :
+       {PrimitiveTypes(), TemporalTypes(), DurationTypes(), IntervalTypes()}) {
+ for (const auto& ty : tys) {
+ DCHECK_OK(VisitTypeIdInline(ty->id(), &generator));
+ DCHECK_OK(func->AddKernel(generator.kernel));
+ }
+ }
+
+ // parametric types
+  for (const auto& ty : {Type::FIXED_SIZE_BINARY, Type::DECIMAL128, Type::DECIMAL256,
+                         Type::LIST, Type::LARGE_LIST, Type::FIXED_SIZE_LIST,
+                         Type::DENSE_UNION, Type::DICTIONARY, Type::STRUCT, Type::MAP}) {
+    // TODO(jinshang): add support for SparseUnion, need Take to support it first
Review Comment:
Done
##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
+ // parametric types
+  for (const auto& ty : {Type::FIXED_SIZE_BINARY, Type::DECIMAL128, Type::DECIMAL256,
+                         Type::LIST, Type::LARGE_LIST, Type::FIXED_SIZE_LIST,
+                         Type::DENSE_UNION, Type::DICTIONARY, Type::STRUCT, Type::MAP}) {
+    // TODO(jinshang): add support for SparseUnion, need Take to support it first
+ DCHECK_OK(VisitTypeIdInline(ty, &generator));
+ DCHECK_OK(func->AddKernel(generator.kernel));
+ }
+}
+
+FunctionDoc adjoin_as_list_doc(
+ "Adjoin multiple arrays row-wise as a list array",
+ "Combine multiple arrays row-wise as a list array.\n"
+ "The input arrays must have the same type and length.\n"
+ "For N arrays each with length M, the output list array will\n"
+ "have length M and each list will have N elements.\n"
+ "The output list type can be specified in AdjoinAsListOptions",
Review Comment:
Done
##########
cpp/src/arrow/compute/kernels/scalar_nested.cc:
##########
+FunctionDoc adjoin_as_list_doc(
+ "Adjoin multiple arrays row-wise as a list array",
+ "Combine multiple arrays row-wise as a list array.\n"
+ "The input arrays must have the same type and length.\n"
+ "For N arrays each with length M, the output list array will\n"
+ "have length M and each list will have N elements.\n"
+ "The output list type can be specified in AdjoinAsListOptions",
+ {"input"}, "AdjoinAsListOptions", false);
Review Comment:
Fixed
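
For reference, a minimal call-site sketch (hypothetical: it assumes the function is
registered under the name "adjoin_as_list" and that AdjoinAsListOptions is
default-constructible with a settable list_type member as shown in the diff;
CallFunction and Datum are the existing arrow::compute APIs, and the snippet
belongs inside a function returning arrow::Status so that ARROW_ASSIGN_OR_RAISE
can propagate errors):

    #include "arrow/compute/api.h"

    // Adjoin two equal-length arrays row-wise into a list array.
    arrow::compute::AdjoinAsListOptions options;
    options.list_type = arrow::compute::AdjoinAsListOptions::LIST;
    ARROW_ASSIGN_OR_RAISE(
        arrow::Datum result,
        arrow::compute::CallFunction("adjoin_as_list", {array_a, array_b}, &options));
    // For N inputs of length M, `result` holds a list array of length M whose
    // rows each contain N elements, in argument order.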
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]