emkornfield commented on code in PR #48345: URL: https://github.com/apache/arrow/pull/48345#discussion_r3472351816
########## cpp/src/arrow/util/alp/alp.cc: ########## @@ -0,0 +1,965 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/alp/alp.h" + +#include <cmath> +#include <cstring> +#include <functional> +#include <iostream> +#include <map> + +#include "arrow/util/alp/alp_constants.h" +#include "arrow/util/bit_stream_utils_internal.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/endian.h" +#include "arrow/util/bpacking_internal.h" +#include "arrow/util/logging.h" +#include "arrow/util/span.h" +#include "arrow/util/ubsan.h" + +namespace arrow { +namespace util { +namespace alp { + +// ALP serialization uses memcpy for multi-byte integers (frame_of_reference, +// num_exceptions, offsets) and assumes little-endian byte order on disk. +static_assert(ARROW_LITTLE_ENDIAN, + "ALP serialization assumes little-endian byte order"); + +// ---------------------------------------------------------------------- +// AlpEncodedVectorInfo implementation (non-templated, 4 bytes) + +void AlpEncodedVectorInfo::Store(arrow::util::span<uint8_t> output_buffer) const { + ARROW_CHECK(output_buffer.size() >= static_cast<size_t>(GetStoredSize())) + << "alp_vector_info_output_too_small: " << output_buffer.size() << " vs " + << GetStoredSize(); + + uint8_t* ptr = output_buffer.data(); + + // exponent, factor: 1 byte each + *ptr++ = exponent_; + *ptr++ = factor_; + + // num_exceptions: 2 bytes + util::SafeStore(ptr, num_exceptions_); +} + +Result<AlpEncodedVectorInfo> AlpEncodedVectorInfo::Load( + arrow::util::span<const uint8_t> input_buffer) { + if (input_buffer.size() < static_cast<size_t>(GetStoredSize())) { + return Status::Invalid("ALP vector info buffer too small: ", input_buffer.size(), + " < ", GetStoredSize()); + } + + AlpEncodedVectorInfo result{}; + const uint8_t* ptr = input_buffer.data(); + + // exponent, factor: 1 byte each + result.exponent_ = *ptr++; + result.factor_ = *ptr++; + + // num_exceptions: 2 bytes + result.num_exceptions_ = util::SafeLoadAs<int16_t>(ptr); + + return result; +} + +// ---------------------------------------------------------------------- +// AlpEncodedForVectorInfo implementation (templated, 5/9 bytes) + +template <typename T> +void AlpEncodedForVectorInfo<T>::Store(arrow::util::span<uint8_t> output_buffer) const { + ARROW_CHECK(output_buffer.size() >= static_cast<size_t>(GetStoredSize())) + << "alp_for_vector_info_output_too_small: " << output_buffer.size() << " vs " + << GetStoredSize(); + + uint8_t* ptr = output_buffer.data(); + + // frame_of_reference: 4 bytes for float, 8 bytes for double + util::SafeStore(ptr, frame_of_reference_); + ptr += sizeof(frame_of_reference_); + + // bit_width: 1 byte + *ptr = bit_width_; +} + +template <typename T> +Result<AlpEncodedForVectorInfo<T>> AlpEncodedForVectorInfo<T>::Load( + arrow::util::span<const uint8_t> input_buffer) { + if (input_buffer.size() < static_cast<size_t>(GetStoredSize())) { + return Status::Invalid("ALP FOR vector info buffer too small: ", input_buffer.size(), + " < ", GetStoredSize()); + } + + AlpEncodedForVectorInfo<T> result{}; + const uint8_t* ptr = input_buffer.data(); + + // frame_of_reference: 4 bytes for float, 8 bytes for double + result.frame_of_reference_ = util::SafeLoadAs<typename AlpEncodedForVectorInfo<T>::ExactType>(ptr); + ptr += sizeof(result.frame_of_reference_); + + // bit_width: 1 byte + result.bit_width_ = *ptr; + if (result.bit_width_ > sizeof(typename AlpEncodedForVectorInfo<T>::ExactType) * 8) { + return Status::Invalid("ALP FOR bit_width out of range: ", result.bit_width_); + } + + return result; +} + +// Explicit template instantiations for AlpEncodedForVectorInfo +template class AlpEncodedForVectorInfo<float>; +template class AlpEncodedForVectorInfo<double>; + +// ---------------------------------------------------------------------- +// AlpEncodedVector implementation + +template <typename T> +void AlpEncodedVector<T>::Store(arrow::util::span<uint8_t> output_buffer) const { + const int64_t overall_size = GetStoredSize(); + ARROW_CHECK(static_cast<int64_t>(output_buffer.size()) >= overall_size) + << "alp_bit_packed_vector_store_output_too_small: " << output_buffer.size() + << " vs " << overall_size; + + int64_t offset = 0; + + // Store AlpInfo (4 bytes) + alp_info_.Store({output_buffer.data() + offset, AlpEncodedVectorInfo::kStoredSize}); + offset += AlpEncodedVectorInfo::kStoredSize; + + // Store ForInfo + for_info_.Store( + {output_buffer.data() + offset, AlpEncodedForVectorInfo<T>::kStoredSize}); + offset += AlpEncodedForVectorInfo<T>::kStoredSize; + + // Store data section + StoreDataOnly({output_buffer.data() + offset, output_buffer.size() - offset}); +} + +template <typename T> +void AlpEncodedVector<T>::StoreDataOnly(arrow::util::span<uint8_t> output_buffer) const { + const int64_t data_size = GetDataStoredSize(); + // Internal invariants: caller must provide adequate buffer and consistent metadata. + // These are programmer errors (not data errors), so CHECK is appropriate. + ARROW_CHECK(static_cast<int64_t>(output_buffer.size()) >= data_size) + << "alp_bit_packed_vector_store_data_output_too_small: " << output_buffer.size() + << " vs " << data_size; + + ARROW_CHECK(static_cast<size_t>(alp_info_.num_exceptions()) == exceptions_.size() && + static_cast<size_t>(alp_info_.num_exceptions()) == + exception_positions_.size()) + << "alp_bit_packed_vector_store_num_exceptions_mismatch: " + << alp_info_.num_exceptions() << " vs " << exceptions_.size() << " vs " + << exception_positions_.size(); + + int64_t offset = 0; + + // Compute bit_packed_size from num_elements and bit_width + const int64_t bit_packed_size = + AlpEncodedForVectorInfo<T>::GetBitPackedSize(num_elements_, for_info_.bit_width()); + + // Store all successfully compressed values first. + std::memcpy(output_buffer.data() + offset, packed_values_.data(), bit_packed_size); + offset += bit_packed_size; + + // Store exception positions. + const int64_t exception_position_size = + alp_info_.num_exceptions() * sizeof(AlpConstants::PositionType); + std::memcpy(output_buffer.data() + offset, exception_positions_.data(), + exception_position_size); + offset += exception_position_size; + + // Store exception values. + const int64_t exception_size = alp_info_.num_exceptions() * sizeof(T); + std::memcpy(output_buffer.data() + offset, exceptions_.data(), exception_size); + offset += exception_size; + + // Internal invariant: total bytes written must match precomputed size. + ARROW_CHECK(offset == data_size) + << "alp_bit_packed_vector_data_size_mismatch: " << offset << " vs " << data_size; +} + +template <typename T> +Result<AlpEncodedVector<T>> AlpEncodedVector<T>::Load( + arrow::util::span<const uint8_t> input_buffer, int32_t num_elements) { + if (num_elements > (1 << AlpConstants::kMaxLogVectorSize)) { + return Status::Invalid("ALP element count too large: ", num_elements, + " > ", (1 << AlpConstants::kMaxLogVectorSize)); + } + + AlpEncodedVector<T> result; + int64_t input_offset = 0; + + // Load AlpInfo (4 bytes) + ARROW_ASSIGN_OR_RAISE( + AlpEncodedVectorInfo alp_info, + AlpEncodedVectorInfo::Load( + {input_buffer.data() + input_offset, AlpEncodedVectorInfo::kStoredSize})); + input_offset += AlpEncodedVectorInfo::kStoredSize; + result.set_alp_info(alp_info); + + // Load ForInfo + ARROW_ASSIGN_OR_RAISE( + AlpEncodedForVectorInfo<T> for_info, + AlpEncodedForVectorInfo<T>::Load( + {input_buffer.data() + input_offset, AlpEncodedForVectorInfo<T>::kStoredSize})); + input_offset += AlpEncodedForVectorInfo<T>::kStoredSize; + result.set_for_info(for_info); + + result.set_num_elements(num_elements); + + const int64_t overall_size = GetStoredSize(alp_info, for_info, num_elements); + + if (static_cast<int64_t>(input_buffer.size()) < overall_size) { + return Status::Invalid("ALP compressed vector buffer too small: ", + input_buffer.size(), " < ", overall_size); + } + + // Compute bit_packed_size from num_elements and bit_width + const int64_t bit_packed_size = + AlpEncodedForVectorInfo<T>::GetBitPackedSize(num_elements, for_info.bit_width()); + + // TODO: resize() zero-initializes before memcpy overwrites. Consider + // using uninitialized storage if this shows up in decode-path profiling. + result.mutable_packed_values().resize(bit_packed_size); + std::memcpy(result.mutable_packed_values().data(), input_buffer.data() + input_offset, + bit_packed_size); + input_offset += bit_packed_size; + + result.mutable_exception_positions().resize(alp_info.num_exceptions()); + const int64_t exception_position_size = + alp_info.num_exceptions() * sizeof(AlpConstants::PositionType); + std::memcpy(result.mutable_exception_positions().data(), + input_buffer.data() + input_offset, exception_position_size); + input_offset += exception_position_size; + + result.mutable_exceptions().resize(alp_info.num_exceptions()); + const int64_t exception_size = alp_info.num_exceptions() * sizeof(T); + std::memcpy(result.mutable_exceptions().data(), input_buffer.data() + input_offset, + exception_size); + return result; +} + +template <typename T> +int64_t AlpEncodedVector<T>::GetStoredSize() const { + return GetStoredSize(alp_info_, for_info_, num_elements_); +} + +template <typename T> +int64_t AlpEncodedVector<T>::GetStoredSize(const AlpEncodedVectorInfo& alp_info, + const AlpEncodedForVectorInfo<T>& for_info, + int32_t num_elements) { + const int64_t bit_packed_size = + AlpEncodedForVectorInfo<T>::GetBitPackedSize(num_elements, for_info.bit_width()); + return AlpEncodedVectorInfo::kStoredSize + AlpEncodedForVectorInfo<T>::kStoredSize + + bit_packed_size + + alp_info.num_exceptions() * (sizeof(AlpConstants::PositionType) + sizeof(T)); +} + +template <typename T> +bool AlpEncodedVector<T>::operator==(const AlpEncodedVector<T>& other) const { + if (alp_info_ != other.alp_info_ || for_info_ != other.for_info_ || + num_elements_ != other.num_elements_) { + return false; + } + if (packed_values_.size() != other.packed_values_.size() || + !std::equal(packed_values_.begin(), packed_values_.end(), + other.packed_values_.begin())) { + return false; + } + if (exceptions_.size() != other.exceptions_.size() || + !std::equal(exceptions_.begin(), exceptions_.end(), other.exceptions_.begin())) { + return false; + } + if (exception_positions_.size() != other.exception_positions_.size() || + !std::equal(exception_positions_.begin(), exception_positions_.end(), + other.exception_positions_.begin())) { + return false; + } + return true; +} + +// ---------------------------------------------------------------------- +// AlpEncodedVectorView implementation + +template <typename T> +Result<AlpEncodedVectorView<T>> AlpEncodedVectorView<T>::LoadView( + arrow::util::span<const uint8_t> input_buffer, int32_t num_elements) { + if (num_elements > (1 << AlpConstants::kMaxLogVectorSize)) { + return Status::Invalid("ALP view element count too large: ", num_elements, + " > ", (1 << AlpConstants::kMaxLogVectorSize)); + } + + AlpEncodedVectorView<T> result; + int64_t input_offset = 0; + + // Load AlpInfo (4 bytes) + ARROW_ASSIGN_OR_RAISE( + AlpEncodedVectorInfo alp_info, + AlpEncodedVectorInfo::Load( + {input_buffer.data() + input_offset, AlpEncodedVectorInfo::kStoredSize})); + input_offset += AlpEncodedVectorInfo::kStoredSize; + result.set_alp_info(alp_info); + + // Load ForInfo + ARROW_ASSIGN_OR_RAISE( + AlpEncodedForVectorInfo<T> for_info, + AlpEncodedForVectorInfo<T>::Load( + {input_buffer.data() + input_offset, AlpEncodedForVectorInfo<T>::kStoredSize})); + input_offset += AlpEncodedForVectorInfo<T>::kStoredSize; + result.set_for_info(for_info); + + result.set_num_elements(num_elements); + + const int64_t overall_size = + AlpEncodedVector<T>::GetStoredSize(alp_info, for_info, num_elements); + + if (static_cast<int64_t>(input_buffer.size()) < overall_size) { + return Status::Invalid("ALP view buffer too small: ", input_buffer.size(), + " < ", overall_size); + } + + // Load data section (after metadata) + ARROW_ASSIGN_OR_RAISE( + AlpEncodedVectorView<T> data_view, + LoadViewDataOnly( + {input_buffer.data() + input_offset, input_buffer.size() - input_offset}, + alp_info, for_info, num_elements)); + + // Copy the loaded data into result + result.set_packed_values(data_view.packed_values()); + result.set_exception_positions(std::move(data_view.mutable_exception_positions())); + result.set_exceptions(std::move(data_view.mutable_exceptions())); + + return result; +} + +template <typename T> +Result<AlpEncodedVectorView<T>> AlpEncodedVectorView<T>::LoadViewDataOnly( + arrow::util::span<const uint8_t> input_buffer, const AlpEncodedVectorInfo& alp_info, + const AlpEncodedForVectorInfo<T>& for_info, int32_t num_elements) { + if (num_elements > (1 << AlpConstants::kMaxLogVectorSize)) { + return Status::Invalid("ALP view data element count too large: ", num_elements, + " > ", (1 << AlpConstants::kMaxLogVectorSize)); + } + + AlpEncodedVectorView<T> result; + result.set_alp_info(alp_info); + result.set_for_info(for_info); + result.set_num_elements(num_elements); + + const int64_t data_size = for_info.GetDataStoredSize(num_elements, alp_info.num_exceptions()); + if (static_cast<int64_t>(input_buffer.size()) < data_size) { + return Status::Invalid("ALP view data buffer too small: ", input_buffer.size(), + " < ", data_size); + } + + int64_t input_offset = 0; + + // Compute bit_packed_size from num_elements and bit_width + const int64_t bit_packed_size = + AlpEncodedForVectorInfo<T>::GetBitPackedSize(num_elements, for_info.bit_width()); + + // Zero-copy for packed values (bytes have no alignment requirements) + result.set_packed_values( + {input_buffer.data() + input_offset, static_cast<size_t>(bit_packed_size)}); + input_offset += bit_packed_size; + + // Copy exception positions into aligned storage to avoid UB from misaligned access. + // Exceptions are rare (typically < 5%), so the copy overhead is negligible. + const int64_t exception_position_size = + alp_info.num_exceptions() * sizeof(AlpConstants::PositionType); + result.mutable_exception_positions().resize(alp_info.num_exceptions()); + std::memcpy(result.mutable_exception_positions().data(), + input_buffer.data() + input_offset, exception_position_size); + input_offset += exception_position_size; + + // Copy exception values into aligned storage to avoid UB from misaligned access. + const int64_t exception_size = alp_info.num_exceptions() * sizeof(T); + result.mutable_exceptions().resize(alp_info.num_exceptions()); + std::memcpy(result.mutable_exceptions().data(), input_buffer.data() + input_offset, + exception_size); + + return result; +} + +template <typename T> +int64_t AlpEncodedVectorView<T>::GetStoredSize() const { + return AlpEncodedVector<T>::GetStoredSize(alp_info_, for_info_, num_elements_); +} + +template class AlpEncodedVectorView<float>; +template class AlpEncodedVectorView<double>; + +template class AlpEncodedVector<float>; +template class AlpEncodedVector<double>; + +// ---------------------------------------------------------------------- +// Internal helper classes + +namespace { + +/// \brief Helper class for encoding/decoding individual values +template <typename T> +class AlpInlines { + public: + using Constants = AlpTypedConstants<T>; + using ExactType = typename Constants::FloatingToExact; + using SignedExactType = typename Constants::FloatingToSignedExact; + + /// \brief Check if float is a special value that cannot be converted + static inline bool IsImpossibleToEncode(const T n) { + // We do not have to check for positive or negative infinity, since + // std::numeric_limits<T>::infinity() > std::numeric_limits<T>::max() + // and vice versa for negative infinity. + return std::isnan(n) || n > Constants::kEncodingUpperLimit || + n < Constants::kEncodingLowerLimit || + (n == 0.0 && std::signbit(n)); // Verification for -0.0 + } + + /// \brief Round a float to the nearest integer using the magic-number technique + static inline auto FastRound(T n) -> SignedExactType { + if (n >= 0) { + n = n + Constants::kMagicNumber - Constants::kMagicNumber; + } else { + n = n - Constants::kMagicNumber + Constants::kMagicNumber; + } + return static_cast<SignedExactType>(n); + } + + /// \brief Fast way to round float to nearest integer + static inline auto NumberToInt(T n) -> SignedExactType { + if (IsImpossibleToEncode(n)) { + return static_cast<SignedExactType>(Constants::kEncodingUpperLimit); + } + return FastRound(n); + } + + /// \brief Convert a float into an int using encoding options + static inline SignedExactType EncodeValue( + const T value, const AlpExponentAndFactor exponent_and_factor) { + const T tmp_encoded_value = value * + Constants::GetExponent(exponent_and_factor.exponent) * + Constants::GetFactor(exponent_and_factor.factor); + return NumberToInt(tmp_encoded_value); + } + + /// \brief Reconvert an int to a float using encoding options + static inline T DecodeValue(const SignedExactType encoded_value, + const AlpExponentAndFactor exponent_and_factor) { + // The cast to T is needed to prevent a signed integer overflow. + return static_cast<T>(encoded_value) * AlpConstants::GetFactor(exponent_and_factor.factor) * + Constants::GetFactor(exponent_and_factor.exponent); + } +}; + +/// \brief Helper struct for tracking compression combinations +struct AlpCombination { + AlpExponentAndFactor exponent_and_factor; + int64_t num_appearances{0}; + int64_t estimated_compression_size{0}; +}; + +/// \brief Compare two ALP combinations to determine which is better +/// +/// Return true if c1 is a better combination than c2. +/// First criteria is number of times it appears as best combination. +/// Second criteria is the estimated compression size. +/// Third criteria is bigger exponent. Review Comment: can you elaborate on why these are considered better? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
