This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1951a1ae69 GH-35498: [C++] Relax EnsureAlignment check in Acero from 
requiring 64-byte aligned buffers to requiring value-aligned buffers (#35565)
1951a1ae69 is described below

commit 1951a1ae69590ad58d97f6be929fa14485f81f42
Author: Weston Pace <[email protected]>
AuthorDate: Mon May 29 10:44:23 2023 -0700

    GH-35498: [C++] Relax EnsureAlignment check in Acero from requiring 64-byte 
aligned buffers to requiring value-aligned buffers (#35565)
    
    ### Rationale for this change
    
    Various compute kernels and Acero internals rely on type punning.  This is 
only safe when the buffer has appropriate alignment (e.g. casting uint8_t* to 
uint32_t* is only safe if the buffer has 4-byte alignment).  To avoid errors we 
enforced 64-byte alignment in Acero.  However, this is too strict.  While 
Arrow's allocators will always generate 64-byte aligned buffers this is not the 
case for numpy's allocators (and presumably many others).  This PR relaxes the 
constraint so that we o [...]
    
    ### What changes are included in this PR?
    
    The main complexity here is determining which buffers need to be aligned and 
by how much.  A special flag kValueAlignment is added which can be specified when 
calling CheckAlignment or EnsureAlignment to only require value-alignment and 
not a particular number.
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    No
    * Closes: #35498
    
    Lead-authored-by: Weston Pace <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Benjamin Kietzman <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/CMakeLists.txt          |   1 +
 cpp/src/arrow/acero/exec_plan.cc      |  41 ++++-
 cpp/src/arrow/acero/exec_plan.h       |  40 ++++
 cpp/src/arrow/acero/plan_test.cc      |  40 ++++
 cpp/src/arrow/acero/source_node.cc    |  52 +++++-
 cpp/src/arrow/testing/gtest_util.cc   |  26 +++
 cpp/src/arrow/testing/gtest_util.h    |   9 +
 cpp/src/arrow/type_traits.cc          | 102 +++++++++++
 cpp/src/arrow/type_traits.h           |  23 +++
 cpp/src/arrow/util/align_util.cc      |  90 +++++++--
 cpp/src/arrow/util/align_util.h       | 108 +++++++++++
 cpp/src/arrow/util/align_util_test.cc | 333 +++++++++++++++++++++++++++++++++-
 12 files changed, 840 insertions(+), 25 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 4e6826bc61..d928cdf58b 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -180,6 +180,7 @@ set(ARROW_SRCS
     tensor/csf_converter.cc
     tensor/csx_converter.cc
     type.cc
+    type_traits.cc
     visitor.cc
     c/bridge.cc
     io/buffered.cc
diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc
index 2fe8c484e4..2f03159de5 100644
--- a/cpp/src/arrow/acero/exec_plan.cc
+++ b/cpp/src/arrow/acero/exec_plan.cc
@@ -36,6 +36,7 @@
 #include "arrow/table.h"
 #include "arrow/util/async_generator.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/io_util.h"
 #include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/string.h"
@@ -358,10 +359,38 @@ std::optional<int> GetNodeIndex(const 
std::vector<ExecNode*>& nodes,
   return std::nullopt;
 }
 
+const char* kAceroAlignmentHandlingEnvVar = "ACERO_ALIGNMENT_HANDLING";
+
+UnalignedBufferHandling DetermineDefaultUnalignedBufferHandling() {
+  auto maybe_value = 
::arrow::internal::GetEnvVar(kAceroAlignmentHandlingEnvVar);
+  if (!maybe_value.ok()) {
+    return UnalignedBufferHandling::kWarn;
+  }
+  std::string value = maybe_value.MoveValueUnsafe();
+  if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "warn")) {
+    return UnalignedBufferHandling::kWarn;
+  } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "ignore")) {
+    return UnalignedBufferHandling::kIgnore;
+  } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, 
"reallocate")) {
+    return UnalignedBufferHandling::kReallocate;
+  } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "error")) {
+    return UnalignedBufferHandling::kError;
+  } else {
+    ARROW_LOG(WARNING) << "unrecognized value for ACERO_ALIGNMENT_HANDLING: " 
<< value;
+    return UnalignedBufferHandling::kWarn;
+  }
+}
+
 }  // namespace
 
 const uint32_t ExecPlan::kMaxBatchSize;
 
+UnalignedBufferHandling GetDefaultUnalignedBufferHandling() {
+  static UnalignedBufferHandling default_value =
+      DetermineDefaultUnalignedBufferHandling();
+  return default_value;
+}
+
 Result<std::shared_ptr<ExecPlan>> ExecPlan::Make(
     QueryOptions opts, ExecContext ctx,
     std::shared_ptr<const KeyValueMetadata> metadata) {
@@ -621,7 +650,8 @@ Future<std::shared_ptr<Table>> DeclarationToTableImpl(
                        query_options.function_registry);
   std::shared_ptr<std::shared_ptr<Table>> output_table =
       std::make_shared<std::shared_ptr<Table>>();
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, 
ExecPlan::Make(exec_ctx));
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
+                        ExecPlan::Make(query_options, exec_ctx));
   TableSinkNodeOptions sink_options(output_table.get());
   sink_options.sequence_output = query_options.sequence_output;
   sink_options.names = std::move(query_options.field_names);
@@ -648,7 +678,8 @@ Future<BatchesWithCommonSchema> 
DeclarationToExecBatchesImpl(
   std::shared_ptr<Schema> out_schema;
   AsyncGenerator<std::optional<ExecBatch>> sink_gen;
   ExecContext exec_ctx(options.memory_pool, cpu_executor, 
options.function_registry);
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, 
ExecPlan::Make(exec_ctx));
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
+                        ExecPlan::Make(options, exec_ctx));
   SinkNodeOptions sink_options(&sink_gen, &out_schema);
   sink_options.sequence_output = options.sequence_output;
   Declaration with_sink = Declaration::Sequence({declaration, {"sink", 
sink_options}});
@@ -678,7 +709,8 @@ Future<BatchesWithCommonSchema> 
DeclarationToExecBatchesImpl(
 Future<> DeclarationToStatusImpl(Declaration declaration, QueryOptions options,
                                  ::arrow::internal::Executor* cpu_executor) {
   ExecContext exec_ctx(options.memory_pool, cpu_executor, 
options.function_registry);
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, 
ExecPlan::Make(exec_ctx));
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
+                        ExecPlan::Make(options, exec_ctx));
   ARROW_ASSIGN_OR_RAISE(ExecNode * last_node, 
declaration.AddToPlan(exec_plan.get()));
   if (!last_node->is_sink()) {
     ConsumingSinkNodeOptions sink_options(NullSinkNodeConsumer::Make());
@@ -972,7 +1004,8 @@ Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> 
DeclarationToRecordBatchGen
     ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* 
out_schema) {
   auto converter = std::make_shared<BatchConverter>();
   ExecContext exec_ctx(options.memory_pool, cpu_executor, 
options.function_registry);
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, 
ExecPlan::Make(exec_ctx));
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
+                        ExecPlan::Make(options, exec_ctx));
   Declaration with_sink = Declaration::Sequence(
       {declaration,
        {"sink", SinkNodeOptions(&converter->exec_batch_gen, 
&converter->schema)}});
diff --git a/cpp/src/arrow/acero/exec_plan.h b/cpp/src/arrow/acero/exec_plan.h
index 424d9107d2..f82a0f8106 100644
--- a/cpp/src/arrow/acero/exec_plan.h
+++ b/cpp/src/arrow/acero/exec_plan.h
@@ -496,6 +496,16 @@ struct ARROW_ACERO_EXPORT Declaration {
   std::string label;
 };
 
+/// \brief How to handle unaligned buffers
+enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kError };
+
+/// \brief get the default behavior of unaligned buffer handling
+///
+/// This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable 
which
+/// can be set to "warn", "ignore", "reallocate", or "error".  If the 
environment
+/// variable is not set, or is set to an invalid value, this will return kWarn
+UnalignedBufferHandling GetDefaultUnalignedBufferHandling();
+
 /// \brief plan-wide options that can be specified when executing an execution 
plan
 struct ARROW_ACERO_EXPORT QueryOptions {
   /// \brief Should the plan use a legacy batching strategy
@@ -562,6 +572,36 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   ///
   /// If set then the number of names must equal the number of output columns
   std::vector<std::string> field_names;
+
+  /// \brief Policy for unaligned buffers in source data
+  ///
+  /// Various compute functions and acero internals will type pun array
+  /// buffers from uint8_t* to some kind of value type (e.g. we might
+  /// cast to int32_t* to add two int32 arrays)
+  ///
+  /// If the buffer is poorly aligned (e.g. an int32 array is not aligned
+  /// on a 4-byte boundary) then this is technically undefined behavior in C++.
+  /// However, most modern compilers and CPUs are fairly tolerant of this
+  /// behavior and nothing bad (beyond a small hit to performance) is likely
+  /// to happen.
+  ///
+  /// Note that this only applies to source buffers.  All buffers allocated 
internally
+  /// by Acero will be suitably aligned.
+  ///
+  /// If this field is set to kWarn then Acero will check if any buffers are 
unaligned
+  /// and, if they are, will emit a warning.
+  ///
+  /// If this field is set to kReallocate then Acero will allocate a new, 
suitably aligned
+  /// buffer and copy the contents from the old buffer into this new buffer.
+  ///
+  /// If this field is set to kError then Acero will gracefully abort the plan 
instead.
+  ///
+  /// If this field is set to kIgnore then Acero will not even check if the 
buffers are
+  /// unaligned.
+  ///
+  /// If this field is not set then it will be treated as kWarn unless 
overridden
+  /// by the ACERO_ALIGNMENT_HANDLING environment variable
+  std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
 };
 
 /// \brief Calculate the output schema of a declaration
diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc
index 8ec5c0f70a..5f62550177 100644
--- a/cpp/src/arrow/acero/plan_test.cc
+++ b/cpp/src/arrow/acero/plan_test.cc
@@ -1704,5 +1704,45 @@ TEST(ExecPlanExecution, 
SegmentedAggregationWithBatchCrossingSegment) {
                                       {expected});
 }
 
+TEST(ExecPlanExecution, UnalignedInput) {
+  std::shared_ptr<Array> array = ArrayFromJSON(int32(), "[1, 2, 3]");
+  std::shared_ptr<Array> unaligned = UnalignBuffers(*array);
+  ASSERT_OK_AND_ASSIGN(ExecBatch sample_batch,
+                       ExecBatch::Make({unaligned}, array->length()));
+
+  BatchesWithSchema data;
+  data.batches = {std::move(sample_batch)};
+  data.schema = schema({field("i32", int32())});
+
+  Declaration plan = Declaration::Sequence({
+      {"exec_batch_source", ExecBatchSourceNodeOptions(data.schema, 
data.batches)},
+  });
+
+  int64_t initial_bytes_allocated = 
default_memory_pool()->total_bytes_allocated();
+
+  // By default we should warn and so the plan should finish ok
+  ASSERT_OK(DeclarationToStatus(plan));
+  ASSERT_EQ(initial_bytes_allocated, 
default_memory_pool()->total_bytes_allocated());
+
+  QueryOptions query_options;
+
+#ifndef ARROW_UBSAN
+  // Nothing should happen if we ignore alignment
+  query_options.unaligned_buffer_handling = UnalignedBufferHandling::kIgnore;
+  ASSERT_OK(DeclarationToStatus(plan, query_options));
+  ASSERT_EQ(initial_bytes_allocated, 
default_memory_pool()->total_bytes_allocated());
+#endif
+
+  query_options.unaligned_buffer_handling = UnalignedBufferHandling::kError;
+  ASSERT_THAT(DeclarationToStatus(plan, query_options),
+              Raises(StatusCode::Invalid,
+                     testing::HasSubstr("An input buffer was poorly 
aligned")));
+  ASSERT_EQ(initial_bytes_allocated, 
default_memory_pool()->total_bytes_allocated());
+
+  query_options.unaligned_buffer_handling = 
UnalignedBufferHandling::kReallocate;
+  ASSERT_OK(DeclarationToStatus(plan, query_options));
+  ASSERT_LT(initial_bytes_allocated, 
default_memory_pool()->total_bytes_allocated());
+}
+
 }  // namespace acero
 }  // namespace arrow
diff --git a/cpp/src/arrow/acero/source_node.cc 
b/cpp/src/arrow/acero/source_node.cc
index 6c138d8dcc..9cacb237b2 100644
--- a/cpp/src/arrow/acero/source_node.cc
+++ b/cpp/src/arrow/acero/source_node.cc
@@ -52,6 +52,46 @@ using arrow::internal::MapVector;
 namespace acero {
 namespace {
 
+Status HandleUnalignedBuffers(ExecBatch* batch, UnalignedBufferHandling 
handling) {
+  if (handling == UnalignedBufferHandling::kIgnore) {
+    return Status::OK();
+  }
+  for (auto& value : batch->values) {
+    if (value.is_array()) {
+      switch (handling) {
+        case UnalignedBufferHandling::kIgnore:
+          // Should be impossible to get here
+          return Status::OK();
+        case UnalignedBufferHandling::kError:
+          if (!arrow::util::CheckAlignment(*value.array(),
+                                           arrow::util::kValueAlignment)) {
+            return Status::Invalid(
+                "An input buffer was poorly aligned and 
UnalignedBufferHandling is set "
+                "to kError");
+          }
+          break;
+        case UnalignedBufferHandling::kWarn:
+          if (!arrow::util::CheckAlignment(*value.array(),
+                                           arrow::util::kValueAlignment)) {
+            ARROW_LOG(WARNING)
+                << "An input buffer was poorly aligned.  This could lead to 
crashes or "
+                   "poor performance on some hardware.  Please ensure that all 
Acero "
+                   "sources generate aligned buffers, or change the unaligned 
buffer "
+                   "handling configuration to silence this warning.";
+          }
+          break;
+        case UnalignedBufferHandling::kReallocate: {
+          ARROW_ASSIGN_OR_RAISE(value, arrow::util::EnsureAlignment(
+                                           value.array(), 
arrow::util::kValueAlignment,
+                                           default_memory_pool()));
+          break;
+        }
+      }
+    }
+  }
+  return Status::OK();
+}
+
 struct SourceNode : ExecNode, public TracedNode {
   SourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
              AsyncGenerator<std::optional<ExecBatch>> generator,
@@ -104,13 +144,11 @@ struct SourceNode : ExecNode, public TracedNode {
               batch_size = morsel_length;
             }
             ExecBatch batch = morsel.Slice(offset, batch_size);
-            for (auto& value : batch.values) {
-              if (value.is_array()) {
-                ARROW_ASSIGN_OR_RAISE(value, arrow::util::EnsureAlignment(
-                                                 value.make_array(), 
ipc::kArrowAlignment,
-                                                 default_memory_pool()));
-              }
-            }
+            UnalignedBufferHandling unaligned_buffer_handling =
+                
plan_->query_context()->options().unaligned_buffer_handling.value_or(
+                    GetDefaultUnalignedBufferHandling());
+            ARROW_RETURN_NOT_OK(
+                HandleUnalignedBuffers(&batch, unaligned_buffer_handling));
             if (has_ordering) {
               batch.index = batch_index;
             }
diff --git a/cpp/src/arrow/testing/gtest_util.cc 
b/cpp/src/arrow/testing/gtest_util.cc
index 9569375bda..6fc709874e 100644
--- a/cpp/src/arrow/testing/gtest_util.cc
+++ b/cpp/src/arrow/testing/gtest_util.cc
@@ -1099,4 +1099,30 @@ std::shared_ptr<GatingTask> GatingTask::Make(double 
timeout_seconds) {
   return std::make_shared<GatingTask>(timeout_seconds);
 }
 
+std::shared_ptr<ArrayData> UnalignBuffers(const ArrayData& array) {
+  std::vector<std::shared_ptr<Buffer>> new_buffers;
+  new_buffers.reserve(array.buffers.size());
+
+  for (const auto& buffer : array.buffers) {
+    if (!buffer) {
+      new_buffers.emplace_back();
+      continue;
+    }
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Buffer> padded,
+                         AllocateBuffer(buffer->size() + 1, 
default_memory_pool()));
+    memcpy(padded->mutable_data() + 1, buffer->data(), buffer->size());
+    std::shared_ptr<Buffer> unaligned = SliceBuffer(padded, 1);
+    new_buffers.push_back(std::move(unaligned));
+  }
+
+  std::shared_ptr<ArrayData> array_data = std::make_shared<ArrayData>(array);
+  array_data->buffers = std::move(new_buffers);
+  return array_data;
+}
+
+std::shared_ptr<Array> UnalignBuffers(const Array& array) {
+  std::shared_ptr<ArrayData> array_data = UnalignBuffers(*array.data());
+  return MakeArray(array_data);
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/testing/gtest_util.h 
b/cpp/src/arrow/testing/gtest_util.h
index 55bd307b12..13fc0b3e81 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -532,4 +532,13 @@ class ARROW_TESTING_EXPORT GatingTask {
   std::shared_ptr<Impl> impl_;
 };
 
+/// \brief create an exact copy of the data where each buffer has a max 
alignment of 1
+///
+/// This method does not recurse into the dictionary or children
+ARROW_TESTING_EXPORT std::shared_ptr<ArrayData> UnalignBuffers(const 
ArrayData& array);
+/// \brief create an exact copy of the array where each buffer has a max 
alignment of 1
+///
+/// This method does not recurse into the dictionary or children
+ARROW_TESTING_EXPORT std::shared_ptr<Array> UnalignBuffers(const Array& array);
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/type_traits.cc b/cpp/src/arrow/type_traits.cc
new file mode 100644
index 0000000000..ac16afe4b8
--- /dev/null
+++ b/cpp/src/arrow/type_traits.cc
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/type_traits.h"
+
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) {
+  if (buffer_index == 2 && type_id == Type::DENSE_UNION) {
+    // A dense union array is the only array (so far) that requires alignment
+    // on a buffer with a buffer_index that is not equal to 1
+    return 4;
+  }
+  if (buffer_index != 1) {
+    // If the buffer index is 0 then either:
+    //  * The array type has no buffers, thus this shouldn't be called anyways
+    //  * The array has a validity buffer at 0, no alignment needed
+    //  * The array is a union array and has a types buffer at 0, no alignment 
needed
+    // If the buffer index is > 1 then, in all current cases, it represents 
binary
+    //  data and no alignment is needed.  The only exception is dense union 
buffers
+    //  which are checked above.
+    return 1;
+  }
+  DCHECK_NE(type_id, Type::DICTIONARY);
+  DCHECK_NE(type_id, Type::EXTENSION);
+
+  switch (type_id) {
+    case Type::NA:                 // No buffers
+    case Type::FIXED_SIZE_LIST:    // No second buffer (values in child array)
+    case Type::FIXED_SIZE_BINARY:  // Fixed size binary could be dangerous but 
the
+                                   // compute kernels don't type pun this.  
E.g. if
+                                   // an extension type is storing some kind 
of struct
+                                   // here then the user should do their own 
alignment
+                                   // check before casting to an array of 
structs
+    case Type::BOOL:               // Always treated as uint8_t*
+    case Type::INT8:               // Always treated as uint8_t*
+    case Type::UINT8:              // Always treated as uint8_t*
+    case Type::DENSE_UNION:        // Union arrays have a uint8_t* types 
buffer here
+    case Type::SPARSE_UNION:       // Union arrays have a uint8_t* types 
buffer here
+    case Type::RUN_END_ENCODED:    // No buffers
+    case Type::STRUCT:             // No second buffer
+      return 1;
+    case Type::INT16:
+    case Type::UINT16:
+    case Type::HALF_FLOAT:
+      return 2;
+    case Type::INT32:
+    case Type::UINT32:
+    case Type::FLOAT:
+    case Type::STRING:  // Offsets may be cast to int32_t*
+    case Type::BINARY:  // Offsets may be cast to int32_t*
+    case Type::DATE32:
+    case Type::TIME32:
+    case Type::LIST:  // Offsets may be cast to int32_t*, data is in child 
array
+    case Type::MAP:   // This is a list array
+    case Type::INTERVAL_MONTHS:    // Stored as int32_t*
+    case Type::INTERVAL_DAY_TIME:  // Stored as two contiguous 32-bit integers
+      return 4;
+    case Type::INT64:
+    case Type::UINT64:
+    case Type::DOUBLE:
+    case Type::DECIMAL128:    // May be cast to GenericBasicDecimal* which 
requires
+                              // alignment of 8
+    case Type::DECIMAL256:    // May be cast to GenericBasicDecimal* which 
requires
+                              // alignment of 8
+    case Type::LARGE_BINARY:  // Offsets may be cast to int64_t*
+    case Type::LARGE_LIST:    // Offsets may be cast to int64_t*
+    case Type::LARGE_STRING:  // Offsets may be cast to int64_t*
+    case Type::DATE64:
+    case Type::TIME64:
+    case Type::TIMESTAMP:
+    case Type::DURATION:
+    case Type::INTERVAL_MONTH_DAY_NANO:  // Stored as two 32-bit integers and 
a 64-bit
+                                         // integer
+      return 8;
+    case Type::DICTIONARY:
+    case Type::EXTENSION:
+    case Type::MAX_ID:
+      break;
+  }
+  Status::Invalid("RequiredValueAlignmentForBuffer called with invalid type id 
", type_id)
+      .Warn();
+  return 1;
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 2d20d87d14..7204fd6d85 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -1309,6 +1309,29 @@ static inline int offset_bit_width(Type::type type_id) {
   return 0;
 }
 
+/// \brief Get the alignment a buffer should have to be considered "value 
aligned"
+///
+/// Some buffers are frequently type-punned.  For example, in an int32 array 
the
+/// values buffer is frequently cast to int32_t*
+///
+/// This sort of punning is technically only valid if the pointer is aligned 
to a
+/// proper width (e.g. 4 bytes in the case of int32).  However, most modern 
compilers
+/// are quite permissive if we get this wrong.  Note that this alignment is 
something
+/// that is guaranteed by malloc (e.g. new int32_t[] will return a buffer that 
is 4
+/// byte aligned) or common libraries (e.g. numpy) but it is not currently 
guaranteed
+/// by flight (GH-32276).
+///
+/// We call this "value aligned" and this method will calculate that required 
alignment.
+///
+/// \param type_id the type of the array containing the buffer
+///                Note: this should be the indices type for a dictionary 
array since
+///                A dictionary array's buffers are indices.  It should be the 
storage
+///                type for an extension array.
+/// \param buffer_index the index of the buffer to check, for example 0 will 
typically
+///                     give you the alignment expected of the validity buffer
+/// \return the required value alignment in bytes (1 if no alignment required)
+int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index);
+
 /// \brief Check for an integer type (signed or unsigned)
 ///
 /// \param[in] type the type to check
diff --git a/cpp/src/arrow/util/align_util.cc b/cpp/src/arrow/util/align_util.cc
index d77650fcd6..7bc687b155 100644
--- a/cpp/src/arrow/util/align_util.cc
+++ b/cpp/src/arrow/util/align_util.cc
@@ -21,23 +21,68 @@
 #include "arrow/chunked_array.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
+#include "arrow/type_fwd.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 
 namespace util {
 
 bool CheckAlignment(const Buffer& buffer, int64_t alignment) {
+  if (alignment <= 0) {
+    return true;
+  }
   return buffer.address() % alignment == 0;
 }
 
-bool CheckAlignment(const ArrayData& array, int64_t alignment) {
-  for (const auto& buffer : array.buffers) {
-    if (buffer) {
-      if (!CheckAlignment(*buffer, alignment)) return false;
+namespace {
+
+// Returns the type that controls how the buffers of this ArrayData (not its 
children)
+// should behave
+Type::type GetTypeForBuffers(const ArrayData& array) {
+  Type::type type_id = array.type->storage_id();
+  if (type_id == Type::DICTIONARY) {
+    return ::arrow::internal::checked_pointer_cast<DictionaryType>(array.type)
+        ->index_type()
+        ->id();
+  }
+  return type_id;
+}
+
+// Checks to see if an array's own buffers are aligned but doesn't check
+// children
+bool CheckSelfAlignment(const ArrayData& array, int64_t alignment) {
+  if (alignment == kValueAlignment) {
+    Type::type type_id = GetTypeForBuffers(array);
+    for (std::size_t i = 0; i < array.buffers.size(); i++) {
+      if (array.buffers[i]) {
+        int expected_alignment =
+            RequiredValueAlignmentForBuffer(type_id, static_cast<int>(i));
+        if (!CheckAlignment(*array.buffers[i], expected_alignment)) {
+          return false;
+        }
+      }
+    }
+  } else {
+    for (const auto& buffer : array.buffers) {
+      if (buffer) {
+        if (!CheckAlignment(*buffer, alignment)) return false;
+      }
     }
   }
+  return true;
+}
+
+}  // namespace
+
+bool CheckAlignment(const ArrayData& array, int64_t alignment) {
+  if (!CheckSelfAlignment(array, alignment)) {
+    return false;
+  }
 
-  if (array.type->id() == Type::DICTIONARY) {
+  if (array.dictionary) {
     if (!CheckAlignment(*array.dictionary, alignment)) return false;
   }
 
@@ -97,9 +142,22 @@ bool CheckAlignment(const Table& table, int64_t alignment,
 Result<std::shared_ptr<Buffer>> EnsureAlignment(std::shared_ptr<Buffer> buffer,
                                                 int64_t alignment,
                                                 MemoryPool* memory_pool) {
+  if (alignment == kValueAlignment) {
+    return Status::Invalid(
+        "The kValueAlignment option may only be used to call EnsureAlignment 
on arrays "
+        "or tables and cannot be used with buffers");
+  }
+  if (alignment <= 0) {
+    return Status::Invalid("Alignment must be a positive integer");
+  }
   if (!CheckAlignment(*buffer, alignment)) {
-    ARROW_ASSIGN_OR_RAISE(auto new_buffer,
-                          AllocateBuffer(buffer->size(), alignment, 
memory_pool));
+    if (!buffer->is_cpu()) {
+      return Status::NotImplemented("Reallocating an unaligned non-CPU 
buffer.");
+    }
+    int64_t minimum_desired_alignment = std::max(kDefaultBufferAlignment, 
alignment);
+    ARROW_ASSIGN_OR_RAISE(
+        auto new_buffer,
+        AllocateBuffer(buffer->size(), minimum_desired_alignment, 
memory_pool));
     std::memcpy(new_buffer->mutable_data(), buffer->data(), buffer->size());
     return std::move(new_buffer);
   } else {
@@ -111,11 +169,18 @@ Result<std::shared_ptr<ArrayData>> 
EnsureAlignment(std::shared_ptr<ArrayData> ar
                                                    int64_t alignment,
                                                    MemoryPool* memory_pool) {
   if (!CheckAlignment(*array_data, alignment)) {
-    std::vector<std::shared_ptr<Buffer>> buffers_ = array_data->buffers;
-    for (size_t i = 0; i < buffers_.size(); ++i) {
-      if (buffers_[i]) {
+    std::vector<std::shared_ptr<Buffer>> buffers = array_data->buffers;
+    Type::type type_id = GetTypeForBuffers(*array_data);
+    for (size_t i = 0; i < buffers.size(); ++i) {
+      if (buffers[i]) {
+        int64_t expected_alignment = alignment;
+        if (alignment == kValueAlignment) {
+          expected_alignment =
+              RequiredValueAlignmentForBuffer(type_id, static_cast<int>(i));
+        }
         ARROW_ASSIGN_OR_RAISE(
-            buffers_[i], EnsureAlignment(std::move(buffers_[i]), alignment, 
memory_pool));
+            buffers[i],
+            EnsureAlignment(std::move(buffers[i]), expected_alignment, 
memory_pool));
       }
     }
 
@@ -130,10 +195,9 @@ Result<std::shared_ptr<ArrayData>> 
EnsureAlignment(std::shared_ptr<ArrayData> ar
     }
 
     auto new_array_data = ArrayData::Make(
-        array_data->type, array_data->length, std::move(buffers_), 
array_data->child_data,
+        array_data->type, array_data->length, std::move(buffers), 
array_data->child_data,
         array_data->dictionary, array_data->GetNullCount(), 
array_data->offset);
     return std::move(new_array_data);
-
   } else {
     return std::move(array_data);
   }
diff --git a/cpp/src/arrow/util/align_util.h b/cpp/src/arrow/util/align_util.h
index abdabe22fc..63df63749c 100644
--- a/cpp/src/arrow/util/align_util.h
+++ b/cpp/src/arrow/util/align_util.h
@@ -70,8 +70,33 @@ inline BitmapWordAlignParams BitmapWordAlign(const uint8_t* 
data, int64_t bit_of
 namespace util {
 
 // Functions to check if the provided Arrow object is aligned by the specified 
alignment
+
+/// \brief Special alignment value to use data type-specific alignment
+///
+/// If this is passed as the `alignment` in one of the CheckAlignment or 
EnsureAlignment
+/// functions, then the function will ensure each buffer is suitably 
aligned
+/// for the data type of the array.  For example, given an int32 buffer the 
values
+/// buffer's address must be a multiple of 4.  Given a large_string buffer the 
offsets
+/// buffer's address must be a multiple of 8.
+constexpr int64_t kValueAlignment = -3;
+
+/// \brief Calculate if the buffer's address is a multiple of `alignment`
+///
+/// If `alignment` is less than or equal to 0 then this method will always 
return true
+/// \param buffer the buffer to check
+/// \param alignment the alignment (in bytes) to check for
 ARROW_EXPORT bool CheckAlignment(const Buffer& buffer, int64_t alignment);
+/// \brief Calculate if all buffers in the array data are aligned
+///
+/// This will also check the buffers in the dictionary and any children
+/// \param array the array data to check
+/// \param alignment the alignment (in bytes) to check for
 ARROW_EXPORT bool CheckAlignment(const ArrayData& array, int64_t alignment);
+/// \brief Calculate if all buffers in the array are aligned
+///
+/// This will also check the buffers in the dictionary and any children
+/// \param array the array to check
+/// \param alignment the alignment (in bytes) to check for
 ARROW_EXPORT bool CheckAlignment(const Array& array, int64_t alignment);
 
 // Following functions require an additional boolean vector which stores the
@@ -82,29 +107,112 @@ ARROW_EXPORT bool CheckAlignment(const Array& array, 
int64_t alignment);
 // of the constituent objects during the EnsureAlignment function where certain
 // objects can be ignored for further checking if we already know that they are
 // completely aligned.
+
+/// \brief Calculate which (if any) chunks in a chunked array are unaligned
+/// \param array the array to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param needs_alignment an output vector that will store the results of the 
check.
+///        It must be set to a valid vector.  Extra elements will be added to 
the end
+///        of the vector for each chunk that is checked.  `true` will be 
stored if
+///        the chunk is unaligned.
+/// \param offset the index of the chunk to start checking
+/// \return true if all chunks (starting at `offset`) are aligned, false 
otherwise
 ARROW_EXPORT bool CheckAlignment(const ChunkedArray& array, int64_t alignment,
                                  std::vector<bool>* needs_alignment, int 
offset = 0);
+
+/// \brief calculate which (if any) columns in a record batch are unaligned
+/// \param batch the batch to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param needs_alignment an output vector that will store the results of the
+///        check.  It must be set to a valid vector.  Extra elements will be 
added
+///        to the end of the vector for each column that is checked.  `true` 
will be
+///        stored if the column is unaligned.
 ARROW_EXPORT bool CheckAlignment(const RecordBatch& batch, int64_t alignment,
                                  std::vector<bool>* needs_alignment);
+
+/// \brief calculate which (if any) columns in a table are unaligned
+/// \param table the table to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param needs_alignment an output vector that will store the results of the
+///        check.  It must be set to a valid vector.  Extra elements will be 
added
+///        to the end of the vector for each column that is checked.  `true` 
will be
+///        stored if the column is unaligned.
 ARROW_EXPORT bool CheckAlignment(const Table& table, int64_t alignment,
                                  std::vector<bool>* needs_alignment);
 
+/// \brief return a buffer that has the given alignment and the same data as 
the input
+/// buffer
+///
+/// If the input buffer is already aligned then this method will return the 
input buffer.
+/// If the input buffer is not already aligned then this method will allocate 
a new
+/// buffer.  The new buffer will have at least
+/// max(kDefaultBufferAlignment, alignment) bytes of alignment.
+///
+/// \param buffer the buffer to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param memory_pool a memory pool that will be used to allocate a new 
buffer if the
+///        input buffer is not sufficiently aligned
 ARROW_EXPORT Result<std::shared_ptr<Buffer>> EnsureAlignment(
     std::shared_ptr<Buffer> buffer, int64_t alignment, MemoryPool* 
memory_pool);
 
+/// \brief return an array data where all buffers are aligned by the given 
alignment
+///
+/// If any input buffer is already aligned then this method will reuse that 
same input
+/// buffer.
+///
+/// \param array_data the array data to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param memory_pool a memory pool that will be used to allocate new buffers 
if any
+///        input buffer is not sufficiently aligned
 ARROW_EXPORT Result<std::shared_ptr<ArrayData>> EnsureAlignment(
     std::shared_ptr<ArrayData> array_data, int64_t alignment, MemoryPool* 
memory_pool);
 
+/// \brief return an array where all buffers are aligned by the given alignment
+///
+/// If any input buffer is already aligned then this method will reuse that 
same input
+/// buffer.
+///
+/// \param array the array to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param memory_pool a memory pool that will be used to allocate new buffers 
if any
+///        input buffer is not sufficiently aligned
 ARROW_EXPORT Result<std::shared_ptr<Array>> 
EnsureAlignment(std::shared_ptr<Array> array,
                                                             int64_t alignment,
                                                             MemoryPool* 
memory_pool);
 
+/// \brief return a chunked array where all buffers are aligned by the given 
alignment
+///
+/// If any input buffer is already aligned then this method will reuse that 
same input
+/// buffer.
+///
+/// \param array the chunked array to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param memory_pool a memory pool that will be used to allocate new buffers 
if any
+///        input buffer is not sufficiently aligned
 ARROW_EXPORT Result<std::shared_ptr<ChunkedArray>> EnsureAlignment(
     std::shared_ptr<ChunkedArray> array, int64_t alignment, MemoryPool* 
memory_pool);
 
+/// \brief return a record batch where all buffers are aligned by the given 
alignment
+///
+/// If any input buffer is already aligned then this method will reuse that 
same input
+/// buffer.
+///
+/// \param batch the batch to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param memory_pool a memory pool that will be used to allocate new buffers 
if any
+///        input buffer is not sufficiently aligned
 ARROW_EXPORT Result<std::shared_ptr<RecordBatch>> EnsureAlignment(
     std::shared_ptr<RecordBatch> batch, int64_t alignment, MemoryPool* 
memory_pool);
 
+/// \brief return a table where all buffers are aligned by the given alignment
+///
+/// If any input buffer is already aligned then this method will reuse that 
same input
+/// buffer.
+///
+/// \param table the table to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param memory_pool a memory pool that will be used to allocate new buffers 
if any
+///        input buffer is not sufficiently aligned
 ARROW_EXPORT Result<std::shared_ptr<Table>> 
EnsureAlignment(std::shared_ptr<Table> table,
                                                             int64_t alignment,
                                                             MemoryPool* 
memory_pool);
diff --git a/cpp/src/arrow/util/align_util_test.cc 
b/cpp/src/arrow/util/align_util_test.cc
index c4ec83de3e..a14041597a 100644
--- a/cpp/src/arrow/util/align_util_test.cc
+++ b/cpp/src/arrow/util/align_util_test.cc
@@ -15,17 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <gtest/gtest.h>
 #include <algorithm>
 #include <cstdint>
 #include <utility>
 #include <vector>
 
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
 #include "arrow/array.h"
+#include "arrow/buffer.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
+#include "arrow/testing/extension_type.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
 #include "arrow/testing/random.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
 #include "arrow/util/align_util.h"
 
 namespace arrow {
@@ -166,6 +173,62 @@ TEST(BitmapWordAlign, UnalignedDataStart) {
 }
 }  // namespace internal
 
+TEST(EnsureAlignment, Buffer) {
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer, 
AllocateBuffer(/*size=*/1024));
+  std::shared_ptr<Buffer> unaligned_view = SliceBuffer(buffer, 1);
+  std::shared_ptr<Buffer> aligned_view = SliceBuffer(buffer, 0);
+
+  ASSERT_TRUE(util::CheckAlignment(*aligned_view, kDefaultBufferAlignment));
+  ASSERT_FALSE(util::CheckAlignment(*unaligned_view, /*alignment=*/2));
+
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<Buffer> aligned_dupe,
+      util::EnsureAlignment(aligned_view, /*alignment=*/8, 
default_memory_pool()));
+
+  ASSERT_EQ(aligned_view->data(), aligned_dupe->data());
+  ASSERT_TRUE(util::CheckAlignment(*aligned_dupe, kDefaultBufferAlignment));
+
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<Buffer> realigned,
+      util::EnsureAlignment(unaligned_view, /*alignment=*/8, 
default_memory_pool()));
+
+  ASSERT_NE(realigned->data(), unaligned_view->data());
+  // Even though we only asked to check for 8 bytes of alignment, any 
reallocation will
+  // always produce a buffer with at least kDefaultBufferAlignment bytes of alignment
+  ASSERT_TRUE(util::CheckAlignment(*realigned, 
/*alignment=*/kDefaultBufferAlignment));
+
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<Buffer> realigned_large,
+      util::EnsureAlignment(unaligned_view, /*alignment=*/256, 
default_memory_pool()));
+  // If the user wants more than kDefaultBufferAlignment they should get it
+  ASSERT_TRUE(util::CheckAlignment(*realigned_large, /*alignment=*/256));
+
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<Buffer> realigned_huge,
+      util::EnsureAlignment(unaligned_view, /*alignment=*/2048, 
default_memory_pool()));
+  // It should even be valid for the alignment to be larger than the buffer 
size itself
+  ASSERT_TRUE(util::CheckAlignment(*realigned_huge, /*alignment=*/2048));
+}
+
+TEST(EnsureAlignment, BufferInvalid) {
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer, 
AllocateBuffer(/*size=*/1024));
+
+  // This is nonsense but not worth introducing a Status return.  We just 
return true.
+  ASSERT_TRUE(util::CheckAlignment(*buffer, 0));
+  ASSERT_TRUE(util::CheckAlignment(*buffer, -1));
+
+  ASSERT_THAT(util::EnsureAlignment(buffer, /*alignment=*/0, 
default_memory_pool()),
+              Raises(StatusCode::Invalid,
+                     testing::HasSubstr("Alignment must be a positive 
integer")));
+
+  ASSERT_THAT(
+      util::EnsureAlignment(buffer, /*alignment=*/util::kValueAlignment,
+                            default_memory_pool()),
+      Raises(StatusCode::Invalid,
+             testing::HasSubstr(
+                 "may only be used to call EnsureAlignment on arrays or 
tables")));
+}
+
 TEST(EnsureAlignment, Array) {
   MemoryPool* pool = default_memory_pool();
   auto rand = ::arrow::random::RandomArrayGenerator(1923);
@@ -278,4 +341,272 @@ TEST(EnsureAlignment, Table) {
   ASSERT_EQ(util::CheckAlignment(*aligned_table, 2048, &needs_alignment), 
true);
 }
 
+using TypesRequiringSomeKindOfAlignment =
+    testing::Types<Int16Type, Int32Type, Int64Type, UInt16Type, UInt32Type, 
UInt64Type,
+                   FloatType, DoubleType, Date32Type, Date64Type, Time32Type, 
Time64Type,
+                   Decimal128Type, Decimal256Type, TimestampType, 
DurationType, MapType,
+                   DenseUnionType, LargeBinaryType, LargeListType, 
LargeStringType,
+                   MonthIntervalType, DayTimeIntervalType, 
MonthDayNanoIntervalType>;
+
+using TypesNotRequiringAlignment =
+    testing::Types<NullType, Int8Type, UInt8Type, FixedSizeListType, 
FixedSizeBinaryType,
+                   BooleanType, SparseUnionType>;
+
+template <typename ArrowType>
+std::shared_ptr<DataType> sample_type() {
+  return TypeTraits<ArrowType>::type_singleton();
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<FixedSizeBinaryType>() {
+  return fixed_size_binary(16);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<FixedSizeListType>() {
+  return fixed_size_list(uint8(), 16);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Decimal128Type>() {
+  return decimal128(32, 6);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Decimal256Type>() {
+  return decimal256(60, 10);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<LargeListType>() {
+  return large_list(int8());
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<DenseUnionType>() {
+  return dense_union({field("x", int8()), field("y", uint8())});
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<MapType>() {
+  return map(utf8(), field("item", utf8()));
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<DurationType>() {
+  return duration(TimeUnit::NANO);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<TimestampType>() {
+  return timestamp(TimeUnit::NANO);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Time32Type>() {
+  return time32(TimeUnit::SECOND);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Time64Type>() {
+  return time64(TimeUnit::NANO);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<SparseUnionType>() {
+  return sparse_union({field("x", uint8()), field("y", int8())}, {1, 2});
+}
+
+template <typename ArrowType>
+std::shared_ptr<ArrayData> SampleArray() {
+  random::RandomArrayGenerator gen(42);
+  return gen.ArrayOf(sample_type<ArrowType>(), 100)->data();
+}
+
+template <>
+std::shared_ptr<ArrayData> SampleArray<SparseUnionType>() {
+  auto ty = sparse_union({field("ints", int64()), field("strs", utf8())}, {2, 
7});
+  auto ints = ArrayFromJSON(int64(), "[0, 1, 2, 3]");
+  auto strs = ArrayFromJSON(utf8(), R"(["a", null, "c", "d"])");
+  auto ids = ArrayFromJSON(int8(), "[2, 7, 2, 7]")->data()->buffers[1];
+  const int length = 4;
+  SparseUnionArray arr(ty, length, {ints, strs}, ids);
+  return arr.data();
+}
+
+class ValueAlignment : public ::testing::Test {
+ public:
+  void CheckModified(const ArrayData& src, const ArrayData& dst) {
+    ASSERT_EQ(src.buffers.size(), dst.buffers.size());
+    for (std::size_t i = 0; i < src.buffers.size(); i++) {
+      if (!src.buffers[i] || !dst.buffers[i]) {
+        continue;
+      }
+      if (src.buffers[i]->address() != dst.buffers[i]->address()) {
+        return;
+      }
+    }
+    FAIL() << "Expected at least one buffer to have been modified by 
EnsureAlignment";
+  }
+
+  void CheckUnmodified(const ArrayData& src, const ArrayData& dst) {
+    ASSERT_EQ(src.buffers.size(), dst.buffers.size());
+    for (std::size_t i = 0; i < src.buffers.size(); i++) {
+      if (!src.buffers[i] || !dst.buffers[i]) {
+        continue;
+      }
+      ASSERT_EQ(src.buffers[i]->address(), dst.buffers[i]->address());
+    }
+  }
+};
+
+template <typename T>
+class ValueAlignmentRequired : public ValueAlignment {};
+template <typename T>
+class ValueAlignmentNotRequired : public ValueAlignment {};
+
+TYPED_TEST_SUITE(ValueAlignmentRequired, TypesRequiringSomeKindOfAlignment);
+TYPED_TEST_SUITE(ValueAlignmentNotRequired, TypesNotRequiringAlignment);
+
+// The default buffer alignment should always be large enough for value 
alignment
+TYPED_TEST(ValueAlignmentRequired, DefaultAlignmentSufficient) {
+  std::shared_ptr<ArrayData> data = SampleArray<TypeParam>();
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> aligned,
+      util::EnsureAlignment(data, util::kValueAlignment, 
default_memory_pool()));
+
+  ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment));
+  AssertArraysEqual(*MakeArray(data), *MakeArray(aligned));
+  this->CheckUnmodified(*data, *aligned);
+}
+
+TYPED_TEST(ValueAlignmentRequired, RoundTrip) {
+  std::shared_ptr<ArrayData> data = SampleArray<TypeParam>();
+  std::shared_ptr<ArrayData> unaligned = UnalignBuffers(*data);
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> aligned,
+      util::EnsureAlignment(unaligned, util::kValueAlignment, 
default_memory_pool()));
+
+  ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment));
+  AssertArraysEqual(*MakeArray(data), *MakeArray(aligned));
+  this->CheckModified(*unaligned, *aligned);
+}
+
+TYPED_TEST(ValueAlignmentNotRequired, RoundTrip) {
+  std::shared_ptr<ArrayData> data = SampleArray<TypeParam>();
+  std::shared_ptr<ArrayData> unaligned = UnalignBuffers(*data);
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> aligned,
+      util::EnsureAlignment(unaligned, util::kValueAlignment, 
default_memory_pool()));
+
+  ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment));
+  AssertArraysEqual(*MakeArray(data), *MakeArray(aligned));
+  this->CheckUnmodified(*unaligned, *aligned);
+}
+
+TYPED_TEST(ValueAlignmentNotRequired, DefaultAlignmentSufficient) {
+  std::shared_ptr<ArrayData> data = SampleArray<TypeParam>();
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> aligned,
+      util::EnsureAlignment(data, util::kValueAlignment, 
default_memory_pool()));
+
+  ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment));
+  AssertArraysEqual(*MakeArray(data), *MakeArray(aligned));
+  this->CheckUnmodified(*data, *aligned);
+}
+
+TEST_F(ValueAlignment, DenseUnion) {
+  std::shared_ptr<ArrayData> data = SampleArray<DenseUnionType>();
+  ASSERT_TRUE(util::CheckAlignment(*data, util::kValueAlignment));
+
+  std::shared_ptr<ArrayData> unaligned = UnalignBuffers(*data);
+  ASSERT_FALSE(util::CheckAlignment(*unaligned, util::kValueAlignment));
+  // Dense union arrays are the only array type where the buffer at index 2 is 
expected
+  // to be aligned (it contains 32-bit offsets and should be 4-byte aligned)
+  ASSERT_FALSE(util::CheckAlignment(*unaligned->buffers[2], 4));
+
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> realigned,
+      util::EnsureAlignment(unaligned, util::kValueAlignment, 
default_memory_pool()));
+
+  ASSERT_TRUE(util::CheckAlignment(*realigned, util::kValueAlignment));
+  ASSERT_TRUE(util::CheckAlignment(*realigned->buffers[2], 4));
+  // The buffer at index 1 is the types buffer which does not require 
realignment
+  ASSERT_EQ(unaligned->buffers[1]->data(), realigned->buffers[1]->data());
+}
+
+TEST_F(ValueAlignment, RunEndEncoded) {
+  // Run end requires alignment, value type does not
+  std::shared_ptr<Array> run_ends = ArrayFromJSON(int32(), "[3, 5]");
+  std::shared_ptr<Array> values = ArrayFromJSON(int8(), "[50, 100]");
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> array,
+                       RunEndEncodedArray::Make(/*logical_length=*/5, 
std::move(run_ends),
+                                                std::move(values), 0));
+
+  std::shared_ptr<ArrayData> unaligned_ree = 
std::make_shared<ArrayData>(*array->data());
+  unaligned_ree->child_data[0] = UnalignBuffers(*unaligned_ree->child_data[0]);
+  unaligned_ree->child_data[1] = UnalignBuffers(*unaligned_ree->child_data[1]);
+
+  std::shared_ptr<ArrayData> aligned_ree = 
std::make_shared<ArrayData>(*unaligned_ree);
+
+  ASSERT_OK_AND_ASSIGN(
+      aligned_ree,
+      util::EnsureAlignment(aligned_ree, util::kValueAlignment, 
default_memory_pool()));
+  ASSERT_TRUE(util::CheckAlignment(*aligned_ree, util::kValueAlignment));
+
+  this->CheckModified(*unaligned_ree->child_data[0], 
*aligned_ree->child_data[0]);
+  this->CheckUnmodified(*unaligned_ree->child_data[1], 
*aligned_ree->child_data[1]);
+}
+
+TEST_F(ValueAlignment, Dictionary) {
+  // Dictionary values require alignment, dictionary indices do not
+  std::shared_ptr<DataType> int8_utf8 = dictionary(int8(), utf8());
+  std::shared_ptr<Array> array = ArrayFromJSON(int8_utf8, R"(["x", "x", 
"y"])");
+
+  std::shared_ptr<ArrayData> unaligned_dict = 
std::make_shared<ArrayData>(*array->data());
+  unaligned_dict->dictionary = UnalignBuffers(*unaligned_dict->dictionary);
+  unaligned_dict = UnalignBuffers(*unaligned_dict);
+
+  std::shared_ptr<ArrayData> aligned_dict = 
std::make_shared<ArrayData>(*unaligned_dict);
+
+  ASSERT_OK_AND_ASSIGN(
+      aligned_dict,
+      util::EnsureAlignment(aligned_dict, util::kValueAlignment, 
default_memory_pool()));
+
+  ASSERT_TRUE(util::CheckAlignment(*aligned_dict, util::kValueAlignment));
+  this->CheckUnmodified(*unaligned_dict, *aligned_dict);
+  this->CheckModified(*unaligned_dict->dictionary, *aligned_dict->dictionary);
+
+  // Dictionary values do not require alignment, dictionary indices do
+  std::shared_ptr<DataType> int16_int8 = dictionary(int16(), int8());
+  array = ArrayFromJSON(int16_int8, R"([7, 11])");
+
+  unaligned_dict = std::make_shared<ArrayData>(*array->data());
+  unaligned_dict->dictionary = UnalignBuffers(*unaligned_dict->dictionary);
+  unaligned_dict = UnalignBuffers(*unaligned_dict);
+
+  aligned_dict = std::make_shared<ArrayData>(*unaligned_dict);
+
+  ASSERT_OK_AND_ASSIGN(
+      aligned_dict,
+      util::EnsureAlignment(aligned_dict, util::kValueAlignment, 
default_memory_pool()));
+
+  ASSERT_TRUE(util::CheckAlignment(*aligned_dict, util::kValueAlignment));
+  this->CheckModified(*unaligned_dict, *aligned_dict);
+  this->CheckUnmodified(*unaligned_dict->dictionary, 
*aligned_dict->dictionary);
+}
+
+TEST_F(ValueAlignment, Extension) {
+  std::shared_ptr<Array> array = ExampleSmallint();
+
+  std::shared_ptr<ArrayData> unaligned = UnalignBuffers(*array->data());
+
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> aligned,
+      util::EnsureAlignment(unaligned, util::kValueAlignment, 
default_memory_pool()));
+
+  ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment));
+  this->CheckModified(*unaligned, *aligned);
+}
+
 }  // namespace arrow

Reply via email to