This is an automated email from the ASF dual-hosted git repository.

felipecrv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9749d7d653 GH-39798: [C++] Optimize Take for fixed-size types 
including nested fixed-size lists (#41297)
9749d7d653 is described below

commit 9749d7d653e1b106d0662624b22b2982b3ad0516
Author: Felipe Oliveira Carvalho <felipe...@gmail.com>
AuthorDate: Thu May 2 22:02:00 2024 -0300

    GH-39798: [C++] Optimize Take for fixed-size types including nested 
fixed-size lists (#41297)
    
    ### Rationale for this change
    
    Introduce utilities for dealing with fixed-width types (including
    fixed-size lists of fixed-width types) generically, and use them for
    initial optimizations of `Take` and `Filter`.
    
    ### What changes are included in this PR?
    
    - [x] Introduce utilities for dealing with fixed-width types generically
    - [x] Use faster `Take` kernel on small power-of-2 byte widths of 
fixed-width types
      - [x] from `FSLTakeExec` (including FSLs of FSBs)
      - [x] from `FSBTakeExec` (done before this PR)
    - [x] ~Take on any fixed-width type~ (as a separate issue #41301)
    - [x] Use faster `Filter` kernel on both primitive and fixed-width
      types of any length
      - [x] from `FSLFilterExec` (including FSLs of FSBs)
      - [x] from `FSBFilterExec` (done before this PR)
    
    ### Are these changes tested?
    
    By existing and new tests.
    
    ### Are there any user-facing changes?
    
    Some functions added to the `arrow::util` namespace and documented inline.
    * GitHub Issue: #39798
    
    Authored-by: Felipe Oliveira Carvalho <felipe...@gmail.com>
    Signed-off-by: Felipe Oliveira Carvalho <felipe...@gmail.com>
---
 cpp/src/arrow/CMakeLists.txt                       |   1 +
 .../kernels/vector_selection_filter_internal.cc    |  30 +-
 .../compute/kernels/vector_selection_internal.cc   |  56 ++--
 .../compute/kernels/vector_selection_internal.h    |   7 +-
 .../kernels/vector_selection_take_internal.cc      |  39 +--
 .../arrow/compute/kernels/vector_selection_test.cc | 156 +++++++++--
 cpp/src/arrow/util/CMakeLists.txt                  |   1 +
 cpp/src/arrow/util/fixed_width_internal.cc         | 226 +++++++++++++++
 cpp/src/arrow/util/fixed_width_internal.h          | 307 +++++++++++++++++++++
 cpp/src/arrow/util/fixed_width_test.cc             | 217 +++++++++++++++
 cpp/src/arrow/util/fixed_width_test_util.h         | 203 ++++++++++++++
 11 files changed, 1171 insertions(+), 72 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 2ef82dd614..5d61112518 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -501,6 +501,7 @@ set(ARROW_UTIL_SRCS
     util/decimal.cc
     util/delimiting.cc
     util/dict_util.cc
+    util/fixed_width_internal.cc
     util/float16.cc
     util/formatting.cc
     util/future.cc
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc 
b/cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc
index 8825d697fd..d5e5e5ad28 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc
@@ -40,6 +40,7 @@
 #include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/bitmap_ops.h"
+#include "arrow/util/fixed_width_internal.h"
 
 namespace arrow {
 
@@ -158,9 +159,11 @@ class PrimitiveFilterImpl {
   PrimitiveFilterImpl(const ArraySpan& values, const ArraySpan& filter,
                       FilterOptions::NullSelectionBehavior null_selection,
                       ArrayData* out_arr)
-      : byte_width_(values.type->byte_width()),
+      : byte_width_(util::FixedWidthInBytes(*values.type)),
         values_is_valid_(values.buffers[0].data),
-        values_data_(values.buffers[1].data),
+        // No offset applied for boolean because it's a bitmap
+        values_data_(kIsBoolean ? values.buffers[1].data
+                                : 
util::OffsetPointerOfFixedWidthValues(values)),
         values_null_count_(values.null_count),
         values_offset_(values.offset),
         values_length_(values.length),
@@ -169,17 +172,13 @@ class PrimitiveFilterImpl {
     if constexpr (kByteWidth >= 0 && !kIsBoolean) {
       DCHECK_EQ(kByteWidth, byte_width_);
     }
-    if constexpr (!kIsBoolean) {
-      // No offset applied for boolean because it's a bitmap
-      values_data_ += values.offset * byte_width();
-    }
 
+    DCHECK_EQ(out_arr->offset, 0);
     if (out_arr->buffers[0] != nullptr) {
       // May be unallocated if neither filter nor values contain nulls
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
-    out_data_ = out_arr->buffers[1]->mutable_data();
-    DCHECK_EQ(out_arr->offset, 0);
+    out_data_ = util::MutableFixedWidthValuesPointer(out_arr);
     out_length_ = out_arr->length;
     out_position_ = 0;
   }
@@ -416,7 +415,7 @@ class PrimitiveFilterImpl {
     out_position_ += length;
   }
 
-  constexpr int32_t byte_width() const {
+  constexpr int64_t byte_width() const {
     if constexpr (kByteWidth >= 0) {
       return kByteWidth;
     } else {
@@ -425,7 +424,7 @@ class PrimitiveFilterImpl {
   }
 
  private:
-  int32_t byte_width_;
+  int64_t byte_width_;
   const uint8_t* values_is_valid_;
   const uint8_t* values_data_;
   int64_t values_null_count_;
@@ -439,6 +438,8 @@ class PrimitiveFilterImpl {
   int64_t out_position_;
 };
 
+}  // namespace
+
 Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, 
ExecResult* out) {
   const ArraySpan& values = batch[0].array;
   const ArraySpan& filter = batch[1].array;
@@ -468,9 +469,10 @@ Status PrimitiveFilterExec(KernelContext* ctx, const 
ExecSpan& batch, ExecResult
   // validity bitmap.
   const bool allocate_validity = values.null_count != 0 || 
!filter_null_count_is_zero;
 
-  const int bit_width = values.type->bit_width();
-  RETURN_NOT_OK(PreallocatePrimitiveArrayData(ctx, output_length, bit_width,
-                                              allocate_validity, out_arr));
+  DCHECK(util::IsFixedWidthLike(values, /*force_null_count=*/false));
+  const int64_t bit_width = util::FixedWidthInBits(*values.type);
+  RETURN_NOT_OK(util::internal::PreallocateFixedWidthArrayData(
+      ctx, output_length, /*source=*/values, allocate_validity, out_arr));
 
   switch (bit_width) {
     case 1:
@@ -505,6 +507,8 @@ Status PrimitiveFilterExec(KernelContext* ctx, const 
ExecSpan& batch, ExecResult
   return Status::OK();
 }
 
+namespace {
+
 // ----------------------------------------------------------------------
 // Optimized filter for base binary types (32-bit and 64-bit)
 
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_internal.cc 
b/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
index a0fe2808e3..93cd506034 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
@@ -37,6 +37,7 @@
 #include "arrow/util/bit_block_counter.h"
 #include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/fixed_width_internal.h"
 #include "arrow/util/int_util.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/ree_util.h"
@@ -65,24 +66,6 @@ void RegisterSelectionFunction(const std::string& name, 
FunctionDoc doc,
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
-Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int 
bit_width,
-                                     bool allocate_validity, ArrayData* out) {
-  // Preallocate memory
-  out->length = length;
-  out->buffers.resize(2);
-
-  if (allocate_validity) {
-    ARROW_ASSIGN_OR_RAISE(out->buffers[0], ctx->AllocateBitmap(length));
-  }
-  if (bit_width == 1) {
-    ARROW_ASSIGN_OR_RAISE(out->buffers[1], ctx->AllocateBitmap(length));
-  } else {
-    ARROW_ASSIGN_OR_RAISE(out->buffers[1],
-                          ctx->Allocate(bit_util::BytesForBits(length * 
bit_width)));
-  }
-  return Status::OK();
-}
-
 namespace {
 
 /// \brief Iterate over a REE filter, emitting ranges of a plain values array 
that
@@ -909,6 +892,20 @@ Status LargeListFilterExec(KernelContext* ctx, const 
ExecSpan& batch, ExecResult
 }
 
 Status FSLFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  const ArraySpan& values = batch[0].array;
+
+  // If a FixedSizeList wraps a fixed-width type we can, in some cases, use
+  // PrimitiveFilterExec for a fixed-size list array.
+  if (util::IsFixedWidthLike(values,
+                             /*force_null_count=*/true,
+                             /*exclude_dictionary=*/true)) {
+    const auto byte_width = util::FixedWidthInBytes(*values.type);
+    // 0 is a valid byte width for FixedSizeList, but PrimitiveFilterExec
+    // might not handle it correctly.
+    if (byte_width > 0) {
+      return PrimitiveFilterExec(ctx, batch, out);
+    }
+  }
   return FilterExec<FSLSelectionImpl>(ctx, batch, out);
 }
 
@@ -968,6 +965,29 @@ Status LargeListTakeExec(KernelContext* ctx, const 
ExecSpan& batch, ExecResult*
 }
 
 Status FSLTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) 
{
+  const ArraySpan& values = batch[0].array;
+
+  // If a FixedSizeList wraps a fixed-width type we can, in some cases, use
+  // PrimitiveTakeExec for a fixed-size list array.
+  if (util::IsFixedWidthLike(values,
+                             /*force_null_count=*/true,
+                             /*exclude_dictionary=*/true)) {
+    const auto byte_width = util::FixedWidthInBytes(*values.type);
+    // Additionally, PrimitiveTakeExec is only implemented for specific byte 
widths.
+    // TODO(GH-41301): Extend PrimitiveTakeExec for any fixed-width type.
+    switch (byte_width) {
+      case 1:
+      case 2:
+      case 4:
+      case 8:
+      case 16:
+      case 32:
+        return PrimitiveTakeExec(ctx, batch, out);
+      default:
+        break;  // fallback to TakeExec<FSBSelectionImpl>
+    }
+  }
+
   return TakeExec<FSLSelectionImpl>(ctx, batch, out);
 }
 
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_internal.h 
b/cpp/src/arrow/compute/kernels/vector_selection_internal.h
index 95f3e51cd6..a169f4b38a 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_internal.h
+++ b/cpp/src/arrow/compute/kernels/vector_selection_internal.h
@@ -45,12 +45,6 @@ void RegisterSelectionFunction(const std::string& name, 
FunctionDoc doc,
                                const FunctionOptions* default_options,
                                FunctionRegistry* registry);
 
-/// \brief Allocate an ArrayData for a primitive array with a given length and 
bit width
-///
-/// \param[in] bit_width 1 or a multiple of 8
-Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int 
bit_width,
-                                     bool allocate_validity, ArrayData* out);
-
 /// \brief Callback type for VisitPlainxREEFilterOutputSegments.
 ///
 /// position is the logical position in the values array relative to its 
offset.
@@ -70,6 +64,7 @@ void VisitPlainxREEFilterOutputSegments(
     FilterOptions::NullSelectionBehavior null_selection,
     const EmitREEFilterSegment& emit_segment);
 
+Status PrimitiveFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
 Status ListFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
 Status LargeListFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
 Status FSLFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc 
b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
index 5cd3710828..48a2de9936 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
@@ -37,6 +37,7 @@
 #include "arrow/util/bit_block_counter.h"
 #include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/fixed_width_internal.h"
 #include "arrow/util/int_util.h"
 #include "arrow/util/ree_util.h"
 
@@ -323,7 +324,7 @@ namespace {
 using TakeState = OptionsWrapper<TakeOptions>;
 
 // ----------------------------------------------------------------------
-// Implement optimized take for primitive types from boolean to 1/2/4/8-byte
+// Implement optimized take for primitive types from boolean to 
1/2/4/8/16/32-byte
 // C-type based types. Use common implementation for every byte width and only
 // generate code for unsigned integer indices, since after boundschecking to
 // check for negative numbers in the indices we can safely reinterpret_cast
@@ -333,16 +334,20 @@ using TakeState = OptionsWrapper<TakeOptions>;
 /// use the logical Arrow type but rather the physical C type. This way we
 /// only generate one take function for each byte width.
 ///
-/// This function assumes that the indices have been boundschecked.
+/// Also note that this function can also handle fixed-size-list arrays if
+/// they fit the criteria described in fixed_width_internal.h, so use the
+/// function defined in that file to access values and destination pointers
+/// and DO NOT ASSUME `values.type()` is a primitive type.
+///
+/// \pre the indices have been boundschecked
 template <typename IndexCType, typename ValueWidthConstant>
 struct PrimitiveTakeImpl {
   static constexpr int kValueWidth = ValueWidthConstant::value;
 
   static void Exec(const ArraySpan& values, const ArraySpan& indices,
                    ArrayData* out_arr) {
-    DCHECK_EQ(values.type->byte_width(), kValueWidth);
-    const auto* values_data =
-        values.GetValues<uint8_t>(1, 0) + kValueWidth * values.offset;
+    DCHECK_EQ(util::FixedWidthInBytes(*values.type), kValueWidth);
+    const auto* values_data = util::OffsetPointerOfFixedWidthValues(values);
     const uint8_t* values_is_valid = values.buffers[0].data;
     auto values_offset = values.offset;
 
@@ -350,16 +355,15 @@ struct PrimitiveTakeImpl {
     const uint8_t* indices_is_valid = indices.buffers[0].data;
     auto indices_offset = indices.offset;
 
-    auto out = out_arr->GetMutableValues<uint8_t>(1, 0) + kValueWidth * 
out_arr->offset;
+    DCHECK_EQ(out_arr->offset, 0);
+    auto* out = util::MutableFixedWidthValuesPointer(out_arr);
     auto out_is_valid = out_arr->buffers[0]->mutable_data();
-    auto out_offset = out_arr->offset;
-    DCHECK_EQ(out_offset, 0);
 
     // If either the values or indices have nulls, we preemptively zero out the
     // out validity bitmap so that we don't have to use ClearBit in each
     // iteration for nulls.
     if (values.null_count != 0 || indices.null_count != 0) {
-      bit_util::SetBitsTo(out_is_valid, out_offset, indices.length, false);
+      bit_util::SetBitsTo(out_is_valid, 0, indices.length, false);
     }
 
     auto WriteValue = [&](int64_t position) {
@@ -386,7 +390,7 @@ struct PrimitiveTakeImpl {
         valid_count += block.popcount;
         if (block.popcount == block.length) {
           // Fastest path: neither values nor index nulls
-          bit_util::SetBitsTo(out_is_valid, out_offset + position, 
block.length, true);
+          bit_util::SetBitsTo(out_is_valid, position, block.length, true);
           for (int64_t i = 0; i < block.length; ++i) {
             WriteValue(position);
             ++position;
@@ -396,7 +400,7 @@ struct PrimitiveTakeImpl {
           for (int64_t i = 0; i < block.length; ++i) {
             if (bit_util::GetBit(indices_is_valid, indices_offset + position)) 
{
               // index is not null
-              bit_util::SetBit(out_is_valid, out_offset + position);
+              bit_util::SetBit(out_is_valid, position);
               WriteValue(position);
             } else {
               WriteZero(position);
@@ -416,7 +420,7 @@ struct PrimitiveTakeImpl {
                                  values_offset + indices_data[position])) {
               // value is not null
               WriteValue(position);
-              bit_util::SetBit(out_is_valid, out_offset + position);
+              bit_util::SetBit(out_is_valid, position);
               ++valid_count;
             } else {
               WriteZero(position);
@@ -433,7 +437,7 @@ struct PrimitiveTakeImpl {
                                  values_offset + indices_data[position])) {
               // index is not null && value is not null
               WriteValue(position);
-              bit_util::SetBit(out_is_valid, out_offset + position);
+              bit_util::SetBit(out_is_valid, position);
               ++valid_count;
             } else {
               WriteZero(position);
@@ -584,14 +588,17 @@ Status PrimitiveTakeExec(KernelContext* ctx, const 
ExecSpan& batch, ExecResult*
 
   ArrayData* out_arr = out->array_data().get();
 
-  const int bit_width = values.type->bit_width();
+  DCHECK(util::IsFixedWidthLike(values, /*force_null_count=*/false,
+                                /*exclude_dictionary=*/true));
+  const int64_t bit_width = util::FixedWidthInBits(*values.type);
 
   // TODO: When neither values nor indices contain nulls, we can skip
   // allocating the validity bitmap altogether and save time and space. A
   // streamlined PrimitiveTakeImpl would need to be written that skips all
   // interactions with the output validity bitmap, though.
-  RETURN_NOT_OK(PreallocatePrimitiveArrayData(ctx, indices.length, bit_width,
-                                              /*allocate_validity=*/true, 
out_arr));
+  RETURN_NOT_OK(util::internal::PreallocateFixedWidthArrayData(
+      ctx, indices.length, /*source=*/values,
+      /*allocate_validity=*/true, out_arr));
   switch (bit_width) {
     case 1:
       TakeIndexDispatch<BooleanTakeImpl>(values, indices, out_arr);
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_test.cc 
b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
index ec94b328ea..4c7d85b103 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
@@ -23,6 +23,7 @@
 #include <utility>
 #include <vector>
 
+#include "arrow/array/builder_nested.h"
 #include "arrow/array/concatenate.h"
 #include "arrow/chunked_array.h"
 #include "arrow/compute/api.h"
@@ -32,6 +33,7 @@
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
 #include "arrow/testing/util.h"
+#include "arrow/util/fixed_width_test_util.h"
 #include "arrow/util/logging.h"
 
 namespace arrow {
@@ -726,7 +728,37 @@ TEST_F(TestFilterKernelWithLargeList, FilterListInt32) {
                      "[[1,2], null, null]");
 }
 
-class TestFilterKernelWithFixedSizeList : public TestFilterKernel {};
+class TestFilterKernelWithFixedSizeList : public TestFilterKernel {
+ protected:
+  std::vector<std::shared_ptr<Array>> five_length_filters_ = {
+      ArrayFromJSON(boolean(), "[false, false, false, false, false]"),
+      ArrayFromJSON(boolean(), "[true, true, true, true, true]"),
+      ArrayFromJSON(boolean(), "[false, true, true, false, true]"),
+      ArrayFromJSON(boolean(), "[null, true, null, false, true]"),
+  };
+
+  void AssertFilterOnNestedLists(const std::shared_ptr<DataType>& inner_type,
+                                 const std::vector<int>& list_sizes) {
+    using NLG = ::arrow::util::internal::NestedListGenerator;
+    constexpr int64_t kLength = 5;
+    // Create two equivalent lists: one as a FixedSizeList and another as a 
List.
+    ASSERT_OK_AND_ASSIGN(auto fsl_list,
+                         NLG::NestedFSLArray(inner_type, list_sizes, kLength));
+    ASSERT_OK_AND_ASSIGN(auto list,
+                         NLG::NestedListArray(inner_type, list_sizes, 
kLength));
+
+    ARROW_SCOPED_TRACE("CheckTakeOnNestedLists of type `", *fsl_list->type(), 
"`");
+
+    for (auto& filter : five_length_filters_) {
+      // Use the Filter on ListType as the reference implementation.
+      ASSERT_OK_AND_ASSIGN(auto expected_list,
+                           Filter(*list, *filter, /*options=*/emit_null_));
+      ASSERT_OK_AND_ASSIGN(auto expected_fsl, Cast(expected_list, 
fsl_list->type()));
+      auto expected_fsl_array = expected_fsl.make_array();
+      this->AssertFilter(fsl_list, filter, expected_fsl_array);
+    }
+  }
+};
 
 TEST_F(TestFilterKernelWithFixedSizeList, FilterFixedSizeListInt32) {
   std::string list_json = "[null, [1, null, 3], [4, 5, 6], [7, 8, null]]";
@@ -740,6 +772,33 @@ TEST_F(TestFilterKernelWithFixedSizeList, 
FilterFixedSizeListInt32) {
                      "[[1, null, 3], [7, 8, null]]");
 }
 
+TEST_F(TestFilterKernelWithFixedSizeList, FilterFixedSizeListVarWidth) {
+  std::string list_json =
+      R"([["zero", "one", ""], ["two", "", "three"], ["four", "five", "six"], 
["seven", "eight", ""]])";
+  this->AssertFilter(fixed_size_list(utf8(), 3), list_json, "[0, 0, 0, 0]", 
"[]");
+  this->AssertFilter(fixed_size_list(utf8(), 3), list_json, "[0, 1, 1, null]",
+                     R"([["two", "", "three"], ["four", "five", "six"], 
null])");
+  this->AssertFilter(fixed_size_list(utf8(), 3), list_json, "[0, 0, 1, null]",
+                     R"([["four", "five", "six"], null])");
+  this->AssertFilter(fixed_size_list(utf8(), 3), list_json, "[1, 1, 1, 1]", 
list_json);
+  this->AssertFilter(fixed_size_list(utf8(), 3), list_json, "[0, 1, 0, 1]",
+                     R"([["two", "", "three"], ["seven", "eight", ""]])");
+}
+
+TEST_F(TestFilterKernelWithFixedSizeList, FilterFixedSizeListModuloNesting) {
+  using NLG = ::arrow::util::internal::NestedListGenerator;
+  const std::vector<std::shared_ptr<DataType>> value_types = {
+      int16(),
+      int32(),
+      int64(),
+  };
+  NLG::VisitAllNestedListConfigurations(
+      value_types, [this](const std::shared_ptr<DataType>& inner_type,
+                          const std::vector<int>& list_sizes) {
+        this->AssertFilterOnNestedLists(inner_type, list_sizes);
+      });
+}
+
 class TestFilterKernelWithMap : public TestFilterKernel {};
 
 TEST_F(TestFilterKernelWithMap, FilterMapStringToInt32) {
@@ -1034,29 +1093,34 @@ Status TakeJSON(const std::shared_ptr<DataType>& type, 
const std::string& values
       .Value(out);
 }
 
+void DoCheckTake(const std::shared_ptr<Array>& values,
+                 const std::shared_ptr<Array>& indices,
+                 const std::shared_ptr<Array>& expected) {
+  AssertTakeArrays(values, indices, expected);
+
+  // Check sliced values
+  ASSERT_OK_AND_ASSIGN(auto values_filler, MakeArrayOfNull(values->type(), 2));
+  ASSERT_OK_AND_ASSIGN(auto values_sliced,
+                       Concatenate({values_filler, values, values_filler}));
+  values_sliced = values_sliced->Slice(2, values->length());
+  AssertTakeArrays(values_sliced, indices, expected);
+
+  // Check sliced indices
+  ASSERT_OK_AND_ASSIGN(auto zero, MakeScalar(indices->type(), int8_t{0}));
+  ASSERT_OK_AND_ASSIGN(auto indices_filler, MakeArrayFromScalar(*zero, 3));
+  ASSERT_OK_AND_ASSIGN(auto indices_sliced,
+                       Concatenate({indices_filler, indices, indices_filler}));
+  indices_sliced = indices_sliced->Slice(3, indices->length());
+  AssertTakeArrays(values, indices_sliced, expected);
+}
+
 void CheckTake(const std::shared_ptr<DataType>& type, const std::string& 
values_json,
                const std::string& indices_json, const std::string& 
expected_json) {
   auto values = ArrayFromJSON(type, values_json);
   auto expected = ArrayFromJSON(type, expected_json);
-
   for (auto index_type : {int8(), uint32()}) {
     auto indices = ArrayFromJSON(index_type, indices_json);
-    AssertTakeArrays(values, indices, expected);
-
-    // Check sliced values
-    ASSERT_OK_AND_ASSIGN(auto values_filler, MakeArrayOfNull(type, 2));
-    ASSERT_OK_AND_ASSIGN(auto values_sliced,
-                         Concatenate({values_filler, values, values_filler}));
-    values_sliced = values_sliced->Slice(2, values->length());
-    AssertTakeArrays(values_sliced, indices, expected);
-
-    // Check sliced indices
-    ASSERT_OK_AND_ASSIGN(auto zero, MakeScalar(index_type, int8_t{0}));
-    ASSERT_OK_AND_ASSIGN(auto indices_filler, MakeArrayFromScalar(*zero, 3));
-    ASSERT_OK_AND_ASSIGN(auto indices_sliced,
-                         Concatenate({indices_filler, indices, 
indices_filler}));
-    indices_sliced = indices_sliced->Slice(3, indices->length());
-    AssertTakeArrays(values, indices_sliced, expected);
+    DoCheckTake(values, indices, expected);
   }
 }
 
@@ -1427,7 +1491,25 @@ TEST_F(TestTakeKernelWithLargeList, TakeLargeListInt32) {
   CheckTake(large_list(int32()), list_json, "[null, 1, 2, 0]", "[null, [1,2], 
null, []]");
 }
 
-class TestTakeKernelWithFixedSizeList : public 
TestTakeKernelTyped<FixedSizeListType> {};
+class TestTakeKernelWithFixedSizeList : public 
TestTakeKernelTyped<FixedSizeListType> {
+ protected:
+  void CheckTakeOnNestedLists(const std::shared_ptr<DataType>& inner_type,
+                              const std::vector<int>& list_sizes, int64_t 
length) {
+    using NLG = ::arrow::util::internal::NestedListGenerator;
+    // Create two equivalent lists: one as a FixedSizeList and another as a 
List.
+    ASSERT_OK_AND_ASSIGN(auto fsl_list,
+                         NLG::NestedFSLArray(inner_type, list_sizes, length));
+    ASSERT_OK_AND_ASSIGN(auto list, NLG::NestedListArray(inner_type, 
list_sizes, length));
+
+    ARROW_SCOPED_TRACE("CheckTakeOnNestedLists of type `", *fsl_list->type(), 
"`");
+
+    auto indices = ArrayFromJSON(int64(), "[1, 2, 4]");
+    // Use the Take on ListType as the reference implementation.
+    ASSERT_OK_AND_ASSIGN(auto expected_list, Take(*list, *indices));
+    ASSERT_OK_AND_ASSIGN(auto expected_fsl, Cast(*expected_list, 
fsl_list->type()));
+    DoCheckTake(fsl_list, indices, expected_fsl);
+  }
+};
 
 TEST_F(TestTakeKernelWithFixedSizeList, TakeFixedSizeListInt32) {
   std::string list_json = "[null, [1, null, 3], [4, 5, 6], [7, 8, null]]";
@@ -1449,6 +1531,42 @@ TEST_F(TestTakeKernelWithFixedSizeList, 
TakeFixedSizeListInt32) {
                                                 "[0, 1, 0]");
 }
 
+TEST_F(TestTakeKernelWithFixedSizeList, TakeFixedSizeListVarWidth) {
+  std::string list_json =
+      R"([["zero", "one", ""], ["two", "", "three"], ["four", "five", "six"], 
["seven", "eight", ""]])";
+  CheckTake(fixed_size_list(utf8(), 3), list_json, "[]", "[]");
+  CheckTake(fixed_size_list(utf8(), 3), list_json, "[3, 2, 1]",
+            R"([["seven", "eight", ""], ["four", "five", "six"], ["two", "", 
"three"]])");
+  CheckTake(fixed_size_list(utf8(), 3), list_json, "[null, 2, 0]",
+            R"([null, ["four", "five", "six"], ["zero", "one", ""]])");
+  CheckTake(fixed_size_list(utf8(), 3), list_json, R"([null, null])", "[null, 
null]");
+  CheckTake(
+      fixed_size_list(utf8(), 3), list_json, "[3, 0, 0,3]",
+      R"([["seven", "eight", ""], ["zero", "one", ""], ["zero", "one", ""], 
["seven", "eight", ""]])");
+  CheckTake(fixed_size_list(utf8(), 3), list_json, "[0, 1, 2, 3]", list_json);
+  CheckTake(fixed_size_list(utf8(), 3), list_json, "[2, 2, 2, 2, 2, 2, 1]",
+            R"([
+                 ["four", "five", "six"], ["four", "five", "six"],
+                 ["four", "five", "six"], ["four", "five", "six"],
+                 ["four", "five", "six"], ["four", "five", "six"],
+                 ["two", "", "three"]
+               ])");
+}
+
+TEST_F(TestTakeKernelWithFixedSizeList, TakeFixedSizeListModuloNesting) {
+  using NLG = ::arrow::util::internal::NestedListGenerator;
+  const std::vector<std::shared_ptr<DataType>> value_types = {
+      int16(),
+      int32(),
+      int64(),
+  };
+  NLG::VisitAllNestedListConfigurations(
+      value_types, [this](const std::shared_ptr<DataType>& inner_type,
+                          const std::vector<int>& list_sizes) {
+        this->CheckTakeOnNestedLists(inner_type, list_sizes, /*length=*/5);
+      });
+}
+
 class TestTakeKernelWithMap : public TestTakeKernelTyped<MapType> {};
 
 TEST_F(TestTakeKernelWithMap, TakeMapStringToInt32) {
diff --git a/cpp/src/arrow/util/CMakeLists.txt 
b/cpp/src/arrow/util/CMakeLists.txt
index e26efba285..087e4e3879 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -56,6 +56,7 @@ add_arrow_test(utility-test
                compression_test.cc
                decimal_test.cc
                float16_test.cc
+               fixed_width_test.cc
                formatting_util_test.cc
                key_value_metadata_test.cc
                hashing_test.cc
diff --git a/cpp/src/arrow/util/fixed_width_internal.cc 
b/cpp/src/arrow/util/fixed_width_internal.cc
new file mode 100644
index 0000000000..164af3cff6
--- /dev/null
+++ b/cpp/src/arrow/util/fixed_width_internal.cc
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <optional>
+#include <utility>
+
+#include "arrow/array/data.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/result.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/fixed_width_internal.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/small_vector.h"
+
+namespace arrow::util {
+
+using ::arrow::internal::checked_cast;
+
+bool IsFixedWidthLike(const ArraySpan& source, bool force_null_count,
+                      bool exclude_dictionary) {
+  return IsFixedWidthLike(source, force_null_count,
+                          [exclude_dictionary](const DataType& type) {
+                            return !exclude_dictionary || type.id() != 
Type::DICTIONARY;
+                          });
+}
+
+static int64_t FixedWidthInBytesFallback(const FixedSizeListType& 
fixed_size_list_type) {
+  auto* fsl = &fixed_size_list_type;
+  int64_t list_size = fsl->list_size();
+  for (auto type = fsl->value_type().get();;) {
+    if (type->id() == Type::FIXED_SIZE_LIST) {
+      fsl = checked_cast<const FixedSizeListType*>(type);
+      list_size *= fsl->list_size();
+      type = fsl->value_type().get();
+      continue;
+    }
+    if (type->id() != Type::BOOL && is_fixed_width(type->id())) {
+      const int64_t flat_byte_width = list_size * type->byte_width();
+      DCHECK_GE(flat_byte_width, 0);
+      return flat_byte_width;
+    }
+    break;
+  }
+  return -1;
+}
+
+int64_t FixedWidthInBytes(const DataType& type) {
+  auto type_id = type.id();
+  if (is_fixed_width(type_id)) {
+    const int32_t num_bits = type.bit_width();
+    return (type_id == Type::BOOL) ? -1 : num_bits / 8;
+  }
+  if (type_id == Type::FIXED_SIZE_LIST) {
+    auto& fsl = ::arrow::internal::checked_cast<const 
FixedSizeListType&>(type);
+    return FixedWidthInBytesFallback(fsl);
+  }
+  return -1;
+}
+
+int64_t FixedWidthInBits(const DataType& type) {
+  auto type_id = type.id();
+  if (is_fixed_width(type_id)) {
+    return type.bit_width();
+  }
+  const int64_t byte_width = FixedWidthInBytes(type);
+  if (ARROW_PREDICT_FALSE(byte_width < 0)) {
+    return -1;
+  }
+  return byte_width * 8;
+}
+
+namespace internal {
+
+Status PreallocateFixedWidthArrayData(::arrow::compute::KernelContext* ctx,
+                                      int64_t length, const ArraySpan& source,
+                                      bool allocate_validity, ArrayData* out) {
+  DCHECK(!source.MayHaveNulls() || allocate_validity)
+      << "allocate_validity cannot be false if source may have nulls";
+  DCHECK_EQ(source.type->id(), out->type->id());
+  auto* type = source.type;
+  out->length = length;
+  if (type->id() == Type::FIXED_SIZE_LIST) {
+    out->buffers.resize(1);
+    out->child_data = {std::make_shared<ArrayData>()};
+  } else {
+    out->buffers.resize(2);
+  }
+  if (allocate_validity) {
+    ARROW_ASSIGN_OR_RAISE(out->buffers[0], ctx->AllocateBitmap(length));
+  }
+
+  if (type->id() == Type::BOOL) {
+    ARROW_ASSIGN_OR_RAISE(out->buffers[1], ctx->AllocateBitmap(length));
+    return Status::OK();
+  }
+  if (is_fixed_width(type->id())) {
+    if (type->id() == Type::DICTIONARY) {
+      return Status::NotImplemented(
+          "PreallocateFixedWidthArrayData: DICTIONARY type allocation: ", 
*type);
+    }
+    ARROW_ASSIGN_OR_RAISE(out->buffers[1],
+                          ctx->Allocate(length * source.type->byte_width()));
+    return Status::OK();
+  }
+  if (type->id() == Type::FIXED_SIZE_LIST) {
+    auto& fsl_type = checked_cast<const FixedSizeListType&>(*type);
+    auto& value_type = fsl_type.value_type();
+    if (ARROW_PREDICT_FALSE(value_type->id() == Type::BOOL)) {
+      return Status::Invalid("PreallocateFixedWidthArrayData: Invalid type: ", 
fsl_type);
+    }
+    if (ARROW_PREDICT_FALSE(value_type->id() == Type::DICTIONARY)) {
+      return Status::NotImplemented(
+          "PreallocateFixedWidthArrayData: DICTIONARY type allocation: ", 
*type);
+    }
+    if (source.child_data[0].MayHaveNulls()) {
+      return Status::Invalid(
+          "PreallocateFixedWidthArrayData: "
+          "FixedSizeList may have null values in child array: ",
+          fsl_type);
+    }
+    auto* child_values = out->child_data[0].get();
+    child_values->type = value_type;
+    return PreallocateFixedWidthArrayData(ctx, length * fsl_type.list_size(),
+                                          /*source=*/source.child_data[0],
+                                          /*allocate_validity=*/false,
+                                          /*out=*/child_values);
+  }
+  return Status::Invalid("PreallocateFixedWidthArrayData: Invalid type: ", 
*type);
+}
+
+}  // namespace internal
+
+/// \pre same as OffsetPointerOfFixedWidthValues
+/// \pre source.type->id() != Type::BOOL
+static const uint8_t* OffsetPointerOfFixedWidthValuesFallback(const ArraySpan& 
source) {
+  using OffsetAndListSize = std::pair<int64_t, int64_t>;
+  auto get_offset = [](auto pair) { return pair.first; };
+  auto get_list_size = [](auto pair) { return pair.second; };
+  ::arrow::internal::SmallVector<OffsetAndListSize, 1> stack;
+
+  DCHECK_NE(source.type->id(), Type::BOOL);
+
+  int64_t list_size = 1;
+  auto* array = &source;
+  while (array->type->id() == Type::FIXED_SIZE_LIST) {
+    list_size *= checked_cast<const 
FixedSizeListType*>(array->type)->list_size();
+    stack.emplace_back(array->offset, list_size);
+    array = &array->child_data[0];
+  }
+  // Now that innermost values were reached, pop the stack and calculate the 
offset
+  // in bytes of the innermost values buffer by considering the offset at each
+  // level of nesting.
+  DCHECK(array->type->id() != Type::BOOL && is_fixed_width(*array->type));
+  DCHECK(array == &source || !array->MayHaveNulls())
+      << "OffsetPointerOfFixedWidthValues: array is expected to be flat or 
have no "
+         "nulls in the arrays nested by FIXED_SIZE_LIST.";
+  int64_t value_width = array->type->byte_width();
+  int64_t offset_in_bytes = array->offset * value_width;
+  for (auto it = stack.rbegin(); it != stack.rend(); ++it) {
+    value_width *= get_list_size(*it);
+    offset_in_bytes += get_offset(*it) * value_width;
+  }
+  return value_width < 0 ? nullptr : array->GetValues<uint8_t>(1, 
offset_in_bytes);
+}
+
+const uint8_t* OffsetPointerOfFixedWidthValues(const ArraySpan& source) {
+  auto type_id = source.type->id();
+  if (is_fixed_width(type_id)) {
+    if (ARROW_PREDICT_FALSE(type_id == Type::BOOL)) {
+      // BOOL arrays are bit-packed, thus a byte-aligned pointer cannot be 
produced in the
+      // general case. Returning something for BOOL arrays that happen to 
byte-align
+      // because offset=0 would create too much confusion.
+      return nullptr;
+    }
+    return source.GetValues<uint8_t>(1, 0) + source.offset * 
source.type->byte_width();
+  }
+  return OffsetPointerOfFixedWidthValuesFallback(source);
+}
+
+/// \brief Get the mutable pointer to the fixed-width values of an array
+///        allocated by PreallocateFixedWidthArrayData.
+///
+/// \pre mutable_array->offset and the offset of child array (if it's a
+///      FixedSizeList) MUST be 0 (recursively).
+/// \pre IsFixedWidthLike(ArraySpan(mutable_array)) or the more restrictive
+///      is_fixed_width(*mutable_array->type) MUST be true
+/// \return The mutable pointer to the fixed-width byte blocks of the array. If
+///         pre-conditions are not satisfied, the return values is undefined.
+uint8_t* MutableFixedWidthValuesPointer(ArrayData* mutable_array) {
+  auto type_id = mutable_array->type->id();
+  if (type_id == Type::FIXED_SIZE_LIST) {
+    auto* array = mutable_array;
+    do {
+      DCHECK_EQ(array->offset, 0);
+      DCHECK_EQ(array->child_data.size(), 1) << array->type->ToString(true) << 
" part of "
+                                             << 
mutable_array->type->ToString(true);
+      array = array->child_data[0].get();
+    } while (array->type->id() == Type::FIXED_SIZE_LIST);
+    DCHECK_EQ(array->offset, 0);
+    DCHECK(array->type->id() != Type::BOOL && is_fixed_width(*array->type));
+    return array->GetMutableValues<uint8_t>(1, 0);
+  }
+  DCHECK_EQ(mutable_array->offset, 0);
+  // BOOL is allowed here only because the offset is expected to be 0,
+  // so the byte-aligned pointer also points to the first *bit* of the buffer.
+  DCHECK(is_fixed_width(type_id));
+  return mutable_array->GetMutableValues<uint8_t>(1, 0);
+}
+
+}  // namespace arrow::util
diff --git a/cpp/src/arrow/util/fixed_width_internal.h 
b/cpp/src/arrow/util/fixed_width_internal.h
new file mode 100644
index 0000000000..f6959485fb
--- /dev/null
+++ b/cpp/src/arrow/util/fixed_width_internal.h
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array/data.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+#include "arrow/type_traits.h"
+
+namespace arrow::compute {
+// XXX: remove dependency on compute::KernelContext
+class KernelContext;
+}  // namespace arrow::compute
+
+namespace arrow::util {
+
+/// \brief Checks if the given array has a fixed-width type or if it's an 
array of
+/// fixed-size list that can be flattened to an array of fixed-width values.
+///
+/// Fixed-width types are the ones defined by the is_fixed_width() predicate in
+/// type_traits.h. They are all the types that passes any of the following
+/// predicates:
+///
+///  - is_primitive()
+///  - is_fixed_size_binary()
+///  - is_dictionary()
+///
+/// At least 3 types in this set require special care:
+///  - `Type::BOOL` is fixed-width, but it's a 1-bit type and pointers to 
first bit
+///    in boolean buffers are not always aligned to byte boundaries.
+///  - `Type::DICTIONARY` is fixed-width because the indices are fixed-width, 
but the
+///    dictionary values are not necessarily fixed-width and have to be managed
+///    by separate operations.
+///  - Type::FIXED_SIZE_BINARY unlike other fixed-width types, fixed-size 
binary
+///    values are defined by a size attribute that is not known at compile 
time.
+///    The other types have power-of-2 byte widths, while fixed-size binary can
+///    have any byte width including 0.
+///
+/// Additionally, we say that a type is "fixed-width like" if it's a 
fixed-width as
+/// defined above, or if it's a fixed-size list (or nested fixed-size lists) 
and
+/// the innermost type is fixed-width and the following restrictions also 
apply:
+///  - The value type of the innermost fixed-size list is not BOOL (it has to 
be excluded
+///    because a 1-bit type doesn't byte-align)
+///  - Only the top-level array may have nulls, all the inner array have to be 
completely
+///    free of nulls so we don't need to manage internal validity bitmaps.
+///
+/// Take the following `fixed_size_list<fixed_size_list<int32, 2>, 3>` array 
as an
+/// example:
+///
+///     [
+///       [[1, 2], [3,  4], [ 5,  6]],
+///       null,
+///       [[7, 8], [9, 10], [11, 12]]
+///     ]
+///
+/// in memory, it would look like:
+///
+///     {
+///        type: fixed_size_list<fixed_size_list<int32, 2>, 3>,
+///        length: 3,
+///        null_count: 1,
+///        offset: 0,
+///        buffers: [
+///          0: [0b00000101]
+///        ],
+///        child_data: [
+///          0: {
+///            type: fixed_size_list<int32, 2>,
+///            length: 9,
+///            null_count: 0,
+///            offset: 0,
+///            buffers: [0: NULL],
+///            child_data: [
+///              0: {
+///                type: int32,
+///                length: 18,
+///                null_count: 0,
+///                offset: 0,
+///                buffers: [
+///                  0: NULL,
+///                  1: [ 1,  2,  3,  4,  5,  6,
+///                       0,  0,  0,  0,  0,  0
+///                       7,  8,  9, 10, 11, 12 ]
+///                ],
+///                child_data: []
+///              }
+///            ]
+///          }
+///        ]
+///     }
+///
+/// This layout fits the fixed-width like definition because the innermost type
+/// is byte-aligned fixed-width (int32 = 4 bytes) and the internal arrays don't
+/// have nulls. The validity bitmap is only needed at the top-level array.
+///
+/// Writing to this array can be done in the same way writing to a flat 
fixed-width
+/// array is done, by:
+/// 1. Updating the validity bitmap at the top-level array if nulls are 
present.
+/// 2. Updating a continuous fixed-width block of memory through a single 
pointer.
+///
+/// The length of this block of memory is the product of the list sizes in the
+/// `FixedSizeList` types and the byte width of the innermost fixed-width type:
+///
+///     3 * 2 * 4 = 24 bytes
+///
+/// Writing the `[[1, 2], [3, 4], [5, 6]]` value at a given index can be done 
by
+/// simply setting the validity bit to 1 and writing the 24-byte sequence of
+/// integers `[1, 2, 3, 4, 5, 6]` to the memory block at `byte_ptr + index * 
24`.
+///
+/// The length of the top-level array fully defines the lengths that all the 
nested
+/// arrays must have, which makes defining all the lengths as easy as defining 
the
+/// length of the top-level array.
+///
+///     length = 3
+///     child_data[0].length == 3 * 3 == 9
+///     child_data[0].child_data[0].length == 3 * 3 * 2 == 18
+///
+///     child_data[0].child_data[0].buffers[1].size() >=
+///       (3 * (3 * 2 * sizeof(int32)) == 3 * 24 == 72)
+///
+/// Dealing with offsets is a bit involved. Let's say the array described 
above has
+/// the offsets 2, 5, and 7:
+///
+///     {
+///        type: fixed_size_list<fixed_size_list<int32, 2>, 3>,
+///        offset: 2,
+///        ...
+///        child_data: [
+///          0: {
+///            type: fixed_size_list<int32, 2>,
+///            offset: 5,
+///            ...
+///            child_data: [
+///              0: {
+///                type: int32,
+///                offset: 7,
+///                buffers: [
+///                  0: NULL,
+///                  1: [ 1, 1, 1, 1, 1, 1, 1,      // 7 values skipped
+///                       0,1, 0,1, 0,1, 0,1, 0,1,  // 5 [x,x] values skipped
+///
+///                       0,0,0,0,0,1,  //
+///                       0,0,0,0,0,1,  // 2 [[x,x], [x,x], [x,x]] values 
skipped
+///
+///                       1,  2,  3,  4,  5,  6,  //
+///                       0,  0,  0,  0,  0,  0   // the actual values
+///                       7,  8,  9, 10, 11, 12   //
+///                     ]
+///                ],
+///              }
+///            ]
+///          }
+///        ]
+///     }
+///
+/// The offset of the innermost values buffer, in bytes, is calculated as:
+///
+///     ((2 * 3 * 2) + (5 * 2) + 7) * sizeof(int32) = 29 * 4 bytes = 116 bytes
+///
+/// In general, with `fsl_size_i` defined as the product of the list sizes at
+/// level i and at every deeper level, the formula to calculate the offset of
+/// the innermost values buffer is:
+///
+///     ((off_0 * fsl_size_0) + (off_1 * fsl_size_1) + ... + innermost_off)
+///        * sizeof(innermost_type)
+///
+/// `OffsetPointerOfFixedWidthValues()` can calculate this byte offset and 
return the
+/// pointer to the first relevant byte of the innermost values buffer.
+///
+/// \param source The array to check
+/// \param force_null_count If true, GetNullCount() is used instead of 
null_count
+/// \param exclude_dictionary If true, DICTIONARY is excluded from the
+///                           is_fixed_width() types. Default: false.
+ARROW_EXPORT bool IsFixedWidthLike(const ArraySpan& source, bool 
force_null_count = false,
+                                   bool exclude_dictionary = false);
+
+/// \brief Checks if the given array has a fixed-width type or if it's an 
array of
+/// fixed-size list that can be flattened to an array of fixed-width values.
+///
+/// This function is a more general version of
+/// `IsFixedWidthLike(const ArraySpan&, bool)` that allows the caller to 
further
+/// restrict the inner value types that should be considered fixed-width.
+///
+/// \param source The array to check
+/// \param force_null_count If true, GetNullCount() is used instead of 
null_count
+/// \param extra_predicate A DataType predicate that can be used to further
+///                        restrict the types that are considered fixed-width
+template <class ExtraPred>
+inline bool IsFixedWidthLike(const ArraySpan& source, bool force_null_count,
+                             ExtraPred extra_predicate) {
+  const auto* type = source.type;
+  // BOOL is considered fixed-width if not nested under FIXED_SIZE_LIST.
+  if (is_fixed_width(type->id()) && extra_predicate(*type)) {
+    return true;
+  }
+  if (type->id() == Type::FIXED_SIZE_LIST) {
+    // All the inner arrays must not contain any nulls.
+    const auto* values = &source.child_data[0];
+    while ((force_null_count ? values->GetNullCount() : values->null_count) == 
0) {
+      type = values->type;
+      if (type->id() == Type::FIXED_SIZE_LIST) {
+        values = &values->child_data[0];
+        continue;
+      }
+      // BOOL has to be excluded because it's not byte-aligned.
+      return type->id() != Type::BOOL && is_fixed_width(type->id()) &&
+             extra_predicate(*type);
+    }
+  }
+  return false;
+}
+
+/// \brief Get the fixed-width in bytes of a type if it is a fixed-width like
+/// type, but not BOOL.
+///
+/// If the array is a FixedSizeList (of any level of nesting), the byte width 
of
+/// the values is the product of all fixed-list sizes and the byte width of the
+/// innermost fixed-width value type.
+///
+/// IsFixedWidthLike(array) performs more checks than this function and should
+/// be used to guarantee that, if type is not BOOL, this function will not 
return -1.
+///
+/// NOTE: this function translates `DataType::bit_width()` to bytes 
differently from
+/// `DataType::byte_width()`. `DataType::byte_width()` will return 0 for
+/// BOOL, while this function will return `-1`. This is done because 0 is
+/// a valid return value for FIXED_SIZE_LIST with size 0 or 
`FIXED_SIZE_BINARY` with
+/// size 0.
+///
+/// \pre The instance of the array where this type is from must pass
+///      `IsFixedWidthLike(array)` and should not be BOOL.
+/// \return The fixed-byte width of the values or -1 if the type is BOOL or not
+///         fixed-width like. 0 is a valid return value as fixed-size-lists
+///         and fixed-size-binary with size 0 are allowed.
+ARROW_EXPORT int64_t FixedWidthInBytes(const DataType& type);
+
+/// \brief Get the fixed-width in bits of a type if it is a fixed-width like
+/// type.
+///
+/// \return The bit-width of the values or -1
+/// \see FixedWidthInBytes
+ARROW_EXPORT int64_t FixedWidthInBits(const DataType& type);
+
+namespace internal {
+
+/// \brief Allocate an ArrayData for a type that is fixed-width like.
+///
+/// This function performs the same checks performed by
+/// `IsFixedWidthLike(source, false)`. If `source.type` is not a simple
+/// fixed-width type, caller should make sure it passes the
+/// `IsFixedWidthLike(source)` checks. That guarantees that it's possible to
+/// allocate an array that can serve as a destination for a kernel that writes 
values
+/// through a single pointer to fixed-width byte blocks.
+///
+/// \param[in] length The length of the array to allocate (unrelated to the 
length of
+///                   the source array)
+/// \param[in] source The source array that carries the type information and 
the
+///                   validity bitmaps that are relevant for the type 
validation
+///                   when the source is a FixedSizeList.
+/// \see IsFixedWidthLike
+ARROW_EXPORT Status 
PreallocateFixedWidthArrayData(::arrow::compute::KernelContext* ctx,
+                                                   int64_t length,
+                                                   const ArraySpan& source,
+                                                   bool allocate_validity,
+                                                   ArrayData* out);
+
+}  // namespace internal
+
+/// \brief Get the pointer to the fixed-width values of a fixed-width like 
array.
+///
+/// This function might return NULLPTR if the type of the array is BOOL or
+/// if the pre-conditions listed are not satisfied. The converse is not true
+/// (i.e. not getting NULLPTR doesn't guarantee that source is a fixed-width
+/// like array).
+///
+/// \pre `IsFixedWidthLike(source)` or the more restrictive
+///      is_fixed_width(*mutable_array->type) SHOULD be true
+/// \return The pointer to the fixed-width values of an array or NULLPTR
+///         if pre-conditions are not satisfied.
+ARROW_EXPORT const uint8_t* OffsetPointerOfFixedWidthValues(const ArraySpan& 
source);
+
+/// \brief Get the mutable pointer to the fixed-width values of an array
+///        allocated by PreallocateFixedWidthArrayData.
+///
+/// \pre mutable_array->offset and the offset of child array (if it's a
+///      FixedSizeList) MUST be 0 (recursively).
+/// \pre IsFixedWidthLike(ArraySpan(mutable_array)) or the more restrictive
+///      is_fixed_width(*mutable_array->type) MUST be true
+/// \return The mutable pointer to the fixed-width byte blocks of the array. If
+///         pre-conditions are not satisfied, the return values is undefined.
+ARROW_EXPORT uint8_t* MutableFixedWidthValuesPointer(ArrayData* mutable_array);
+
+}  // namespace arrow::util
diff --git a/cpp/src/arrow/util/fixed_width_test.cc 
b/cpp/src/arrow/util/fixed_width_test.cc
new file mode 100644
index 0000000000..2f05221ed6
--- /dev/null
+++ b/cpp/src/arrow/util/fixed_width_test.cc
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// #include <cmath>
+// #include <string>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/data.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/util/fixed_width_internal.h"
+
+namespace arrow::util {
+
+namespace {
+bool NotBool(const DataType& type) { return type.id() != Type::BOOL; }
+bool NotInt32(const DataType& type) { return type.id() != Type::INT32; }
+}  // namespace
+
+class TestFixedWidth : public ::testing::Test {
+ protected:
+  std::shared_ptr<Array> bool_array_array_;
+  std::shared_ptr<Array> int_array_array_;
+  std::shared_ptr<Array> fsl_bool_array_;
+  std::shared_ptr<Array> fsl_int_array_;
+  std::shared_ptr<Array> fsl_int_nulls_array_;
+  std::shared_ptr<Array> fsl_int_inner_nulls_array_;
+  std::shared_ptr<Array> dict_string_array_;
+
+  std::shared_ptr<DataType> fsl(int32_t list_size,
+                                const std::shared_ptr<DataType>& value_type) {
+    return fixed_size_list(value_type, list_size);
+  }
+
+ public:
+  void SetUp() override {
+    bool_array_array_ = ArrayFromJSON(boolean(), "[true, false, null]");
+    int_array_array_ = ArrayFromJSON(int32(), "[1, 0, null]");
+    fsl_bool_array_ = ArrayFromJSON(fsl(2, boolean()), "[[true, false]]");
+    fsl_int_array_ = ArrayFromJSON(fsl(2, int32()), "[[1, 0], [2, 3]]");
+    fsl_int_nulls_array_ = ArrayFromJSON(fsl(2, int32()), "[[1, 0], null, [1, 
2]]");
+    fsl_int_inner_nulls_array_ =
+        ArrayFromJSON(fsl(2, int32()), "[[1, 0], [2, 3], [null, 2]]");
+    dict_string_array_ =
+        ArrayFromJSON(dictionary(int32(), utf8()), R"(["Alice", "Bob", 
"Alice"])");
+  }
+};
+
+TEST_F(TestFixedWidth, IsFixedWidth) {
+  auto arr = ArraySpan{*bool_array_array_->data()};
+  // force_null_count doesn't matter because nulls at the top-level
+  // of the array are allowed by IsFixedWidthLike.
+  ASSERT_TRUE(IsFixedWidthLike(arr, /*force_null_count=*/false));
+  ASSERT_TRUE(IsFixedWidthLike(arr, /*force_null_count=*/true));
+
+  ASSERT_TRUE(IsFixedWidthLike(arr, /*force_null_count=*/false, NotInt32));
+  ASSERT_FALSE(IsFixedWidthLike(arr, /*force_null_count=*/false, NotBool));
+
+  arr = ArraySpan{*int_array_array_->data()};
+  ASSERT_TRUE(IsFixedWidthLike(arr, /*force_null_count=*/false));
+  ASSERT_TRUE(IsFixedWidthLike(arr, /*force_null_count=*/true));
+  ASSERT_TRUE(IsFixedWidthLike(arr, /*force_null_count=*/false, NotBool));
+}
+
+TEST_F(TestFixedWidth, IsFixedWidthLike) {
+  auto arr = ArraySpan{*fsl_bool_array_->data()};
+  // bools wrapped by fixed-size-list are not fixed-width because the
+  // innermost data buffer is a bitmap and won't byte-align.
+  ASSERT_FALSE(IsFixedWidthLike(arr, /*force_null_count=*/false));
+  ASSERT_FALSE(IsFixedWidthLike(arr, /*force_null_count=*/true));
+
+  arr = ArraySpan{*fsl_int_array_->data()};
+  ASSERT_TRUE(IsFixedWidthLike(arr, /*force_null_count=*/false));
+  arr.null_count = kUnknownNullCount;
+  // force_null_count=true isn't necessary because nulls at the top-level
+  // of the array are allowed by IsFixedWidthLike.
+  ASSERT_TRUE(IsFixedWidthLike(arr, /*force_null_count=*/false));
+
+  arr.child_data[0].null_count = kUnknownNullCount;
+  // inner nulls are not allowed by IsFixedWidthLike...
+  ASSERT_FALSE(IsFixedWidthLike(arr, /*force_null_count=*/false));
+  // ...but forcing null counting at on every internal array increases
+  // the chances of IsFixedWidthLike returning true.
+  ASSERT_TRUE(IsFixedWidthLike(arr, /*force_null_count=*/true));
+  // Excluding INT32 from the internal array checks.
+  ASSERT_FALSE(IsFixedWidthLike(arr, /*force_null_count=*/true, NotInt32));
+
+  arr = ArraySpan{*fsl_int_nulls_array_->data()};
+  // Nulls at the top-level of the array are allowed by IsFixedWidthLike.
+  //
+  // TODO(GH-10157): ArrayFromJSON uses FixedSizeListBuilder which currently
+  // produces nulls on the child data if one of the list-typed elements is 
null.
+  // ASSERT_TRUE(IsFixedWidthLike(arr, /*force_null_count=*/false));
+
+  arr = ArraySpan{*fsl_int_inner_nulls_array_->data()};
+  // Inner nulls are not allowed by IsFixedWidthLike.
+  ASSERT_FALSE(IsFixedWidthLike(arr, /*force_null_count=*/true));
+
+  arr = ArraySpan{*dict_string_array_->data()};
+  // Dictionaries are considered fixed-width by is_fixed_width(), but excluded
+  // by IsFixedWidthLike if exclude_dictionary=true.
+  ASSERT_TRUE(IsFixedWidthLike(arr));
+  ASSERT_TRUE(
+      IsFixedWidthLike(arr, /*force_null_count=*/false, 
/*exclude_dictionary=*/false));
+  ASSERT_FALSE(
+      IsFixedWidthLike(arr, /*force_null_count=*/false, 
/*exclude_dictionary=*/true));
+}
+
+TEST_F(TestFixedWidth, MeasureWidthInBytes) {
+  auto b = boolean();
+  auto i8 = int8();
+  auto i32 = int32();
+  auto fsb = fixed_size_binary(3);
+  auto dict = dictionary(int32(), utf8());
+  auto varlen = utf8();
+  ASSERT_EQ(FixedWidthInBytes(*b), -1);
+  ASSERT_EQ(FixedWidthInBytes(*i8), 1);
+  ASSERT_EQ(FixedWidthInBytes(*i32), 4);
+  ASSERT_EQ(FixedWidthInBytes(*fsb), 3);
+  ASSERT_EQ(FixedWidthInBytes(*dict), 4);
+
+  ASSERT_EQ(FixedWidthInBytes(*varlen), -1);
+  ASSERT_EQ(FixedWidthInBytes(*varlen), -1);
+
+  ASSERT_EQ(FixedWidthInBytes(*fsl(0, b)), -1);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(3, b)), -1);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(5, b)), -1);
+
+  ASSERT_EQ(FixedWidthInBytes(*fsl(0, i8)), 0);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(3, i8)), 3);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(5, i8)), 5);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(0, i32)), 0);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(3, i32)), 3 * 4);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(5, i32)), 5 * 4);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(5, fsb)), 5 * 3);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(5, dict)), 5 * 4);
+
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(0, i8))), 0);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(3, i8))), 2 * 3);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(5, i8))), 2 * 5);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(0, i32))), 0);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(3, i32))), 2 * 3 * 4);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(5, i32))), 2 * 5 * 4);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(0, fsb))), 0);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(3, fsb))), 2 * 3 * 3);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(5, fsb))), 2 * 5 * 3);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(0, dict))), 0);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(3, dict))), 2 * 3 * 4);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, fsl(5, dict))), 2 * 5 * 4);
+
+  ASSERT_EQ(FixedWidthInBytes(*fsl(0, varlen)), -1);
+  ASSERT_EQ(FixedWidthInBytes(*fsl(2, varlen)), -1);
+}
+
+TEST_F(TestFixedWidth, MeasureWidthInBits) {
+  auto b = boolean();
+  auto i8 = int8();
+  auto i32 = int32();
+  auto fsb = fixed_size_binary(3);
+  auto dict = dictionary(int32(), utf8());
+  auto varlen = utf8();
+  ASSERT_EQ(FixedWidthInBits(*b), 1);
+  ASSERT_EQ(FixedWidthInBits(*i8), 8);
+  ASSERT_EQ(FixedWidthInBits(*i32), 4 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsb), 3 * 8);
+  ASSERT_EQ(FixedWidthInBits(*dict), 4 * 8);
+
+  ASSERT_EQ(FixedWidthInBits(*varlen), -1);
+  ASSERT_EQ(FixedWidthInBits(*varlen), -1);
+
+  ASSERT_EQ(FixedWidthInBits(*fsl(0, b)), -1);
+  ASSERT_EQ(FixedWidthInBits(*fsl(3, b)), -1);
+  ASSERT_EQ(FixedWidthInBits(*fsl(5, b)), -1);
+
+  ASSERT_EQ(FixedWidthInBits(*fsl(0, i8)), 0);
+  ASSERT_EQ(FixedWidthInBits(*fsl(3, i8)), 3 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(5, i8)), 5 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(0, i32)), 0);
+  ASSERT_EQ(FixedWidthInBits(*fsl(3, i32)), 4 * 3 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(5, i32)), 4 * 5 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(5, fsb)), 5 * 3 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(5, dict)), 5 * 4 * 8);
+
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(0, i8))), 0);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(3, i8))), 2 * 3 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(5, i8))), 2 * 5 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(0, i32))), 0);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(3, i32))), 2 * 3 * 4 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(5, i32))), 2 * 5 * 4 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(0, fsb))), 0);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(3, fsb))), 2 * 3 * 3 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(5, fsb))), 2 * 5 * 3 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(0, dict))), 0);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(3, dict))), 2 * 3 * 4 * 8);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, fsl(5, dict))), 2 * 5 * 4 * 8);
+
+  ASSERT_EQ(FixedWidthInBits(*fsl(0, varlen)), -1);
+  ASSERT_EQ(FixedWidthInBits(*fsl(2, varlen)), -1);
+}
+
+}  // namespace arrow::util
diff --git a/cpp/src/arrow/util/fixed_width_test_util.h 
b/cpp/src/arrow/util/fixed_width_test_util.h
new file mode 100644
index 0000000000..ca141b7ca2
--- /dev/null
+++ b/cpp/src/arrow/util/fixed_width_test_util.h
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <vector>

#include "arrow/array/builder_primitive.h"
#include "arrow/builder.h"
#include "arrow/type.h"
#include "arrow/util/checked_cast.h"
+
+namespace arrow::util::internal {
+
+class NestedListGenerator {
+ public:
+  /// \brief Create a nested FixedSizeListType.
+  ///
+  /// \return `fixed_size_list(fixed_size_list(..., sizes[1]), sizes[0])`
+  static std::shared_ptr<DataType> NestedFSLType(
+      const std::shared_ptr<DataType>& inner_type, const std::vector<int>& 
sizes) {
+    auto type = inner_type;
+    for (auto it = sizes.rbegin(); it != sizes.rend(); it++) {
+      type = fixed_size_list(std::move(type), *it);
+    }
+    return type;
+  }
+
+  /// \brief Create a nested FixedListType.
+  ///
+  /// \return `list(list(...))`
+  static std::shared_ptr<DataType> NestedListType(
+      const std::shared_ptr<DataType>& inner_type, size_t depth) {
+    auto list_type = list(inner_type);
+    for (size_t i = 1; i < depth; i++) {
+      list_type = list(std::move(list_type));
+    }
+    return list_type;
+  }
+
+ private:
+  template <typename ArrowType>
+  static Status AppendNumeric(ArrayBuilder* builder, int64_t* next_value) {
+    using NumericBuilder = ::arrow::NumericBuilder<ArrowType>;
+    using value_type = typename NumericBuilder::value_type;
+    auto* numeric_builder = 
::arrow::internal::checked_cast<NumericBuilder*>(builder);
+    auto cast_next_value =
+        static_cast<value_type>(*next_value % 
std::numeric_limits<value_type>::max());
+    RETURN_NOT_OK(numeric_builder->Append(cast_next_value));
+    *next_value += 1;
+    return Status::OK();
+  }
+
+  // Append([...[[*next_inner_value++, *next_inner_value++, ...]]...])
+  static Status AppendNestedList(ArrayBuilder* nested_builder, const int* 
list_sizes,
+                                 int64_t* next_inner_value) {
+    using ::arrow::internal::checked_cast;
+    ArrayBuilder* builder = nested_builder;
+    auto type = builder->type();
+    if (type->id() == Type::FIXED_SIZE_LIST || type->id() == Type::LIST) {
+      const int list_size = *list_sizes;
+      if (type->id() == Type::FIXED_SIZE_LIST) {
+        auto* fsl_builder = checked_cast<FixedSizeListBuilder*>(builder);
+        assert(list_size == 
checked_cast<FixedSizeListType&>(*type).list_size());
+        RETURN_NOT_OK(fsl_builder->Append());
+        builder = fsl_builder->value_builder();
+      } else {  // type->id() == Type::LIST)
+        auto* list_builder = checked_cast<ListBuilder*>(builder);
+        RETURN_NOT_OK(list_builder->Append(/*is_valid=*/true, list_size));
+        builder = list_builder->value_builder();
+      }
+      list_sizes++;
+      for (int i = 0; i < list_size; i++) {
+        RETURN_NOT_OK(AppendNestedList(builder, list_sizes, next_inner_value));
+      }
+    } else {
+      switch (type->id()) {
+        case Type::INT8:
+          RETURN_NOT_OK(AppendNumeric<Int8Type>(builder, next_inner_value));
+          break;
+        case Type::INT16:
+          RETURN_NOT_OK(AppendNumeric<Int16Type>(builder, next_inner_value));
+          break;
+        case Type::INT32:
+          RETURN_NOT_OK(AppendNumeric<Int32Type>(builder, next_inner_value));
+          break;
+        case Type::INT64:
+          RETURN_NOT_OK(AppendNumeric<Int64Type>(builder, next_inner_value));
+          break;
+        default:
+          return Status::NotImplemented("Unsupported type: ", *type);
+      }
+    }
+    return Status::OK();
+  }
+
+  static Result<std::shared_ptr<Array>> NestedListArray(
+      ArrayBuilder* nested_builder, const std::vector<int>& list_sizes, 
int64_t length) {
+    int64_t next_inner_value = 0;
+    for (int64_t i = 0; i < length; i++) {
+      RETURN_NOT_OK(
+          AppendNestedList(nested_builder, list_sizes.data(), 
&next_inner_value));
+    }
+    return nested_builder->Finish();
+  }
+
+ public:
+  static Result<std::shared_ptr<Array>> NestedFSLArray(
+      const std::shared_ptr<DataType>& inner_type, const std::vector<int>& 
list_sizes,
+      int64_t length) {
+    auto nested_type = NestedFSLType(inner_type, list_sizes);
+    ARROW_ASSIGN_OR_RAISE(auto builder, MakeBuilder(nested_type));
+    return NestedListArray(builder.get(), list_sizes, length);
+  }
+
+  static Result<std::shared_ptr<Array>> NestedListArray(
+      const std::shared_ptr<DataType>& inner_type, const std::vector<int>& 
list_sizes,
+      int64_t length) {
+    auto nested_type = NestedListType(inner_type, list_sizes.size());
+    ARROW_ASSIGN_OR_RAISE(auto builder, MakeBuilder(nested_type));
+    return NestedListArray(builder.get(), list_sizes, length);
+  }
+
+  /// \brief Generate all possible nested list configurations of depth 1 to 
max_depth.
+  ///
+  /// Each configuration consists of a single inner value type and a list of 
sizes.
+  /// Both can be used with NestedFSLArray and NestedListArray to generate 
test data.
+  ///
+  /// The product of the list sizes and the size of the inner value type is 
always a power
+  /// of 2 no greater than max_power_of_2_size. For max_depth=3 and
+  /// max_power_of_2_size=32, this generates 108 configurations.
+  ///
+  /// \tparam Visit a function type with signature
+  ///     void(const std::shared_ptr<DataType>& inner_type,
+  ///          const std::vector<int>& list_sizes)
+  template <class Visit>
+  static void VisitAllNestedListConfigurations(
+      const std::vector<std::shared_ptr<DataType>>& inner_value_types, Visit&& 
visit,
+      int max_depth = 3, int max_power_of_2_size = 32) {
+    for (int depth = 1; depth <= max_depth; depth++) {
+      for (auto& type : inner_value_types) {
+        assert(is_fixed_width(*type));
+        int value_width = type->byte_width();
+
+        std::vector<int> list_sizes;  // stack of list sizes
+        auto pop = [&]() {            // pop the list_sizes stack
+          assert(!list_sizes.empty());
+          value_width /= list_sizes.back();
+          list_sizes.pop_back();
+        };
+        auto next = [&]() {  // double the top of the stack
+          assert(!list_sizes.empty());
+          value_width *= 2;
+          list_sizes.back() *= 2;
+          return value_width;
+        };
+        auto push_1s = [&]() {  // fill the stack with 1s
+          while (list_sizes.size() < static_cast<size_t>(depth)) {
+            list_sizes.push_back(1);
+          }
+        };
+
+        // Loop invariants:
+        //   value_width == product(list_sizes) * type->byte_width()
+        //   value_width is a power-of-2 (1, 2, 4, 8, 16, 
max_power_of_2_size=32)
+        push_1s();
+        do {
+          // for (auto x : list_sizes) printf("%d * ", x);
+          // printf("(%s) %d = %2d\n", type->name().c_str(), 
type->byte_width(),
+          // value_width);
+          visit(type, list_sizes);
+          // Advance to the next test case
+          while (!list_sizes.empty()) {
+            if (next() <= max_power_of_2_size) {
+              push_1s();
+              break;
+            }
+            pop();
+          }
+        } while (!list_sizes.empty());
+      }
+    }
+  }
+};
+
+}  // namespace arrow::util::internal

Reply via email to