This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 1951a1ae69 GH-35498: [C++] Relax EnsureAlignment check in Acero from
requiring 64-byte aligned buffers to requiring value-aligned buffers (#35565)
1951a1ae69 is described below
commit 1951a1ae69590ad58d97f6be929fa14485f81f42
Author: Weston Pace <[email protected]>
AuthorDate: Mon May 29 10:44:23 2023 -0700
GH-35498: [C++] Relax EnsureAlignment check in Acero from requiring 64-byte
aligned buffers to requiring value-aligned buffers (#35565)
### Rationale for this change
Various compute kernels and Acero internals rely on type punning. This is
only safe when the buffer has appropriate alignment (e.g. casting uint8_t* to
uint32_t* is only safe if the buffer has 4-byte alignment). To avoid errors we
enforced 64-byte alignment in Acero. However, this is too strict. While
Arrow's allocators will always generate 64-byte aligned buffers, this is not the
case for numpy's allocators (and presumably many others). This PR relaxes the
constraint so that we o [...]
### What changes are included in this PR?
The main complexity here is determining which buffers need to be aligned, and to
what boundary. A special flag, kValueAlignment, is added which can be specified when
calling CheckAlignment or EnsureAlignment to require only value alignment (the
natural alignment of each buffer's data type) rather than a particular number of bytes.
### Are these changes tested?
Yes
### Are there any user-facing changes?
No
* Closes: #35498
Lead-authored-by: Weston Pace <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/CMakeLists.txt | 1 +
cpp/src/arrow/acero/exec_plan.cc | 41 ++++-
cpp/src/arrow/acero/exec_plan.h | 40 ++++
cpp/src/arrow/acero/plan_test.cc | 40 ++++
cpp/src/arrow/acero/source_node.cc | 52 +++++-
cpp/src/arrow/testing/gtest_util.cc | 26 +++
cpp/src/arrow/testing/gtest_util.h | 9 +
cpp/src/arrow/type_traits.cc | 102 +++++++++++
cpp/src/arrow/type_traits.h | 23 +++
cpp/src/arrow/util/align_util.cc | 90 +++++++--
cpp/src/arrow/util/align_util.h | 108 +++++++++++
cpp/src/arrow/util/align_util_test.cc | 333 +++++++++++++++++++++++++++++++++-
12 files changed, 840 insertions(+), 25 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 4e6826bc61..d928cdf58b 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -180,6 +180,7 @@ set(ARROW_SRCS
tensor/csf_converter.cc
tensor/csx_converter.cc
type.cc
+ type_traits.cc
visitor.cc
c/bridge.cc
io/buffered.cc
diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc
index 2fe8c484e4..2f03159de5 100644
--- a/cpp/src/arrow/acero/exec_plan.cc
+++ b/cpp/src/arrow/acero/exec_plan.cc
@@ -36,6 +36,7 @@
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
+#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/string.h"
@@ -358,10 +359,38 @@ std::optional<int> GetNodeIndex(const
std::vector<ExecNode*>& nodes,
return std::nullopt;
}
+// Name of the environment variable that overrides the default unaligned buffer
+// handling policy (see GetDefaultUnalignedBufferHandling).
+constexpr char kAceroAlignmentHandlingEnvVar[] = "ACERO_ALIGNMENT_HANDLING";
+
+// Parses kAceroAlignmentHandlingEnvVar into an UnalignedBufferHandling policy.
+//
+// Accepted values are "warn", "ignore", "reallocate" and "error", compared
+// ASCII-case-insensitively. Returns kWarn when the variable cannot be read (e.g. it
+// is not set) and, after logging a warning, when its value is not recognized.
+UnalignedBufferHandling DetermineDefaultUnalignedBufferHandling() {
+  auto maybe_value = ::arrow::internal::GetEnvVar(kAceroAlignmentHandlingEnvVar);
+  if (!maybe_value.ok()) {
+    return UnalignedBufferHandling::kWarn;
+  }
+  std::string value = maybe_value.MoveValueUnsafe();
+  if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "warn")) {
+    return UnalignedBufferHandling::kWarn;
+  } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "ignore")) {
+    return UnalignedBufferHandling::kIgnore;
+  } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "reallocate")) {
+    return UnalignedBufferHandling::kReallocate;
+  } else if (::arrow::internal::AsciiEqualsCaseInsensitive(value, "error")) {
+    return UnalignedBufferHandling::kError;
+  }
+  // Build the message from the constant so it always names the variable actually read.
+  ARROW_LOG(WARNING) << "unrecognized value for " << kAceroAlignmentHandlingEnvVar
+                     << ": " << value;
+  return UnalignedBufferHandling::kWarn;
+}
+
} // namespace
const uint32_t ExecPlan::kMaxBatchSize;
+// Returns the process-wide default unaligned buffer policy. The environment variable
+// is parsed only on the first call (function-local static initialization); every later
+// call returns the cached value.
+UnalignedBufferHandling GetDefaultUnalignedBufferHandling() {
+  static const UnalignedBufferHandling kCachedDefault =
+      DetermineDefaultUnalignedBufferHandling();
+  return kCachedDefault;
+}
+
Result<std::shared_ptr<ExecPlan>> ExecPlan::Make(
QueryOptions opts, ExecContext ctx,
std::shared_ptr<const KeyValueMetadata> metadata) {
@@ -621,7 +650,8 @@ Future<std::shared_ptr<Table>> DeclarationToTableImpl(
query_options.function_registry);
std::shared_ptr<std::shared_ptr<Table>> output_table =
std::make_shared<std::shared_ptr<Table>>();
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
ExecPlan::Make(exec_ctx));
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
+ ExecPlan::Make(query_options, exec_ctx));
TableSinkNodeOptions sink_options(output_table.get());
sink_options.sequence_output = query_options.sequence_output;
sink_options.names = std::move(query_options.field_names);
@@ -648,7 +678,8 @@ Future<BatchesWithCommonSchema>
DeclarationToExecBatchesImpl(
std::shared_ptr<Schema> out_schema;
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
ExecContext exec_ctx(options.memory_pool, cpu_executor,
options.function_registry);
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
ExecPlan::Make(exec_ctx));
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
+ ExecPlan::Make(options, exec_ctx));
SinkNodeOptions sink_options(&sink_gen, &out_schema);
sink_options.sequence_output = options.sequence_output;
Declaration with_sink = Declaration::Sequence({declaration, {"sink",
sink_options}});
@@ -678,7 +709,8 @@ Future<BatchesWithCommonSchema>
DeclarationToExecBatchesImpl(
Future<> DeclarationToStatusImpl(Declaration declaration, QueryOptions options,
::arrow::internal::Executor* cpu_executor) {
ExecContext exec_ctx(options.memory_pool, cpu_executor,
options.function_registry);
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
ExecPlan::Make(exec_ctx));
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
+ ExecPlan::Make(options, exec_ctx));
ARROW_ASSIGN_OR_RAISE(ExecNode * last_node,
declaration.AddToPlan(exec_plan.get()));
if (!last_node->is_sink()) {
ConsumingSinkNodeOptions sink_options(NullSinkNodeConsumer::Make());
@@ -972,7 +1004,8 @@ Result<AsyncGenerator<std::shared_ptr<RecordBatch>>>
DeclarationToRecordBatchGen
::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>*
out_schema) {
auto converter = std::make_shared<BatchConverter>();
ExecContext exec_ctx(options.memory_pool, cpu_executor,
options.function_registry);
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
ExecPlan::Make(exec_ctx));
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
+ ExecPlan::Make(options, exec_ctx));
Declaration with_sink = Declaration::Sequence(
{declaration,
{"sink", SinkNodeOptions(&converter->exec_batch_gen,
&converter->schema)}});
diff --git a/cpp/src/arrow/acero/exec_plan.h b/cpp/src/arrow/acero/exec_plan.h
index 424d9107d2..f82a0f8106 100644
--- a/cpp/src/arrow/acero/exec_plan.h
+++ b/cpp/src/arrow/acero/exec_plan.h
@@ -496,6 +496,16 @@ struct ARROW_ACERO_EXPORT Declaration {
std::string label;
};
+/// \brief How to handle unaligned buffers
+///
+/// See QueryOptions::unaligned_buffer_handling for a description of each policy.
+enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kError };
+
+/// \brief get the default behavior of unaligned buffer handling
+///
+/// This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable which
+/// can be set to "warn", "ignore", "reallocate", or "error" (case-insensitive).  If
+/// the environment variable is not set, or is set to an invalid value, this will
+/// return kWarn.  The environment variable is only read on the first call.
+ARROW_ACERO_EXPORT UnalignedBufferHandling GetDefaultUnalignedBufferHandling();
+
/// \brief plan-wide options that can be specified when executing an execution
plan
struct ARROW_ACERO_EXPORT QueryOptions {
/// \brief Should the plan use a legacy batching strategy
@@ -562,6 +572,36 @@ struct ARROW_ACERO_EXPORT QueryOptions {
///
/// If set then the number of names must equal the number of output columns
std::vector<std::string> field_names;
+
+ /// \brief Policy for unaligned buffers in source data
+ ///
+ /// Various compute functions and acero internals will type pun array
+ /// buffers from uint8_t* to some kind of value type (e.g. we might
+ /// cast to int32_t* to add two int32 arrays)
+ ///
+ /// If the buffer is poorly aligned (e.g. an int32 array is not aligned
+ /// on a 4-byte boundary) then this is technically undefined behavior in C++.
+ /// However, most modern compilers and CPUs are fairly tolerant of this
+ /// behavior and nothing bad (beyond a small hit to performance) is likely
+ /// to happen.
+ ///
+ /// Note that this only applies to source buffers. All buffers allocated
internally
+ /// by Acero will be suitably aligned.
+ ///
+ /// If this field is set to kWarn then Acero will check if any buffers are
unaligned
+ /// and, if they are, will emit a warning.
+ ///
+ /// If this field is set to kReallocate then Acero will allocate a new,
suitably aligned
+ /// buffer and copy the contents from the old buffer into this new buffer.
+ ///
+ /// If this field is set to kError then Acero will gracefully abort the plan
instead.
+ ///
+ /// If this field is set to kIgnore then Acero will not even check if the
buffers are
+ /// unaligned.
+ ///
+ /// If this field is not set then it will be treated as kWarn unless
overridden
+ /// by the ACERO_ALIGNMENT_HANDLING environment variable
+ std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
};
/// \brief Calculate the output schema of a declaration
diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc
index 8ec5c0f70a..5f62550177 100644
--- a/cpp/src/arrow/acero/plan_test.cc
+++ b/cpp/src/arrow/acero/plan_test.cc
@@ -1704,5 +1704,45 @@ TEST(ExecPlanExecution,
SegmentedAggregationWithBatchCrossingSegment) {
{expected});
}
+// Runs a single-column int32 source whose values buffer is deliberately misaligned
+// (see UnalignBuffers) under each UnalignedBufferHandling policy. Changes in the
+// default pool's total_bytes_allocated() reveal whether Acero copied the buffers.
+TEST(ExecPlanExecution, UnalignedInput) {
+ std::shared_ptr<Array> array = ArrayFromJSON(int32(), "[1, 2, 3]");
+ std::shared_ptr<Array> unaligned = UnalignBuffers(*array);
+ ASSERT_OK_AND_ASSIGN(ExecBatch sample_batch,
+ ExecBatch::Make({unaligned}, array->length()));
+
+ BatchesWithSchema data;
+ data.batches = {std::move(sample_batch)};
+ data.schema = schema({field("i32", int32())});
+
+ Declaration plan = Declaration::Sequence({
+ {"exec_batch_source", ExecBatchSourceNodeOptions(data.schema,
data.batches)},
+ });
+
+ int64_t initial_bytes_allocated =
default_memory_pool()->total_bytes_allocated();
+
+ // By default we should warn and so the plan should finish ok
+ // NOTE(review): this assumes ACERO_ALIGNMENT_HANDLING is unset (or set to "warn" /
+ // "ignore") in the test environment; "reallocate" or "error" would break these checks.
+ ASSERT_OK(DeclarationToStatus(plan));
+ ASSERT_EQ(initial_bytes_allocated,
default_memory_pool()->total_bytes_allocated());
+
+ QueryOptions query_options;
+
+#ifndef ARROW_UBSAN
+ // Nothing should happen if we ignore alignment
+ query_options.unaligned_buffer_handling = UnalignedBufferHandling::kIgnore;
+ ASSERT_OK(DeclarationToStatus(plan, query_options));
+ ASSERT_EQ(initial_bytes_allocated,
default_memory_pool()->total_bytes_allocated());
+#endif
+
+ // kError must fail the plan with Invalid without copying anything.
+ query_options.unaligned_buffer_handling = UnalignedBufferHandling::kError;
+ ASSERT_THAT(DeclarationToStatus(plan, query_options),
+ Raises(StatusCode::Invalid,
+ testing::HasSubstr("An input buffer was poorly
aligned")));
+ ASSERT_EQ(initial_bytes_allocated,
default_memory_pool()->total_bytes_allocated());
+
+ // kReallocate copies the misaligned buffers, so the pool's allocation counter is
+ // expected to grow.
+ query_options.unaligned_buffer_handling =
UnalignedBufferHandling::kReallocate;
+ ASSERT_OK(DeclarationToStatus(plan, query_options));
+ ASSERT_LT(initial_bytes_allocated,
default_memory_pool()->total_bytes_allocated());
+}
+
} // namespace acero
} // namespace arrow
diff --git a/cpp/src/arrow/acero/source_node.cc
b/cpp/src/arrow/acero/source_node.cc
index 6c138d8dcc..9cacb237b2 100644
--- a/cpp/src/arrow/acero/source_node.cc
+++ b/cpp/src/arrow/acero/source_node.cc
@@ -52,6 +52,46 @@ using arrow::internal::MapVector;
namespace acero {
namespace {
+/// Apply the unaligned-buffer policy `handling` to every array value in `batch`.
+///
+/// Alignment is judged with arrow::util::kValueAlignment (the natural alignment of
+/// each buffer's data type), not a fixed byte count. Non-array values are skipped.
+/// kReallocate replaces each array value with the result of EnsureAlignment, which
+/// allocates any copies from the default memory pool.
+///
+/// \return Invalid if `handling` is kError and an array value is poorly aligned
+Status HandleUnalignedBuffers(ExecBatch* batch, UnalignedBufferHandling handling) {
+  if (handling == UnalignedBufferHandling::kIgnore) {
+    return Status::OK();
+  }
+  for (auto& datum : batch->values) {
+    if (!datum.is_array()) {
+      continue;
+    }
+    if (handling == UnalignedBufferHandling::kReallocate) {
+      // EnsureAlignment hands back the input unchanged when it is already aligned.
+      ARROW_ASSIGN_OR_RAISE(datum, arrow::util::EnsureAlignment(
+                                       datum.array(), arrow::util::kValueAlignment,
+                                       default_memory_pool()));
+      continue;
+    }
+    const bool is_aligned =
+        arrow::util::CheckAlignment(*datum.array(), arrow::util::kValueAlignment);
+    if (is_aligned) {
+      continue;
+    }
+    if (handling == UnalignedBufferHandling::kError) {
+      return Status::Invalid(
+          "An input buffer was poorly aligned and UnalignedBufferHandling is set "
+          "to kError");
+    }
+    if (handling == UnalignedBufferHandling::kWarn) {
+      ARROW_LOG(WARNING)
+          << "An input buffer was poorly aligned. This could lead to crashes or "
+             "poor performance on some hardware. Please ensure that all Acero "
+             "sources generate aligned buffers, or change the unaligned buffer "
+             "handling configuration to silence this warning.";
+    }
+  }
+  return Status::OK();
+}
+
struct SourceNode : ExecNode, public TracedNode {
SourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
AsyncGenerator<std::optional<ExecBatch>> generator,
@@ -104,13 +144,11 @@ struct SourceNode : ExecNode, public TracedNode {
batch_size = morsel_length;
}
ExecBatch batch = morsel.Slice(offset, batch_size);
- for (auto& value : batch.values) {
- if (value.is_array()) {
- ARROW_ASSIGN_OR_RAISE(value, arrow::util::EnsureAlignment(
- value.make_array(),
ipc::kArrowAlignment,
- default_memory_pool()));
- }
- }
+ UnalignedBufferHandling unaligned_buffer_handling =
+
plan_->query_context()->options().unaligned_buffer_handling.value_or(
+ GetDefaultUnalignedBufferHandling());
+ ARROW_RETURN_NOT_OK(
+ HandleUnalignedBuffers(&batch, unaligned_buffer_handling));
if (has_ordering) {
batch.index = batch_index;
}
diff --git a/cpp/src/arrow/testing/gtest_util.cc
b/cpp/src/arrow/testing/gtest_util.cc
index 9569375bda..6fc709874e 100644
--- a/cpp/src/arrow/testing/gtest_util.cc
+++ b/cpp/src/arrow/testing/gtest_util.cc
@@ -1099,4 +1099,30 @@ std::shared_ptr<GatingTask> GatingTask::Make(double
timeout_seconds) {
return std::make_shared<GatingTask>(timeout_seconds);
}
+// Copies `array` so that each non-null buffer begins one byte past the start of a
+// fresh allocation (allocation size + 1, then sliced at offset 1). Null buffer slots
+// stay null; child data and the dictionary are shared with the input, not copied.
+std::shared_ptr<ArrayData> UnalignBuffers(const ArrayData& array) {
+  auto result = std::make_shared<ArrayData>(array);
+  std::vector<std::shared_ptr<Buffer>> shifted(array.buffers.size());
+  for (size_t i = 0; i < array.buffers.size(); ++i) {
+    const std::shared_ptr<Buffer>& source = array.buffers[i];
+    if (source == nullptr) {
+      continue;  // keep the slot as a null buffer
+    }
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Buffer> backing,
+                         AllocateBuffer(source->size() + 1, default_memory_pool()));
+    memcpy(backing->mutable_data() + 1, source->data(), source->size());
+    shifted[i] = SliceBuffer(backing, 1);
+  }
+  result->buffers = std::move(shifted);
+  return result;
+}
+
+// Array overload: misalign the top-level buffers of `array.data()` and rewrap them.
+std::shared_ptr<Array> UnalignBuffers(const Array& array) {
+  return MakeArray(UnalignBuffers(*array.data()));
+}
+
} // namespace arrow
diff --git a/cpp/src/arrow/testing/gtest_util.h
b/cpp/src/arrow/testing/gtest_util.h
index 55bd307b12..13fc0b3e81 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -532,4 +532,13 @@ class ARROW_TESTING_EXPORT GatingTask {
std::shared_ptr<Impl> impl_;
};
+/// \brief create an exact copy of the data where each buffer has a max alignment of 1
+///
+/// Every non-null buffer is copied so that it starts one byte past the start of a
+/// fresh allocation from the default memory pool; null buffer slots stay null.
+/// Intended for tests that exercise unaligned-buffer handling.
+///
+/// This method does not recurse into the dictionary or children
+ARROW_TESTING_EXPORT std::shared_ptr<ArrayData> UnalignBuffers(const
ArrayData& array);
+/// \brief create an exact copy of the array where each buffer has a max alignment of 1
+///
+/// Same as the ArrayData overload, applied to `array.data()`.
+///
+/// This method does not recurse into the dictionary or children
+ARROW_TESTING_EXPORT std::shared_ptr<Array> UnalignBuffers(const Array& array);
+
} // namespace arrow
diff --git a/cpp/src/arrow/type_traits.cc b/cpp/src/arrow/type_traits.cc
new file mode 100644
index 0000000000..ac16afe4b8
--- /dev/null
+++ b/cpp/src/arrow/type_traits.cc
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/type_traits.h"
+
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// Returns the byte alignment buffer `buffer_index` of an array of type `type_id` must
+// have to be considered "value aligned" (1 means no requirement).
+//
+// Only slot 1 (values / offsets / union types) and the dense-union slot 2 can carry
+// a requirement; every other slot returns 1. Dictionary and extension ids must be
+// resolved by the caller (to the indices type / storage type); they are DCHECK-ed
+// against and, in release builds, reach the warning at the end and return 1.
+int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index) {
+ if (buffer_index == 2 && type_id == Type::DENSE_UNION) {
+ // A dense union array is the only array (so far) that requires alignment
+ // on a buffer with a buffer_index that is not equal to 1
+ return 4;
+ }
+ if (buffer_index != 1) {
+ // If the buffer index is 0 then either:
+ // * The array type has no buffers, thus this shouldn't be called anyways
+ // * The array has a validity buffer at 0, no alignment needed
+ // * The array is a union array; its types buffer is at index 1 (see the
+ //   DENSE_UNION / SPARSE_UNION cases below), so index 0 needs no alignment
+ // If the buffer index is > 1 then, in all current cases, it represents binary
+ // data and no alignment is needed. The only exception is dense union buffers
+ // which are checked above.
+ return 1;
+ }
+ DCHECK_NE(type_id, Type::DICTIONARY);
+ DCHECK_NE(type_id, Type::EXTENSION);
+
+ // From here on buffer_index == 1.
+ switch (type_id) {
+ case Type::NA: // No buffers
+ case Type::FIXED_SIZE_LIST: // No second buffer (values in child array)
+ // Fixed size binary could be dangerous but the compute kernels don't type pun
+ // this. E.g. if an extension type is storing some kind of struct here then the
+ // user should do their own alignment check before casting to an array of structs
+ case Type::FIXED_SIZE_BINARY:
+ case Type::BOOL: // Always treated as uint8_t*
+ case Type::INT8: // Always treated as uint8_t*
+ case Type::UINT8: // Always treated as uint8_t*
+ case Type::DENSE_UNION: // Union arrays have a uint8_t* types buffer here
+ case Type::SPARSE_UNION: // Union arrays have a uint8_t* types buffer here
+ case Type::RUN_END_ENCODED: // No buffers
+ case Type::STRUCT: // No second buffer
+ return 1;
+ case Type::INT16:
+ case Type::UINT16:
+ case Type::HALF_FLOAT:
+ return 2;
+ case Type::INT32:
+ case Type::UINT32:
+ case Type::FLOAT:
+ case Type::STRING: // Offsets may be cast to int32_t*
+ case Type::BINARY: // Offsets may be cast to int32_t*
+ case Type::DATE32:
+ case Type::TIME32:
+ case Type::LIST: // Offsets may be cast to int32_t*, data is in child array
+ case Type::MAP: // This is a list array
+ case Type::INTERVAL_MONTHS: // Stored as int32_t*
+ case Type::INTERVAL_DAY_TIME: // Stored as two contiguous 32-bit integers
+ return 4;
+ case Type::INT64:
+ case Type::UINT64:
+ case Type::DOUBLE:
+ case Type::DECIMAL128: // May be cast to GenericBasicDecimal* which requires
+ // alignment of 8
+ case Type::DECIMAL256: // May be cast to GenericBasicDecimal* which requires
+ // alignment of 8
+ case Type::LARGE_BINARY: // Offsets may be cast to int64_t*
+ case Type::LARGE_LIST: // Offsets may be cast to int64_t*
+ case Type::LARGE_STRING: // Offsets may be cast to int64_t*
+ case Type::DATE64:
+ case Type::TIME64:
+ case Type::TIMESTAMP:
+ case Type::DURATION:
+ case Type::INTERVAL_MONTH_DAY_NANO: // Stored as two 32-bit integers and a 64-bit
+ // integer
+ return 8;
+ case Type::DICTIONARY:
+ case Type::EXTENSION:
+ case Type::MAX_ID:
+ break;
+ }
+ // Reached for DICTIONARY / EXTENSION / MAX_ID (or an out-of-range id): log a
+ // warning and report no alignment requirement.
+ Status::Invalid("RequiredValueAlignmentForBuffer called with invalid type id
", type_id)
+ .Warn();
+ return 1;
+}
+
+} // namespace arrow
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 2d20d87d14..7204fd6d85 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -1309,6 +1309,29 @@ static inline int offset_bit_width(Type::type type_id) {
return 0;
}
+/// \brief Get the alignment a buffer should have to be considered "value aligned"
+///
+/// Some buffers are frequently type-punned.  For example, in an int32 array the
+/// values buffer is frequently cast to int32_t*
+///
+/// This sort of punning is technically only valid if the pointer is aligned to a
+/// proper width (e.g. 4 bytes in the case of int32).  However, most modern compilers
+/// are quite permissive if we get this wrong.  Note that this alignment is something
+/// that is guaranteed by malloc (e.g. new int32_t[] will return a buffer that is 4
+/// byte aligned) or common libraries (e.g. numpy) but it is not currently guaranteed
+/// by flight (GH-32276).
+///
+/// We call this "value aligned" and this method will calculate that required alignment.
+///
+/// \param type_id the type of the array containing the buffer
+///                Note: this should be the indices type for a dictionary array since
+///                a dictionary array's buffers are indices.  It should be the storage
+///                type for an extension array.
+/// \param buffer_index the index of the buffer to check, for example 0 will typically
+///                     give you the alignment expected of the validity buffer
+/// \return the required value alignment in bytes (1 if no alignment required)
+ARROW_EXPORT
+int RequiredValueAlignmentForBuffer(Type::type type_id, int buffer_index);
+
/// \brief Check for an integer type (signed or unsigned)
///
/// \param[in] type the type to check
diff --git a/cpp/src/arrow/util/align_util.cc b/cpp/src/arrow/util/align_util.cc
index d77650fcd6..7bc687b155 100644
--- a/cpp/src/arrow/util/align_util.cc
+++ b/cpp/src/arrow/util/align_util.cc
@@ -21,23 +21,68 @@
#include "arrow/chunked_array.h"
+#include "arrow/extension_type.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
+#include "arrow/type_fwd.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
namespace arrow {
namespace util {
+// Returns true if the buffer's address is a multiple of `alignment`. Non-positive
+// alignments (including the kValueAlignment sentinel) impose no requirement.
bool CheckAlignment(const Buffer& buffer, int64_t alignment) {
+ // Also keeps the modulo below from ever dividing by zero or a negative value.
+ if (alignment <= 0) {
+ return true;
+ }
return buffer.address() % alignment == 0;
}
-bool CheckAlignment(const ArrayData& array, int64_t alignment) {
- for (const auto& buffer : array.buffers) {
- if (buffer) {
- if (!CheckAlignment(*buffer, alignment)) return false;
+namespace {
+
+// Returns the type that controls how the buffers of this ArrayData (not its children)
+// should behave.
+//
+// A dictionary array's own buffers hold indices, so the index type is returned.
+// Extension arrays are resolved through storage_id(); when that storage is itself a
+// dictionary, `array.type` is the ExtensionType (not a DictionaryType), so the
+// DictionaryType must be taken from the extension's storage type instead of being
+// cast from `array.type` directly.
+Type::type GetTypeForBuffers(const ArrayData& array) {
+  Type::type type_id = array.type->storage_id();
+  if (type_id == Type::DICTIONARY) {
+    const DataType* dict_type = array.type.get();
+    if (dict_type->id() == Type::EXTENSION) {
+      dict_type = ::arrow::internal::checked_cast<const ExtensionType*>(dict_type)
+                      ->storage_type()
+                      .get();
+    }
+    return ::arrow::internal::checked_cast<const DictionaryType*>(dict_type)
+        ->index_type()
+        ->id();
+  }
+  return type_id;
+}
+
+// Checks to see if an array's own buffers are aligned but doesn't check
+// children or the dictionary.
+//
+// With kValueAlignment each buffer is checked against the type-specific requirement
+// from RequiredValueAlignmentForBuffer; otherwise every non-null buffer's address
+// must be a multiple of `alignment`. Null buffer slots are skipped.
+bool CheckSelfAlignment(const ArrayData& array, int64_t alignment) {
+ if (alignment == kValueAlignment) {
+ Type::type type_id = GetTypeForBuffers(array);
+ for (std::size_t i = 0; i < array.buffers.size(); i++) {
+ if (array.buffers[i]) {
+ int expected_alignment =
+ RequiredValueAlignmentForBuffer(type_id, static_cast<int>(i));
+ if (!CheckAlignment(*array.buffers[i], expected_alignment)) {
+ return false;
+ }
+ }
+ }
+ } else {
+ for (const auto& buffer : array.buffers) {
+ if (buffer) {
+ if (!CheckAlignment(*buffer, alignment)) return false;
+ }
}
}
+ return true;
+}
+
+} // namespace
+
+bool CheckAlignment(const ArrayData& array, int64_t alignment) {
+ if (!CheckSelfAlignment(array, alignment)) {
+ return false;
+ }
- if (array.type->id() == Type::DICTIONARY) {
+ if (array.dictionary) {
if (!CheckAlignment(*array.dictionary, alignment)) return false;
}
@@ -97,9 +142,22 @@ bool CheckAlignment(const Table& table, int64_t alignment,
Result<std::shared_ptr<Buffer>> EnsureAlignment(std::shared_ptr<Buffer> buffer,
int64_t alignment,
MemoryPool* memory_pool) {
+ if (alignment == kValueAlignment) {
+ return Status::Invalid(
+ "The kValueAlignment option may only be used to call EnsureAlignment
on arrays "
+ "or tables and cannot be used with buffers");
+ }
+ if (alignment <= 0) {
+ return Status::Invalid("Alignment must be a positive integer");
+ }
if (!CheckAlignment(*buffer, alignment)) {
- ARROW_ASSIGN_OR_RAISE(auto new_buffer,
- AllocateBuffer(buffer->size(), alignment,
memory_pool));
+ if (!buffer->is_cpu()) {
+ return Status::NotImplemented("Reallocating an unaligned non-CPU
buffer.");
+ }
+ int64_t minimum_desired_alignment = std::max(kDefaultBufferAlignment,
alignment);
+ ARROW_ASSIGN_OR_RAISE(
+ auto new_buffer,
+ AllocateBuffer(buffer->size(), minimum_desired_alignment,
memory_pool));
std::memcpy(new_buffer->mutable_data(), buffer->data(), buffer->size());
return std::move(new_buffer);
} else {
@@ -111,11 +169,18 @@ Result<std::shared_ptr<ArrayData>>
EnsureAlignment(std::shared_ptr<ArrayData> ar
int64_t alignment,
MemoryPool* memory_pool) {
if (!CheckAlignment(*array_data, alignment)) {
- std::vector<std::shared_ptr<Buffer>> buffers_ = array_data->buffers;
- for (size_t i = 0; i < buffers_.size(); ++i) {
- if (buffers_[i]) {
+ std::vector<std::shared_ptr<Buffer>> buffers = array_data->buffers;
+ Type::type type_id = GetTypeForBuffers(*array_data);
+ for (size_t i = 0; i < buffers.size(); ++i) {
+ if (buffers[i]) {
+ int64_t expected_alignment = alignment;
+ if (alignment == kValueAlignment) {
+ expected_alignment =
+ RequiredValueAlignmentForBuffer(type_id, static_cast<int>(i));
+ }
ARROW_ASSIGN_OR_RAISE(
- buffers_[i], EnsureAlignment(std::move(buffers_[i]), alignment,
memory_pool));
+ buffers[i],
+ EnsureAlignment(std::move(buffers[i]), expected_alignment,
memory_pool));
}
}
@@ -130,10 +195,9 @@ Result<std::shared_ptr<ArrayData>>
EnsureAlignment(std::shared_ptr<ArrayData> ar
}
auto new_array_data = ArrayData::Make(
- array_data->type, array_data->length, std::move(buffers_),
array_data->child_data,
+ array_data->type, array_data->length, std::move(buffers),
array_data->child_data,
array_data->dictionary, array_data->GetNullCount(),
array_data->offset);
return std::move(new_array_data);
-
} else {
return std::move(array_data);
}
diff --git a/cpp/src/arrow/util/align_util.h b/cpp/src/arrow/util/align_util.h
index abdabe22fc..63df63749c 100644
--- a/cpp/src/arrow/util/align_util.h
+++ b/cpp/src/arrow/util/align_util.h
@@ -70,8 +70,33 @@ inline BitmapWordAlignParams BitmapWordAlign(const uint8_t*
data, int64_t bit_of
namespace util {
// Functions to check if the provided Arrow object is aligned by the specified
alignment
+
+/// \brief Special alignment value to use data type-specific alignment
+///
+/// If this is passed as the `alignment` in one of the CheckAlignment or EnsureAlignment
+/// functions, then the function will ensure each buffer is suitably aligned
+/// for the data type of the array.  For example, given an int32 array the values
+/// buffer's address must be a multiple of 4.  Given a large_string array the offsets
+/// buffer's address must be a multiple of 8.
+///
+/// This is a negative sentinel, so it cannot collide with a real alignment.  It is not
+/// meaningful for the Buffer overloads: CheckAlignment(const Buffer&, ...) treats it as
+/// "no requirement" and EnsureAlignment(std::shared_ptr<Buffer>, ...) rejects it with
+/// Status::Invalid.
+constexpr int64_t kValueAlignment = -3;
+
+/// \brief Calculate if the buffer's address is a multiple of `alignment`
+///
+/// If `alignment` is less than or equal to 0 then this method will always return true
+/// \param buffer the buffer to check
+/// \param alignment the alignment (in bytes) to check for
+/// \return true if the buffer is aligned (or `alignment` is not positive)
ARROW_EXPORT bool CheckAlignment(const Buffer& buffer, int64_t alignment);
+/// \brief Calculate if all buffers in the array data are aligned
+///
+/// This will also check the buffers in the dictionary and any children
+/// \param array the array data to check
+/// \param alignment the alignment (in bytes) to check for, or kValueAlignment to
+///        check each buffer against the alignment required by its data type
+/// \return true if every checked buffer is aligned
ARROW_EXPORT bool CheckAlignment(const ArrayData& array, int64_t alignment);
+/// \brief Calculate if all buffers in the array are aligned
+///
+/// This will also check the buffers in the dictionary and any children
+/// \param array the array to check
+/// \param alignment the alignment (in bytes) to check for
ARROW_EXPORT bool CheckAlignment(const Array& array, int64_t alignment);
// Following functions require an additional boolean vector which stores the
@@ -82,29 +107,112 @@ ARROW_EXPORT bool CheckAlignment(const Array& array,
int64_t alignment);
// of the constituent objects during the EnsureAlignment function where certain
// objects can be ignored for further checking if we already know that they are
// completely aligned.
+
+/// \brief Calculate which (if any) chunks in a chunked array are unaligned
+/// \param array the array to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param needs_alignment an output vector that will store the results of the
+///                        check. It must be set to a valid vector. Extra elements
+///                        will be added to the end of the vector for each chunk
+///                        that is checked. `true` will be stored if the chunk is
+///                        unaligned.
+/// \param offset the index of the chunk to start checking
+/// \return true if all chunks (starting at `offset`) are aligned, false otherwise
ARROW_EXPORT bool CheckAlignment(const ChunkedArray& array, int64_t alignment,
std::vector<bool>* needs_alignment, int
offset = 0);
+
+/// \brief Calculate which (if any) columns in a record batch are unaligned
+/// \param batch the batch to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param needs_alignment an output vector that will store the results of the
+///                        check. It must be set to a valid vector. Extra elements
+///                        will be added to the end of the vector for each column
+///                        that is checked. `true` will be stored if the column is
+///                        unaligned.
+/// \return true if all columns are aligned, false otherwise
ARROW_EXPORT bool CheckAlignment(const RecordBatch& batch, int64_t alignment,
std::vector<bool>* needs_alignment);
+
+/// \brief Calculate which (if any) columns in a table are unaligned
+/// \param table the table to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param needs_alignment an output vector that will store the results of the
+///                        check. It must be set to a valid vector. Extra elements
+///                        will be added to the end of the vector for each column
+///                        that is checked. `true` will be stored if the column is
+///                        unaligned.
+/// \return true if all columns are aligned, false otherwise
ARROW_EXPORT bool CheckAlignment(const Table& table, int64_t alignment,
std::vector<bool>* needs_alignment);
+/// \brief Return a buffer with the given alignment and the same data as the input buffer
+///
+/// If the input buffer is already aligned then this method will return the input
+/// buffer. If the input buffer is not already aligned then this method will allocate
+/// a new buffer holding the same data. The new buffer will be aligned to at least
+/// max(kDefaultBufferAlignment, alignment) bytes.
+///
+/// \param buffer the buffer to check
+/// \param alignment the alignment (in bytes) to check for. It must be a positive
+///                  integer: passing 0 or kValueAlignment (a lone buffer has no data
+///                  type to derive a value alignment from) results in an Invalid
+///                  status.
+/// \param memory_pool a memory pool that will be used to allocate a new buffer if
+///                    the input buffer is not sufficiently aligned
ARROW_EXPORT Result<std::shared_ptr<Buffer>> EnsureAlignment(
std::shared_ptr<Buffer> buffer, int64_t alignment, MemoryPool*
memory_pool);
+/// \brief Return an array data where all buffers are aligned by the given alignment
+///
+/// The buffers of any child arrays and of the dictionary are checked as well. Input
+/// buffers that are already aligned are reused as-is; only unaligned buffers are
+/// replaced with newly allocated copies.
+///
+/// \param array_data the array data to check
+/// \param alignment the alignment (in bytes) to check for, or kValueAlignment to
+///                  require only the alignment implied by each buffer's data type
+/// \param memory_pool a memory pool that will be used to allocate new buffers if any
+///                    input buffer is not sufficiently aligned
ARROW_EXPORT Result<std::shared_ptr<ArrayData>> EnsureAlignment(
std::shared_ptr<ArrayData> array_data, int64_t alignment, MemoryPool*
memory_pool);
+/// \brief return an array where all buffers are aligned by the given alignment
+///
+/// If any input buffer is already aligned then this method will reuse that
same input
+/// buffer.
+///
+/// \param array the array to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param memory_pool a memory pool that will be used to allocate new buffers
if any
+/// input buffer is not sufficiently aligned
ARROW_EXPORT Result<std::shared_ptr<Array>>
EnsureAlignment(std::shared_ptr<Array> array,
int64_t alignment,
MemoryPool*
memory_pool);
+/// \brief return a chunked array where all buffers are aligned by the given
alignment
+///
+/// If any input buffer is already aligned then this method will reuse that
same input
+/// buffer.
+///
+/// \param array the chunked array to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param memory_pool a memory pool that will be used to allocate new buffers
if any
+/// input buffer is not sufficiently aligned
ARROW_EXPORT Result<std::shared_ptr<ChunkedArray>> EnsureAlignment(
std::shared_ptr<ChunkedArray> array, int64_t alignment, MemoryPool*
memory_pool);
+/// \brief return a record batch where all buffers are aligned by the given
alignment
+///
+/// If any input buffer is already aligned then this method will reuse that
same input
+/// buffer.
+///
+/// \param batch the batch to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param memory_pool a memory pool that will be used to allocate new buffers
if any
+/// input buffer is not sufficiently aligned
ARROW_EXPORT Result<std::shared_ptr<RecordBatch>> EnsureAlignment(
std::shared_ptr<RecordBatch> batch, int64_t alignment, MemoryPool*
memory_pool);
+/// \brief return a table where all buffers are aligned by the given alignment
+///
+/// If any input buffer is already aligned then this method will reuse that
same input
+/// buffer.
+///
+/// \param table the table to check
+/// \param alignment the alignment (in bytes) to check for
+/// \param memory_pool a memory pool that will be used to allocate new buffers
if any
+/// input buffer is not sufficiently aligned
ARROW_EXPORT Result<std::shared_ptr<Table>>
EnsureAlignment(std::shared_ptr<Table> table,
int64_t alignment,
MemoryPool*
memory_pool);
diff --git a/cpp/src/arrow/util/align_util_test.cc
b/cpp/src/arrow/util/align_util_test.cc
index c4ec83de3e..a14041597a 100644
--- a/cpp/src/arrow/util/align_util_test.cc
+++ b/cpp/src/arrow/util/align_util_test.cc
@@ -15,17 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-#include <gtest/gtest.h>
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
#include "arrow/array.h"
+#include "arrow/buffer.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
+#include "arrow/testing/extension_type.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
#include "arrow/util/align_util.h"
namespace arrow {
@@ -166,6 +173,62 @@ TEST(BitmapWordAlign, UnalignedDataStart) {
}
} // namespace internal
+TEST(EnsureAlignment, Buffer) {
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer,
AllocateBuffer(/*size=*/1024));
+ std::shared_ptr<Buffer> unaligned_view = SliceBuffer(buffer, 1);
+ std::shared_ptr<Buffer> aligned_view = SliceBuffer(buffer, 0);
+
+ ASSERT_TRUE(util::CheckAlignment(*aligned_view, kDefaultBufferAlignment));
+ ASSERT_FALSE(util::CheckAlignment(*unaligned_view, /*alignment=*/2));
+
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<Buffer> aligned_dupe,
+ util::EnsureAlignment(aligned_view, /*alignment=*/8,
default_memory_pool()));
+
+ ASSERT_EQ(aligned_view->data(), aligned_dupe->data());
+ ASSERT_TRUE(util::CheckAlignment(*aligned_dupe, kDefaultBufferAlignment));
+
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<Buffer> realigned,
+ util::EnsureAlignment(unaligned_view, /*alignment=*/8,
default_memory_pool()));
+
+ ASSERT_NE(realigned->data(), unaligned_view->data());
+ // Even though we only asked to check for 8 bytes of alignment, any
reallocation will
+ // always allocate at least kDefaultBufferAlignment bytes of alignment
+ ASSERT_TRUE(util::CheckAlignment(*realigned,
/*alignment=*/kDefaultBufferAlignment));
+
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<Buffer> realigned_large,
+ util::EnsureAlignment(unaligned_view, /*alignment=*/256,
default_memory_pool()));
+ // If the user wants more than kDefaultBufferAlignment they should get it
+ ASSERT_TRUE(util::CheckAlignment(*realigned_large, /*alignment=*/256));
+
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<Buffer> realigned_huge,
+ util::EnsureAlignment(unaligned_view, /*alignment=*/2048,
default_memory_pool()));
+ // It should even be valid for the alignment to be larger than the buffer
size itself
+ ASSERT_TRUE(util::CheckAlignment(*realigned_huge, /*alignment=*/2048));
+}
+
+TEST(EnsureAlignment, BufferInvalid) {
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer,
AllocateBuffer(/*size=*/1024));
+
+  // A non-positive alignment is nonsense, but it is not worth introducing a Status
+  // return just to report that, so CheckAlignment simply returns true.
+ ASSERT_TRUE(util::CheckAlignment(*buffer, 0));
+ ASSERT_TRUE(util::CheckAlignment(*buffer, -1));
+
+ ASSERT_THAT(util::EnsureAlignment(buffer, /*alignment=*/0,
default_memory_pool()),
+ Raises(StatusCode::Invalid,
+ testing::HasSubstr("Alignment must be a positive
integer")));
+
+ ASSERT_THAT(
+ util::EnsureAlignment(buffer, /*alignment=*/util::kValueAlignment,
+ default_memory_pool()),
+ Raises(StatusCode::Invalid,
+ testing::HasSubstr(
+ "may only be used to call EnsureAlignment on arrays or
tables")));
+}
+
TEST(EnsureAlignment, Array) {
MemoryPool* pool = default_memory_pool();
auto rand = ::arrow::random::RandomArrayGenerator(1923);
@@ -278,4 +341,272 @@ TEST(EnsureAlignment, Table) {
ASSERT_EQ(util::CheckAlignment(*aligned_table, 2048, &needs_alignment),
true);
}
+using TypesRequiringSomeKindOfAlignment =
+ testing::Types<Int16Type, Int32Type, Int64Type, UInt16Type, UInt32Type,
UInt64Type,
+ FloatType, DoubleType, Date32Type, Date64Type, Time32Type,
Time64Type,
+ Decimal128Type, Decimal256Type, TimestampType,
DurationType, MapType,
+ DenseUnionType, LargeBinaryType, LargeListType,
LargeStringType,
+ MonthIntervalType, DayTimeIntervalType,
MonthDayNanoIntervalType>;
+
+using TypesNotRequiringAlignment =
+ testing::Types<NullType, Int8Type, UInt8Type, FixedSizeListType,
FixedSizeBinaryType,
+ BooleanType, SparseUnionType>;
+
+template <typename ArrowType>
+std::shared_ptr<DataType> sample_type() {
+ return TypeTraits<ArrowType>::type_singleton();
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<FixedSizeBinaryType>() {
+ return fixed_size_binary(16);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<FixedSizeListType>() {
+ return fixed_size_list(uint8(), 16);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Decimal128Type>() {
+ return decimal128(32, 6);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Decimal256Type>() {
+ return decimal256(60, 10);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<LargeListType>() {
+ return large_list(int8());
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<DenseUnionType>() {
+ return dense_union({field("x", int8()), field("y", uint8())});
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<MapType>() {
+ return map(utf8(), field("item", utf8()));
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<DurationType>() {
+ return duration(TimeUnit::NANO);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<TimestampType>() {
+ return timestamp(TimeUnit::NANO);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Time32Type>() {
+ return time32(TimeUnit::SECOND);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Time64Type>() {
+ return time64(TimeUnit::NANO);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<SparseUnionType>() {
+ return sparse_union({field("x", uint8()), field("y", int8())}, {1, 2});
+}
+
+template <typename ArrowType>
+std::shared_ptr<ArrayData> SampleArray() {
+ random::RandomArrayGenerator gen(42);
+ return gen.ArrayOf(sample_type<ArrowType>(), 100)->data();
+}
+
+template <>
+std::shared_ptr<ArrayData> SampleArray<SparseUnionType>() {
+ auto ty = sparse_union({field("ints", int64()), field("strs", utf8())}, {2,
7});
+ auto ints = ArrayFromJSON(int64(), "[0, 1, 2, 3]");
+ auto strs = ArrayFromJSON(utf8(), R"(["a", null, "c", "d"])");
+ auto ids = ArrayFromJSON(int8(), "[2, 7, 2, 7]")->data()->buffers[1];
+ const int length = 4;
+ SparseUnionArray arr(ty, length, {ints, strs}, ids);
+ return arr.data();
+}
+
+class ValueAlignment : public ::testing::Test {
+ public:
+ void CheckModified(const ArrayData& src, const ArrayData& dst) {
+ ASSERT_EQ(src.buffers.size(), dst.buffers.size());
+ for (std::size_t i = 0; i < src.buffers.size(); i++) {
+ if (!src.buffers[i] || !dst.buffers[i]) {
+ continue;
+ }
+ if (src.buffers[i]->address() != dst.buffers[i]->address()) {
+ return;
+ }
+ }
+ FAIL() << "Expected at least one buffer to have been modified by
EnsureAlignment";
+ }
+
+ void CheckUnmodified(const ArrayData& src, const ArrayData& dst) {
+ ASSERT_EQ(src.buffers.size(), dst.buffers.size());
+ for (std::size_t i = 0; i < src.buffers.size(); i++) {
+ if (!src.buffers[i] || !dst.buffers[i]) {
+ continue;
+ }
+ ASSERT_EQ(src.buffers[i]->address(), dst.buffers[i]->address());
+ }
+ }
+};
+
+template <typename T>
+class ValueAlignmentRequired : public ValueAlignment {};
+template <typename T>
+class ValueAlignmentNotRequired : public ValueAlignment {};
+
+TYPED_TEST_SUITE(ValueAlignmentRequired, TypesRequiringSomeKindOfAlignment);
+TYPED_TEST_SUITE(ValueAlignmentNotRequired, TypesNotRequiringAlignment);
+
+// The default buffer alignment should always be large enough for value
alignment
+TYPED_TEST(ValueAlignmentRequired, DefaultAlignmentSufficient) {
+ std::shared_ptr<ArrayData> data = SampleArray<TypeParam>();
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<ArrayData> aligned,
+ util::EnsureAlignment(data, util::kValueAlignment,
default_memory_pool()));
+
+ ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment));
+ AssertArraysEqual(*MakeArray(data), *MakeArray(aligned));
+ this->CheckUnmodified(*data, *aligned);
+}
+
+TYPED_TEST(ValueAlignmentRequired, RoundTrip) {
+ std::shared_ptr<ArrayData> data = SampleArray<TypeParam>();
+ std::shared_ptr<ArrayData> unaligned = UnalignBuffers(*data);
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<ArrayData> aligned,
+ util::EnsureAlignment(unaligned, util::kValueAlignment,
default_memory_pool()));
+
+ ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment));
+ AssertArraysEqual(*MakeArray(data), *MakeArray(aligned));
+ this->CheckModified(*unaligned, *aligned);
+}
+
+TYPED_TEST(ValueAlignmentNotRequired, RoundTrip) {
+ std::shared_ptr<ArrayData> data = SampleArray<TypeParam>();
+ std::shared_ptr<ArrayData> unaligned = UnalignBuffers(*data);
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<ArrayData> aligned,
+ util::EnsureAlignment(unaligned, util::kValueAlignment,
default_memory_pool()));
+
+ ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment));
+ AssertArraysEqual(*MakeArray(data), *MakeArray(aligned));
+ this->CheckUnmodified(*unaligned, *aligned);
+}
+
+TYPED_TEST(ValueAlignmentNotRequired, DefaultAlignmentSufficient) {
+ std::shared_ptr<ArrayData> data = SampleArray<TypeParam>();
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<ArrayData> aligned,
+ util::EnsureAlignment(data, util::kValueAlignment,
default_memory_pool()));
+
+ ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment));
+ AssertArraysEqual(*MakeArray(data), *MakeArray(aligned));
+ this->CheckUnmodified(*data, *aligned);
+}
+
+TEST_F(ValueAlignment, DenseUnion) {
+ std::shared_ptr<ArrayData> data = SampleArray<DenseUnionType>();
+ ASSERT_TRUE(util::CheckAlignment(*data, util::kValueAlignment));
+
+ std::shared_ptr<ArrayData> unaligned = UnalignBuffers(*data);
+ ASSERT_FALSE(util::CheckAlignment(*unaligned, util::kValueAlignment));
+ // Dense union arrays are the only array type where the buffer at index 2 is
expected
+ // to be aligned (it contains 32-bit offsets and should be 4-byte aligned)
+ ASSERT_FALSE(util::CheckAlignment(*unaligned->buffers[2], 4));
+
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<ArrayData> realigned,
+ util::EnsureAlignment(unaligned, util::kValueAlignment,
default_memory_pool()));
+
+ ASSERT_TRUE(util::CheckAlignment(*realigned, util::kValueAlignment));
+ ASSERT_TRUE(util::CheckAlignment(*realigned->buffers[2], 4));
+ // The buffer at index 1 is the types buffer which does not require
realignment
+ ASSERT_EQ(unaligned->buffers[1]->data(), realigned->buffers[1]->data());
+}
+
+TEST_F(ValueAlignment, RunEndEncoded) {
+ // Run end requires alignment, value type does not
+ std::shared_ptr<Array> run_ends = ArrayFromJSON(int32(), "[3, 5]");
+ std::shared_ptr<Array> values = ArrayFromJSON(int8(), "[50, 100]");
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> array,
+ RunEndEncodedArray::Make(/*logical_length=*/5,
std::move(run_ends),
+ std::move(values), 0));
+
+ std::shared_ptr<ArrayData> unaligned_ree =
std::make_shared<ArrayData>(*array->data());
+ unaligned_ree->child_data[0] = UnalignBuffers(*unaligned_ree->child_data[0]);
+ unaligned_ree->child_data[1] = UnalignBuffers(*unaligned_ree->child_data[1]);
+
+ std::shared_ptr<ArrayData> aligned_ree =
std::make_shared<ArrayData>(*unaligned_ree);
+
+ ASSERT_OK_AND_ASSIGN(
+ aligned_ree,
+ util::EnsureAlignment(aligned_ree, util::kValueAlignment,
default_memory_pool()));
+ ASSERT_TRUE(util::CheckAlignment(*aligned_ree, util::kValueAlignment));
+
+ this->CheckModified(*unaligned_ree->child_data[0],
*aligned_ree->child_data[0]);
+ this->CheckUnmodified(*unaligned_ree->child_data[1],
*aligned_ree->child_data[1]);
+}
+
+TEST_F(ValueAlignment, Dictionary) {
+ // Dictionary values require alignment, dictionary indices do not
+ std::shared_ptr<DataType> int8_utf8 = dictionary(int8(), utf8());
+ std::shared_ptr<Array> array = ArrayFromJSON(int8_utf8, R"(["x", "x",
"y"])");
+
+ std::shared_ptr<ArrayData> unaligned_dict =
std::make_shared<ArrayData>(*array->data());
+ unaligned_dict->dictionary = UnalignBuffers(*unaligned_dict->dictionary);
+ unaligned_dict = UnalignBuffers(*unaligned_dict);
+
+ std::shared_ptr<ArrayData> aligned_dict =
std::make_shared<ArrayData>(*unaligned_dict);
+
+ ASSERT_OK_AND_ASSIGN(
+ aligned_dict,
+ util::EnsureAlignment(aligned_dict, util::kValueAlignment,
default_memory_pool()));
+
+ ASSERT_TRUE(util::CheckAlignment(*aligned_dict, util::kValueAlignment));
+ this->CheckUnmodified(*unaligned_dict, *aligned_dict);
+ this->CheckModified(*unaligned_dict->dictionary, *aligned_dict->dictionary);
+
+ // Dictionary values do not require alignment, dictionary indices do
+ std::shared_ptr<DataType> int16_int8 = dictionary(int16(), int8());
+ array = ArrayFromJSON(int16_int8, R"([7, 11])");
+
+ unaligned_dict = std::make_shared<ArrayData>(*array->data());
+ unaligned_dict->dictionary = UnalignBuffers(*unaligned_dict->dictionary);
+ unaligned_dict = UnalignBuffers(*unaligned_dict);
+
+ aligned_dict = std::make_shared<ArrayData>(*unaligned_dict);
+
+ ASSERT_OK_AND_ASSIGN(
+ aligned_dict,
+ util::EnsureAlignment(aligned_dict, util::kValueAlignment,
default_memory_pool()));
+
+ ASSERT_TRUE(util::CheckAlignment(*aligned_dict, util::kValueAlignment));
+ this->CheckModified(*unaligned_dict, *aligned_dict);
+ this->CheckUnmodified(*unaligned_dict->dictionary,
*aligned_dict->dictionary);
+}
+
+TEST_F(ValueAlignment, Extension) {
+ std::shared_ptr<Array> array = ExampleSmallint();
+
+ std::shared_ptr<ArrayData> unaligned = UnalignBuffers(*array->data());
+
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<ArrayData> aligned,
+ util::EnsureAlignment(unaligned, util::kValueAlignment,
default_memory_pool()));
+
+ ASSERT_TRUE(util::CheckAlignment(*aligned, util::kValueAlignment));
+ this->CheckModified(*unaligned, *aligned);
+}
+
} // namespace arrow