prtkgaur commented on code in PR #48345:
URL: https://github.com/apache/arrow/pull/48345#discussion_r3148227040


##########
cpp/src/arrow/util/alp/ALP_Encoding_Specification_terse.md:
##########
@@ -0,0 +1,249 @@
+# ALP Encoding Specification

Review Comment:
   Yes, once that PR is merged, this file will be removed. It is included only
   to make the code easier to review, in case someone wants to consult it.



##########
cpp/src/parquet/encoder.cc:
##########
@@ -995,6 +999,90 @@ class ByteStreamSplitEncoder<FLBAType> : public 
ByteStreamSplitEncoderBase<FLBAT
   }
 };
 
+// ----------------------------------------------------------------------
+// ALP encoder (Adaptive Lossless floating-Point)
+
+template <typename DType>
+class AlpEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using ArrowType = typename EncodingTraits<DType>::ArrowType;
+  using TypedEncoder<DType>::Put;
+
+  explicit AlpEncoder(const ColumnDescriptor* descr,
+                      ::arrow::MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::ALP, pool),
+        sink_{pool} {
+    static_assert(std::is_same<T, float>::value || std::is_same<T, 
double>::value,
+                  "ALP only supports float and double types");
+  }
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  std::shared_ptr<Buffer> FlushValues() override {
+    if (sink_.length() == 0) {
+      // Empty buffer case
+      PARQUET_ASSIGN_OR_THROW(auto buf, sink_.Finish());
+      return buf;
+    }
+
+    // Call AlpWrapper::Encode() - it handles sampling, preset selection, and 
compression
+    const size_t decompSize = sink_.length();
+    size_t compSize = 
::arrow::util::alp::AlpWrapper<T>::GetMaxCompressedSize(decompSize);
+
+    PARQUET_ASSIGN_OR_THROW(
+        auto compressed_buffer,
+        ::arrow::AllocateResizableBuffer(compSize, this->memory_pool()));
+
+    ::arrow::util::alp::AlpWrapper<T>::Encode(
+        reinterpret_cast<const T*>(sink_.data()),
+        decompSize,
+        reinterpret_cast<char*>(compressed_buffer->mutable_data()),
+        &compSize);
+
+    PARQUET_THROW_NOT_OK(compressed_buffer->Resize(compSize));
+    sink_.Reset();
+
+    return std::shared_ptr<Buffer>(std::move(compressed_buffer));
+  }
+
+  void Put(const T* buffer, int num_values) override {
+    if (num_values > 0) {
+      PARQUET_THROW_NOT_OK(

Review Comment:
   Hmm, I'm not sure; I don't see a benefit in that.
   
   The current approach decodes the full page once into `decoded_buffer_` and 
serves subsequent calls from it. The intermediate buffer is typically small 
(one page ≈ tens of KB). Incremental vector-by-vector decoding would reduce 
peak memory and avoid a memcpy for the no-nulls fast path, but adds complexity 
for vector-boundary tracking and partial reads. The performance benefit is 
marginal since the decode dominates the memcpy cost.



##########
cpp/src/arrow/util/alp/alp_constants.h:
##########
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Constants and type traits for ALP compression
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace util {
+namespace alp {
+
+// ----------------------------------------------------------------------
+// AlpConstants
+
+/// \brief Constants used throughout ALP compression
+class AlpConstants {
+ public:
+  /// Number of elements compressed together as a unit. Fixed for 
compatibility.
+  static constexpr uint64_t kAlpVectorSize = 1024;
+
+  /// Number of elements to use when determining sampling parameters.
+  static constexpr uint64_t kSamplerVectorSize = 4096;
+
+  /// Total number of elements in a rowgroup for sampling purposes.
+  static constexpr uint64_t kSamplerRowgroupSize = 122880;

Review Comment:
   These sampling constants come from the ALP paper (Afroozeh et al., SIGMOD
   2023). Added a citation comment (above) and inline derivations.



##########
cpp/src/arrow/util/alp/alp_sampler.cc:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/alp/alp_sampler.h"
+
+#include <cmath>
+
+#include "arrow/util/alp/alp.h"
+#include "arrow/util/alp/alp_constants.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/ubsan.h"
+
+namespace arrow {
+namespace util {
+namespace alp {
+
+// ----------------------------------------------------------------------
+// AlpSampler implementation
+
+/// \brief Build a sampler configured from the AlpConstants sampling values
+///
+/// rowgroup_sample_jump_ is derived from the other members as
+/// (rowgroup_size_ / sample_vectors_per_rowgroup_) / sample_vector_size_.
+template <typename T>
+AlpSampler<T>::AlpSampler()
+    : sample_vector_size_(AlpConstants::kSamplerVectorSize),
+      rowgroup_size_(AlpConstants::kSamplerRowgroupSize),
+      samples_per_vector_(AlpConstants::kSamplerSamplesPerVector),
+      sample_vectors_per_rowgroup_(
+          AlpConstants::kSamplerSampleVectorsPerRowgroup),
+      // NOTE(review): this reads three members initialized above, so it
+      // relies on them being declared before rowgroup_sample_jump_ in the
+      // class -- confirm against the header.
+      rowgroup_sample_jump_(
+          (rowgroup_size_ / sample_vectors_per_rowgroup_) /
+          sample_vector_size_) {}
+
+/// \brief Feed a contiguous run of values to the sampler
+///
+/// The input is cut into consecutive chunks of at most sample_vector_size_
+/// values (the final chunk may be shorter) and each chunk is passed to
+/// AddSampleVector() in order. An empty span results in no calls.
+///
+/// \param[in] input values to consider for sampling
+template <typename T>
+void AlpSampler<T>::AddSample(arrow::util::span<const T> input) {
+  for (uint64_t offset = 0; offset < input.size();
+       offset += sample_vector_size_) {
+    // Name the comparison type explicitly: input.size() is a size_t, and
+    // where size_t and uint64_t are distinct types an unqualified
+    // std::min(a, b) cannot deduce a single template argument.
+    // (assumes sample_vector_size_ is a uint64_t -- TODO confirm in header)
+    const uint64_t chunk_len =
+        std::min<uint64_t>(input.size() - offset, sample_vector_size_);
+    AddSampleVector({input.data() + offset, chunk_len});
+  }
+}
+
+/// \brief Account for one vector of input and, unless skipped, sample it
+///
+/// The vector and value counters are always advanced. When the vector is not
+/// skipped, two copies are retained: the leading slice of the vector and a
+/// strided subset of that slice.
+///
+/// \param[in] input the values of the current vector
+template <typename T>
+void AlpSampler<T>::AddSampleVector(arrow::util::span<const T> input) {
+  // Evaluate the skip predicate first: it is given the counters as they
+  // stood before this vector was counted.
+  const bool skip_this_vector = MustSkipSamplingFromCurrentVector(
+      vectors_count_, vectors_sampled_count_, input.size());
+
+  ++vectors_count_;
+  total_values_count_ += input.size();
+  if (skip_this_vector) {
+    return;
+  }
+
+  const AlpSamplingParameters params =
+      GetAlpSamplingParameters(input.size());
+
+  // Leading slice: the first num_lookup_value elements (or all of them when
+  // the input is shorter).
+  const size_t slice_len =
+      std::min<size_t>(params.num_lookup_value, input.size());
+  std::vector<T> leading_slice(input.begin(), input.begin() + slice_len);
+
+  // Strided subset: every num_sampled_increments-th element of the slice.
+  std::vector<T> strided_subset;
+  size_t pos = 0;
+  while (pos < leading_slice.size()) {
+    strided_subset.push_back(leading_slice[pos]);
+    pos += params.num_sampled_increments;
+  }
+  sample_stored_ += strided_subset.size();
+
+  complete_vectors_sampled_.push_back(std::move(leading_slice));
+  rowgroup_sample_.push_back(std::move(strided_subset));
+  ++vectors_sampled_count_;
+}
+
+/// \brief Turn the collected samples into an encoding preset
+///
+/// Logs the sampling counters, asks AlpCompression<T> to create a preset
+/// from rowgroup_sample_, logs a summary of that preset and returns it.
+///
+/// \return result whose alp_preset field holds the created preset
+template <typename T>
+typename AlpSampler<T>::AlpSamplerResult AlpSampler<T>::Finalize() {
+  ARROW_LOG(DEBUG) << "AlpSampler finalized: vectorsSampled="
+                   << vectors_sampled_count_ << "/" << vectors_count_
+                   << " total"
+                   << ", valuesSampled=" << sample_stored_ << "/"
+                   << total_values_count_ << " total";
+
+  AlpSamplerResult outcome;
+  outcome.alp_preset =
+      AlpCompression<T>::CreateEncodingPreset(rowgroup_sample_);
+
+  const auto& preset = outcome.alp_preset;
+  ARROW_LOG(DEBUG) << "AlpSampler preset: " << preset.combinations.size()
+                   << " exponent/factor combinations"
+                   << ", estimatedSize=" << preset.best_compressed_size
+                   << " bytes";
+
+  return outcome;
+}
+
+template <typename T>
+typename AlpSampler<T>::AlpSamplingParameters 
AlpSampler<T>::GetAlpSamplingParameters(
+    uint64_t num_current_vector_values) {
+  const uint64_t num_lookup_values =
+      std::min(num_current_vector_values,
+               static_cast<uint64_t>(AlpConstants::kAlpVectorSize));
+  // Sample equidistant values within a vector; jump a fixed number of values.
+  const uint64_t num_sampled_increments =
+      std::max(uint64_t{1}, static_cast<uint64_t>(std::ceil(
+                                static_cast<double>(num_lookup_values) /
+                                samples_per_vector_)));
+  const uint64_t num_sampled_values =
+      std::ceil(static_cast<double>(num_lookup_values) / 
num_sampled_increments);
+
+  ARROW_CHECK(num_sampled_values < AlpConstants::kAlpVectorSize) << 
"alp_sample_too_large";

Review Comment:
   Added a safety proof comment. `num_lookup_values` is capped at
   `kAlpVectorSize` and `num_sampled_increments >= 1`, so
   `ceil(num_lookup_values / num_sampled_increments) <= kAlpVectorSize` always
   holds. This operates on internally computed values (not untrusted data), so
   the ARROW_CHECK is a defensive invariant, not a reachable error path.



##########
cpp/src/arrow/util/alp/alp_wrapper.cc:
##########
@@ -0,0 +1,435 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/alp/alp_wrapper.h"
+
+#include <cmath>
+#include <optional>
+
+#include "arrow/util/alp/alp.h"
+#include "arrow/util/alp/alp_constants.h"
+#include "arrow/util/alp/alp_sampler.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/ubsan.h"
+
+namespace arrow {
+namespace util {
+namespace alp {
+
+namespace {
+
+// ----------------------------------------------------------------------
+// AlpHeader
+
+/// \brief Header structure for ALP compression blocks
+///
+/// Contains page-level metadata for ALP compression. The num_elements field
+/// stores the total element count for the page, allowing per-vector element
+/// counts to be inferred (all vectors except the last have vector_size 
elements).
+///
+/// Note: num_elements is uint32_t because Parquet page headers use i32 for 
num_values.
+/// See: 
https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
+///
+/// Note: log_vector_size stores the base-2 logarithm of the vector size.
+/// The actual vector size is computed as: 1u << log_vector_size (i.e., 
2^log_vector_size).
+/// For example, log_vector_size=10 means vector_size=1024.
+/// A single byte is enough to hold the exponent. Note that GetVectorSize()
+/// returns uint32_t, so only exponents below 32 yield a representable size.
+///
+/// Header format (version 1):
+///
+///   +---------------------------------------------------+
+///   |  AlpHeader (8 bytes)                              |
+///   +---------------------------------------------------+
+///   |  Offset |  Field              |  Size             |
+///   +---------+---------------------+-------------------+
+///   |    0    |  version            |  1 byte (uint8)   |
+///   |    1    |  compression_mode   |  1 byte (uint8)   |
+///   |    2    |  integer_encoding   |  1 byte (uint8)   |
+///   |    3    |  log_vector_size    |  1 byte (uint8)   |
+///   |    4    |  num_elements       |  4 bytes (uint32) |
+///   +---------------------------------------------------+
+///
+/// Page-level layout (metadata-at-start for efficient random access):
+///
+///   +-------------------------------------------------------------------+
+///   |  [AlpHeader (8B)]                                                 |
+///   |  [VectorInfo₀ | VectorInfo₁ | ... | VectorInfoₙ]  ← Metadata      |
+///   |  [Data₀ | Data₁ | ... | Dataₙ]                    ← Data sections |
+///   +-------------------------------------------------------------------+
+///
+/// This layout enables O(1) random access to any vector by:
+/// 1. Reading all VectorInfo first (contiguous, cache-friendly)
+/// 2. Computing data offsets from VectorInfo
+/// 3. Seeking directly to the target vector's data
+///
+/// \note version must remain the first field to allow reading the rest
+///       of the header based on version number.
+struct AlpHeader {
+  /// Version number. Must remain the first field for version-based parsing.
+  uint8_t version = 0;
+  /// Compression mode (currently only kAlp is supported).
+  uint8_t compression_mode = static_cast<uint8_t>(AlpMode::kAlp);
+  /// Integer encoding method used (currently only kForBitPack is supported).
+  uint8_t integer_encoding =
+      static_cast<uint8_t>(AlpIntegerEncoding::kForBitPack);
+  /// Log base 2 of vector size. Actual vector size = 1u << log_vector_size.
+  /// For example: 10 means 2^10 = 1024 elements per vector.
+  uint8_t log_vector_size = 0;
+  /// Total number of elements in the page (uint32_t since Parquet uses i32).
+  /// Per-vector element count is inferred: vector_size for all but the last
+  /// vector.
+  uint32_t num_elements = 0;
+
+  /// \brief Get the size in bytes of the AlpHeader for a version
+  ///
+  /// \param[in] v the version number
+  /// \return the size in bytes (8 for version 1, 0 for any other version)
+  static constexpr size_t GetSizeForVersion(uint8_t v) {
+    // Version 1 header is 8 bytes
+    return (v == 1) ? 8 : 0;
+  }
+
+  /// \brief Check whether the given version is valid
+  ///
+  /// \param[in] v the version to check
+  /// \return the version if valid, otherwise asserts
+  static uint8_t IsValidVersion(uint8_t v) {
+    ARROW_CHECK(v == 1) << "invalid_version: " << static_cast<int>(v);
+    return v;
+  }
+
+  /// \brief Compute the actual vector size from log_vector_size
+  ///
+  /// log_vector_size is read straight from the serialized header, so it is
+  /// validated here: shifting a 32-bit value by 32 or more is undefined
+  /// behavior.
+  ///
+  /// \return the vector size (2^log_vector_size)
+  uint32_t GetVectorSize() const {
+    ARROW_CHECK(log_vector_size < 32)
+        << "log_vector_size_out_of_range: "
+        << static_cast<int>(log_vector_size);
+    return 1u << log_vector_size;
+  }
+
+  /// \brief Compute log base 2 of a power-of-2 value
+  ///
+  /// \param[in] value a power-of-2 value
+  /// \return the log base 2 of value
+  static uint8_t Log2(uint32_t value) {
+    ARROW_CHECK(value > 0 && (value & (value - 1)) == 0)
+        << "value_must_be_power_of_2: " << value;
+    uint8_t log = 0;
+    while ((1u << log) < value) {
+      ++log;
+    }
+    return log;
+  }
+
+  /// \brief Calculate the number of elements for a given vector index
+  ///
+  /// \param[in] vector_index the 0-based index of the vector
+  /// \return the number of elements in this vector, or 0 when vector_index
+  ///         is past the last vector
+  uint16_t GetVectorNumElements(uint64_t vector_index) const {
+    const uint32_t vector_size = GetVectorSize();
+    // The result type is 16 bits wide; a larger vector size would be
+    // truncated by the casts below.
+    ARROW_CHECK(vector_size <= 0xFFFFu)
+        << "vector_size_exceeds_uint16: " << vector_size;
+    const uint64_t num_full_vectors = num_elements / vector_size;
+    const uint64_t remainder = num_elements % vector_size;
+    if (vector_index < num_full_vectors) {
+      return static_cast<uint16_t>(vector_size);  // Full vector
+    } else if (vector_index == num_full_vectors && remainder > 0) {
+      return static_cast<uint16_t>(remainder);  // Last partial vector
+    }
+    return 0;  // Invalid index
+  }
+
+  /// \brief Get the AlpMode enum from the stored uint8_t
+  AlpMode GetCompressionMode() const {
+    return static_cast<AlpMode>(compression_mode);
+  }
+
+  /// \brief Get the AlpIntegerEncoding enum from the stored uint8_t
+  AlpIntegerEncoding GetIntegerEncoding() const {
+    return static_cast<AlpIntegerEncoding>(integer_encoding);
+  }
+};
+
+// The header is copied to and from the byte stream with memcpy using
+// GetSizeForVersion(1) bytes, so the in-memory struct must be exactly that
+// large.
+static_assert(sizeof(AlpHeader) == AlpHeader::GetSizeForVersion(1),
+              "AlpHeader must match the version 1 wire size");
+
+}  // namespace
+
+// ----------------------------------------------------------------------
+// AlpWrapper::AlpHeader definition
+
+/// Nested AlpWrapper<T>::AlpHeader: declares no members of its own and only
+/// inherits from ::arrow::util::alp::AlpHeader.
+template <typename T>
+struct AlpWrapper<T>::AlpHeader : public ::arrow::util::alp::AlpHeader {};
+
+// ----------------------------------------------------------------------
+// AlpWrapper implementation
+
+/// \brief Parse the page header from the front of a compressed buffer
+///
+/// The first byte (the version) is read on its own; it determines how many
+/// bytes make up the header. Each precondition is enforced with
+/// ARROW_CHECK: the buffer is non-empty, the version passes
+/// IsValidVersion(), and the buffer is at least as long as the header.
+///
+/// \param[in] comp start of the compressed buffer
+/// \param[in] comp_size number of readable bytes at comp
+/// \return the header copied out of the buffer
+template <typename T>
+typename AlpWrapper<T>::AlpHeader AlpWrapper<T>::LoadHeader(
+    const char* comp, size_t comp_size) {
+  ARROW_CHECK(comp_size >= 1)
+      << "alp_loadHeader_compSize_too_small_for_version";
+
+  uint8_t wire_version = 0;
+  std::memcpy(&wire_version, comp, sizeof(wire_version));
+  const size_t wire_header_bytes =
+      AlpHeader::GetSizeForVersion(AlpHeader::IsValidVersion(wire_version));
+  ARROW_CHECK(comp_size >= wire_header_bytes)
+      << "alp_loadHeader_compSize_too_small";
+
+  AlpHeader parsed{};
+  std::memcpy(&parsed, comp, wire_header_bytes);
+  return parsed;
+}
+
+/// \brief Compress a buffer of values into an ALP page
+///
+/// Samples the input to obtain an encoding preset, lets EncodeAlp() write
+/// the body just past the space reserved for the header, then writes the
+/// header at the start of the output.
+///
+/// \param[in] decomp values to compress
+/// \param[in] decomp_size input size in bytes; must be a multiple of
+///            sizeof(T) and describe at most UINT32_MAX elements
+/// \param[out] comp destination buffer
+/// \param[in,out] comp_size in: capacity of comp in bytes (must cover at
+///                least the header); out: total bytes written
+/// \param[in] enforce_mode not consulted by this function
+template <typename T>
+void AlpWrapper<T>::Encode(const T* decomp, size_t decomp_size, char* comp,
+                           size_t* comp_size,
+                           std::optional<AlpMode> enforce_mode) {
+  ARROW_CHECK(decomp_size % sizeof(T) == 0)
+      << "alp_encode_input_must_be_multiple_of_T";
+  const uint64_t element_count = decomp_size / sizeof(T);
+  // num_elements is a 32-bit header field: reject counts that the cast
+  // further down would otherwise truncate silently.
+  ARROW_CHECK(element_count <= UINT32_MAX)
+      << "alp_encode_too_many_elements: " << element_count;
+  const uint8_t version =
+      AlpHeader::IsValidVersion(AlpConstants::kAlpVersion);
+  const size_t header_size = AlpHeader::GetSizeForVersion(version);
+  // Guard the unsigned subtraction below against wrapping around.
+  ARROW_CHECK(*comp_size >= header_size)
+      << "alp_encode_output_too_small_for_header";
+
+  AlpSampler<T> sampler;
+  sampler.AddSample({decomp, element_count});
+  auto sampling_result = sampler.Finalize();
+
+  // Make room to store header afterwards.
+  char* encoded_header = comp;
+  comp += header_size;
+  const uint64_t remaining_compressed_size = *comp_size - header_size;
+
+  const CompressionProgress compression_progress =
+      EncodeAlp(decomp, element_count, comp, remaining_compressed_size,
+                sampling_result.alp_preset);
+
+  AlpHeader header{};
+  header.version = version;
+  header.compression_mode = static_cast<uint8_t>(AlpMode::kAlp);
+  header.integer_encoding =
+      static_cast<uint8_t>(AlpIntegerEncoding::kForBitPack);
+  header.log_vector_size = AlpHeader::Log2(AlpConstants::kAlpVectorSize);
+  header.num_elements = static_cast<uint32_t>(element_count);
+
+  std::memcpy(encoded_header, &header, header_size);
+  *comp_size =
+      header_size + compression_progress.num_compressed_bytes_produced;
+}
+
+/// \brief Decompress an ALP page into an output array
+///
+/// Loads and validates the page header, checks that the vector size and
+/// compression mode are the supported ones, then passes the bytes after
+/// the header to DecodeAlp().
+///
+/// \param[out] decomp destination for the decoded values
+/// \param[in] num_elements element count forwarded to DecodeAlp()
+/// \param[in] comp start of the compressed page (header first)
+/// \param[in] comp_size size of the compressed page in bytes
+template <typename T>
+template <typename TargetType>
+void AlpWrapper<T>::Decode(TargetType* decomp, uint32_t num_elements,
+                           const char* comp, size_t comp_size) {
+  const AlpHeader header = LoadHeader(comp, comp_size);
+
+  const uint32_t vector_size = header.GetVectorSize();
+  ARROW_CHECK(vector_size == AlpConstants::kAlpVectorSize)
+      << "unsupported_vector_size: " << vector_size;
+  ARROW_CHECK(header.GetCompressionMode() == AlpMode::kAlp)
+      << "alp_decode_unsupported_mode";
+
+  // Everything after the header is the body handed to DecodeAlp().
+  const size_t prefix_len = AlpHeader::GetSizeForVersion(header.version);
+  const uint64_t body_len = comp_size - prefix_len;
+  DecodeAlp<TargetType>(decomp, num_elements, comp + prefix_len, body_len,
+                        header.GetIntegerEncoding(), vector_size,
+                        header.num_elements);
+}
+
+// Explicit instantiations of Decode: AlpWrapper<float> can write float or
+// double output, AlpWrapper<double> only double output.
+template void AlpWrapper<float>::Decode(float*, uint32_t, const char*,
+                                        size_t);
+template void AlpWrapper<float>::Decode(double*, uint32_t, const char*,
+                                        size_t);
+template void AlpWrapper<double>::Decode(double*, uint32_t, const char*,
+                                         size_t);
+
+/// \brief Upper bound on the compressed size for a given input size
+///
+/// Sums the header size, the per-vector metadata for every vector, and a
+/// worst-case data estimate in which every element is stored as a value,
+/// as an exception and as an exception position.
+///
+/// \param[in] decomp_size input size in bytes; must be a multiple of
+///            sizeof(T)
+/// \return the maximum number of bytes this estimate allows for
+template <typename T>
+uint64_t AlpWrapper<T>::GetMaxCompressedSize(uint64_t decomp_size) {
+  ARROW_CHECK(decomp_size % sizeof(T) == 0)
+      << "alp_decompressed_size_not_multiple_of_T";
+  const uint64_t element_count = decomp_size / sizeof(T);
+  const uint8_t version =
+      AlpHeader::IsValidVersion(AlpConstants::kAlpVersion);
+  uint64_t max_alp_size = AlpHeader::GetSizeForVersion(version);
+
+  // Number of vectors, rounded up. Done in integer arithmetic so the result
+  // is exact for any element count and cannot overflow.
+  const uint64_t vectors_count =
+      element_count / AlpConstants::kAlpVectorSize +
+      (element_count % AlpConstants::kAlpVectorSize != 0 ? 1 : 0);
+  // Per-vector metadata: AlpInfo plus ForInfo, sized by their kStoredSize.
+  max_alp_size += (AlpEncodedVectorInfo::kStoredSize +
+                   AlpEncodedForVectorInfo<T>::kStoredSize) *
+                  vectors_count;
+  // Worst case: everything is an exception, except two values that are
+  // chosen with large difference to make FOR encoding for placeholders
+  // impossible.
+  // Values/placeholders.
+  max_alp_size += element_count * sizeof(T);
+  // Exceptions.
+  max_alp_size += element_count * sizeof(T);
+  // Exception positions.
+  max_alp_size += element_count * sizeof(AlpConstants::PositionType);
+
+  return max_alp_size;
+}
+
+template <typename T>
+auto AlpWrapper<T>::EncodeAlp(const T* decomp, uint64_t element_count, char* 
comp,
+                              size_t comp_size, const AlpEncodingPreset& 
combinations)
+    -> CompressionProgress {
+  // GROUPED METADATA LAYOUT:
+  // [AlpInfo₀ | AlpInfo₁ | ... | AlpInfoₙ]     ← All ALP metadata (4B each)
+  // [ForInfo₀ | ForInfo₁ | ... | ForInfoₙ]     ← All FOR metadata (6/10B each)
+  // [Data₀ | Data₁ | ... | Dataₙ]               ← All data sections
+
+  // Phase 1: Compress all vectors and collect them
+  std::vector<AlpEncodedVector<T>> encoded_vectors;
+  const uint64_t num_vectors =
+      (element_count + AlpConstants::kAlpVectorSize - 1) / 
AlpConstants::kAlpVectorSize;
+  encoded_vectors.reserve(num_vectors);
+
+  uint64_t input_offset = 0;

Review Comment:
   Updated



##########
cpp/src/arrow/util/alp/alp.h:
##########
@@ -0,0 +1,843 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Adaptive Lossless floating-Point (ALP) compression implementation
+
+#pragma once
+
+#include <vector>
+
+#include "arrow/util/alp/alp_constants.h"
+#include "arrow/util/small_vector.h"
+#include "arrow/util/span.h"
+
+namespace arrow {
+namespace util {
+namespace alp {
+
+// ----------------------------------------------------------------------
+// ALP Overview
+//
+// IMPORTANT: For abstract interfaces or examples how to use ALP, consult
+// alp_wrapper.h.
+// This is our implementation of the adaptive lossless floating-point
+// compression for decimals (ALP) (https://dl.acm.org/doi/10.1145/3626717).
+// It works by converting a float into a decimal (if possible). The exponent
+// and factor are chosen per vector. Each float is converted using
+// c(f) = int64(f * 10^exponent * 10^-factor). The converted floats are then
+// encoded via a delta frame of reference and bitpacked. Every exception,
+// where the conversion/reconversion changes the value of the float, is stored
+// separately and has to be patched into the decompressed vector afterwards.
+//
+// ==========================================================================
+//                    ALP COMPRESSION/DECOMPRESSION PIPELINE
+// ==========================================================================
+//
+// COMPRESSION FLOW:
+// -----------------
+//
+//   Input: float/double array
+//        |
+//        v
+//   +------------------------------------------------------------------+
+//   | 1. SAMPLING & PRESET GENERATION                                  |
+//   |    * Sample vectors from dataset                                 |
+//   |    * Try all exponent/factor combinations (e, f)                 |
+//   |    * Select best k combinations for preset                       |
+//   +------------------------------------+-----------------------------+
+//                                        | preset.combinations
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 2. PER-VECTOR COMPRESSION                                        |
+//   |    a) Find best (e,f) from preset for this vector                |
+//   |    b) Encode: encoded[i] = int64(value[i] * 10^e * 10^-f)        |
+//   |    c) Verify: if decode(encoded[i]) != value[i] -> exception     |
+//   |    d) Replace exceptions with placeholder value                  |
+//   +------------------------------------+-----------------------------+
+//                                        | encoded integers + exceptions
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 3. FRAME OF REFERENCE (FOR)                                      |
+//   |    * Find min value in encoded integers                          |
+//   |    * Subtract min from all values: delta[i] = encoded[i] - min   |
+//   +------------------------------------+-----------------------------+
+//                                        | delta values (smaller range)
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 4. BIT PACKING                                                   |
+//   |    * Calculate bit_width = log2(max_delta)                       |
+//   |    * Pack each value into bit_width bits                         |
+//   |    * Result: tightly packed binary data                          |
+//   +------------------------------------+-----------------------------+
+//                                        | packed bytes
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 5. SERIALIZATION (offset-based interleaved layout)              |
+//   |    [Header][Offsets...][Vector₀][Vector₁]...                    |
+//   |    where each Vector = [AlpInfo|ForInfo|Data]                   |
+//   +------------------------------------------------------------------+
+//
+//
+// DECOMPRESSION FLOW:
+// -------------------
+//
+//   Serialized bytes -> AlpEncodedVector::Load()
+//        |
+//        v
+//   +------------------------------------------------------------------+
+//   | 1. BIT UNPACKING                                                 |
+//   |    * Extract bit_width from metadata                             |
+//   |    * Unpack each value from bit_width bits -> delta values       |
+//   +------------------------------------+-----------------------------+
+//                                        | delta values
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 2. REVERSE FRAME OF REFERENCE (unFOR)                            |
+//   |    * Add back min: encoded[i] = delta[i] + frame_of_reference    |
+//   +------------------------------------+-----------------------------+
+//                                        | encoded integers
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 3. DECODE                                                        |
+//   |    * Apply inverse formula: value[i] = encoded[i] * 10^-e * 10^f |
+//   +------------------------------------+-----------------------------+
+//                                        | decoded floats (with placeholders)
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 4. PATCH EXCEPTIONS                                              |
+//   |    * Replace values at exception_positions[] with exceptions[]   |
+//   +------------------------------------+-----------------------------+
+//                                        |
+//                                        v
+//   Output: Original float/double array (lossless!)
+//
+// ==========================================================================
+
+// ----------------------------------------------------------------------
+// AlpMode
+
+/// \brief Compression scheme selector for ALP
+///
+/// kAlp (decimal compression) is the only mode defined at present.
+enum class AlpMode {
+  kAlp,
+};
+
+// ----------------------------------------------------------------------
+// AlpExponentAndFactor
+
+/// \brief Pair of decimal-encoding parameters: an exponent and a factor
+struct AlpExponentAndFactor {
+  uint8_t exponent{0};
+  uint8_t factor{0};
+
+  /// \brief Two pairs are equal when both components match
+  bool operator==(const AlpExponentAndFactor& other) const {
+    if (exponent != other.exponent) return false;
+    return factor == other.factor;
+  }
+
+  /// \brief Lexicographic ordering on (exponent, factor), giving a
+  /// deterministic order when used as a std::map key
+  bool operator<(const AlpExponentAndFactor& other) const {
+    return exponent < other.exponent ||
+           (exponent == other.exponent && factor < other.factor);
+  }
+};
+
+// ----------------------------------------------------------------------
+// AlpEncodedVectorInfo (non-templated, ALP core metadata)
+
+/// \brief ALP-specific metadata for an encoded vector (non-templated)
+///
+/// Contains the metadata specific to ALP's float-to-integer conversion:
+///   - exponent/factor: parameters for decimal encoding
+///   - num_exceptions: count of values that couldn't be losslessly encoded
+///
+/// This struct is the same size regardless of the floating-point type 
(float/double).
+/// It is separate from the integer encoding metadata (e.g., FOR) to allow
+/// different integer encodings to be used in the future.
+///
+/// Serialization format (4 bytes):
+///
+///   +------------------------------------------+
+///   |  AlpEncodedVectorInfo (4 bytes)          |
+///   +------------------------------------------+
+///   |  Offset |  Field              |  Size    |
+///   +---------+---------------------+----------+
+///   |    0    |  exponent (uint8_t) |  1 byte  |
+///   |    1    |  factor (uint8_t)   |  1 byte  |
+///   |    2    |  num_exceptions     |  2 bytes |
+///   +------------------------------------------+
+struct AlpEncodedVectorInfo {
+  /// Size of the serialized portion (4 bytes, fixed)
+  static constexpr uint64_t kStoredSize = 4;
+
+  /// Exponent used for decimal encoding (multiply by 10^exponent)
+  uint8_t exponent = 0;
+  /// Factor used for decimal encoding (divide by 10^factor)
+  uint8_t factor = 0;
+  /// Number of exceptions stored in this vector
+  uint16_t num_exceptions = 0;
+
+  /// \brief Store the ALP metadata into an output buffer
+  void Store(arrow::util::span<char> output_buffer) const;
+
+  /// \brief Load ALP metadata from an input buffer
+  static AlpEncodedVectorInfo Load(arrow::util::span<const char> input_buffer);
+
+  /// \brief Get serialized size of the ALP metadata (same as kStoredSize)
+  static uint64_t GetStoredSize() { return kStoredSize; }
+
+  /// \brief Get exponent and factor as a combined struct
+  AlpExponentAndFactor GetExponentAndFactor() const {
+    return AlpExponentAndFactor{exponent, factor};
+  }
+
+  /// \brief Equal when exponent, factor and num_exceptions all match
+  bool operator==(const AlpEncodedVectorInfo& other) const {
+    if (exponent != other.exponent) return false;
+    if (factor != other.factor) return false;
+    return num_exceptions == other.num_exceptions;
+  }
+
+  /// \brief Negation of operator==
+  bool operator!=(const AlpEncodedVectorInfo& other) const {
+    return !(*this == other);
+  }
+};
+
+// ----------------------------------------------------------------------
+// AlpEncodedForVectorInfo (templated, FOR integer encoding metadata)
+
+/// \brief FOR (Frame of Reference) encoding metadata for an encoded vector
+///
+/// Contains the metadata specific to FOR bit-packing integer encoding:
+///   - frame_of_reference: minimum value subtracted from all encoded integers
+///   - bit_width: number of bits used to pack each delta value
+///
+/// This struct is templated because frame_of_reference size depends on T:
+///   - float:  uint32_t frame_of_reference (4 bytes)
+///   - double: uint64_t frame_of_reference (8 bytes)
+///
+/// Serialization format for float (5 bytes):
+///
+///   +------------------------------------------+
+///   |  AlpEncodedForVectorInfo<float> (5B)     |
+///   +------------------------------------------+
+///   |  Offset |  Field              |  Size    |
+///   +---------+---------------------+----------+
+///   |    0    |  frame_of_reference |  4 bytes |
+///   |    4    |  bit_width (uint8_t)|  1 byte  |
+///   +------------------------------------------+
+///
+/// Serialization format for double (9 bytes):
+///
+///   +------------------------------------------+
+///   |  AlpEncodedForVectorInfo<double> (9B)    |
+///   +------------------------------------------+
+///   |  Offset |  Field              |  Size    |
+///   +---------+---------------------+----------+
+///   |    0    |  frame_of_reference |  8 bytes |
+///   |    8    |  bit_width (uint8_t)|  1 byte  |
+///   +------------------------------------------+
+///
+/// \tparam T the floating point type (float or double)
+template <typename T>
+struct AlpEncodedForVectorInfo {
+  static_assert(std::is_same_v<T, float> || std::is_same_v<T, double>,
+                "AlpEncodedForVectorInfo only supports float and double");
+
+  /// Use uint32_t for float, uint64_t for double (matches encoded integer 
size)
+  using ExactType = typename AlpTypedConstants<T>::FloatingToExact;
+
+  /// Reference value subtracted from all encoded integers (4 bytes for float, 8 for 
double)
+  ExactType frame_of_reference = 0;
+  /// Bitwidth used for bitpacking
+  uint8_t bit_width = 0;
+
+  /// Size of the serialized portion (5 bytes for float, 9 for double)
+  static constexpr uint64_t kStoredSize = sizeof(ExactType) + 1;
+
+  /// \brief Compute the bitpacked size in bytes from num_elements and 
bit_width
+  ///
+  /// \param[in] num_elements number of elements in this vector
+  /// \param[in] bit_width bits per element
+  /// \return the size in bytes of the bitpacked data (total bits rounded up to whole bytes)
+  static uint64_t GetBitPackedSize(uint16_t num_elements, uint8_t bit_width) {
+    return (static_cast<uint64_t>(num_elements) * bit_width + 7) / 8;
+  }
+
+  /// \brief Store the FOR metadata into an output buffer
+  void Store(arrow::util::span<char> output_buffer) const;
+
+  /// \brief Load FOR metadata from an input buffer
+  static AlpEncodedForVectorInfo Load(arrow::util::span<const char> 
input_buffer);
+
+  /// \brief Get serialized size of the FOR metadata (returns kStoredSize)
+  static uint64_t GetStoredSize() { return kStoredSize; }
+
+  /// \brief Get the size of the data section (packed values + exceptions)
+  ///
+  /// \param[in] num_elements number of elements in this vector
+  /// \param[in] num_exceptions number of exceptions (from 
AlpEncodedVectorInfo)
+  /// \return the size in bytes of packed values + exception positions + 
exceptions
+  uint64_t GetDataStoredSize(uint16_t num_elements, uint16_t num_exceptions) 
const {
+    const uint64_t bit_packed_size = GetBitPackedSize(num_elements, bit_width);
+    return bit_packed_size +
+           num_exceptions * (sizeof(AlpConstants::PositionType) + sizeof(T));  // one position + one T value per exception
+  }
+
+  bool operator==(const AlpEncodedForVectorInfo& other) const {
+    return frame_of_reference == other.frame_of_reference &&
+           bit_width == other.bit_width;
+  }
+
+  bool operator!=(const AlpEncodedForVectorInfo& other) const { return !(*this 
== other); }
+};
+
+// ----------------------------------------------------------------------
+// AlpEncodedVector
+
+/// \class AlpEncodedVector
+/// \brief A compressed ALP vector with metadata
+///
+/// Per-vector data layout:
+///
+///   +------------------------------------------------------------+
+///   |  AlpEncodedVector<T> Data Layout                           |
+///   +------------------------------------------------------------+
+///   |  Section              |  Size (bytes)        | Description |
+///   +-----------------------+----------------------+-------------+
+///   |  1. AlpInfo           |  4B (fixed)          |  ALP meta   |
+///   +-----------------------+----------------------+-------------+
+///   |  2. ForInfo           |  5B (float) or       |  FOR meta   |
+///   |                       |  9B (double)         |             |
+///   +-----------------------+----------------------+-------------+
+///   |  3. Packed Values     |  bit_packed_size     |  Bitpacked  |
+///   |     (compressed data) |  (computed)          |  integers   |
+///   +-----------------------+----------------------+-------------+
+///   |  4. Exception Pos     |  num_exceptions * 2  |  uint16_t[] |
+///   |     (indices)         |  (variable)          |  positions  |
+///   +-----------------------+----------------------+-------------+
+///   |  5. Exception Values  |  num_exceptions *    |  T[] (float/|
+///   |     (original floats) |  sizeof(T)           |  double)    |
+///   +------------------------------------------------------------+
+///
+/// Page-level layout (offset-based interleaved for O(1) random access):
+///
+///   +------------------------------------------------------------+
+///   |  Page Layout                                               |
+///   +------------------------------------------------------------+
+///   |  [Header (8B)]                                             |
+///   |  [Offset₀ | Offset₁ | ... | Offsetₙ₋₁]   ← Vector offsets  |
+///   |  [Vector₀][Vector₁]...[Vectorₙ₋₁]        ← Interleaved     |
+///   +------------------------------------------------------------+
+///   where each Vector = [AlpInfo | ForInfo | Data]
+///
+/// The offset-based layout enables:
+/// - O(1) random access to any vector via offset lookup
+/// - Better locality for single-vector decompression
+/// - Parallel decompression without coordination
+///
+/// Example for 1024 floats with 5 exceptions and bit_width=8:
+///   - AlpInfo:            4 bytes (fixed)
+///   - ForInfo:            5 bytes (float)
+///   - Packed Values:   1024 bytes (1024 * 8 bits / 8)
+///   - Exception Pos:     10 bytes (5 * 2)
+///   - Exception Values:  20 bytes (5 * 4)
+///   Total:             1063 bytes
+template <typename T>
+class AlpEncodedVector {
+ public:
+  /// ALP-specific metadata (exponent, factor, num_exceptions)
+  AlpEncodedVectorInfo alp_info;
+  /// FOR-specific metadata (frame_of_reference, bit_width)
+  AlpEncodedForVectorInfo<T> for_info;
+  /// Number of elements in this vector (not serialized; from page header)
+  uint16_t num_elements = 0;
+  /// Successfully encoded and bitpacked data
+  arrow::internal::StaticVector<uint8_t, AlpConstants::kAlpVectorSize * 
sizeof(T)>
+      packed_values;
+  /// Float values that could not be converted successfully
+  arrow::internal::StaticVector<T, AlpConstants::kAlpVectorSize> exceptions;
+  /// Positions of the exceptions in the decompressed vector
+  arrow::internal::StaticVector<uint16_t, AlpConstants::kAlpVectorSize> 
exception_positions;
+
+  /// Total metadata size (AlpInfo + ForInfo)
+  static constexpr uint64_t kMetadataStoredSize =
+      AlpEncodedVectorInfo::kStoredSize + 
AlpEncodedForVectorInfo<T>::kStoredSize;
+
+  /// \brief Get the size of the vector if stored into a sequential memory 
block
+  ///
+  /// \return the stored size in bytes
+  uint64_t GetStoredSize() const;
+
+  /// \brief Get the stored size for given metadata and element count
+  ///
+  /// \param[in] alp_info the ALP metadata
+  /// \param[in] for_info the FOR metadata
+  /// \param[in] num_elements the number of elements in this vector
+  /// \return the stored size in bytes
+  static uint64_t GetStoredSize(const AlpEncodedVectorInfo& alp_info,
+                                const AlpEncodedForVectorInfo<T>& for_info,
+                                uint16_t num_elements);
+
+  /// \brief Get the number of elements in this vector
+  ///
+  /// \return number of elements
+  uint64_t GetNumElements() const { return num_elements; }
+
+  /// \brief Store the compressed vector in a compact format into an output 
buffer
+  ///
+  /// Stores 
[AlpInfo][ForInfo][PackedValues][ExceptionPositions][ExceptionValues]
+  ///
+  /// \param[out] output_buffer the buffer to store the compressed data into
+  void Store(arrow::util::span<char> output_buffer) const;
+
+  /// \brief Store only the data section (without metadata) into an output 
buffer
+  ///
+  /// Stores [PackedValues][ExceptionPositions][ExceptionValues]
+  /// Used when metadata (AlpInfo, ForInfo) is written separately.
+  ///
+  /// \param[out] output_buffer the buffer to store the data section into
+  void StoreDataOnly(arrow::util::span<char> output_buffer) const;
+
+  /// \brief Get the size of the data section only (without metadata)
+  ///
+  /// \return the size in bytes of packed values + exception positions + 
exceptions
+  uint64_t GetDataStoredSize() const {
+    return for_info.GetDataStoredSize(num_elements, alp_info.num_exceptions);
+  }
+
+  /// \brief Load a compressed vector from a compact format from an input 
buffer
+  ///
+  /// \param[in] input_buffer the buffer to load from
+  /// \param[in] num_elements the number of elements (from page header)
+  /// \return the loaded AlpEncodedVector
+  static AlpEncodedVector Load(arrow::util::span<const char> input_buffer,
+                               uint16_t num_elements);
+
+  bool operator==(const AlpEncodedVector<T>& other) const;
+};
+
+// ----------------------------------------------------------------------
+// AlpEncodedVectorView
+
+/// \class AlpEncodedVectorView
+/// \brief A view into compressed ALP data optimized for decompression
+///
+/// Unlike AlpEncodedVector which copies all data into internal buffers,
+/// AlpEncodedVectorView uses zero-copy for the large packed values array
+/// while copying the small exception arrays into aligned storage.
+///
+/// The packed values are accessed via a span (zero-copy) since they are
+/// byte arrays with no alignment requirements. Exception positions and
+/// values are copied into aligned StaticVectors because:
+///   1. The serialized data may not be properly aligned for uint16_t/T access
+///   2. Exceptions are rare (typically < 5%), so copying is negligible
+///   3. This avoids undefined behavior from misaligned memory access
+///
+/// Use LoadView() to create a view, then pass to DecompressVectorView().
+/// The underlying buffer must remain valid for the lifetime of the view
+/// (for packed_values access).
+template <typename T>
+struct AlpEncodedVectorView {
+  /// ALP-specific metadata (exponent, factor, num_exceptions)
+  AlpEncodedVectorInfo alp_info;
+  /// FOR-specific metadata (frame_of_reference, bit_width)
+  AlpEncodedForVectorInfo<T> for_info;
+  /// Number of elements in this vector (not serialized; from page header)
+  uint16_t num_elements = 0;

Review Comment:
   Good catch! Added `kMaxLogVectorSize = 15` in `alp_constants.h` with a 
comment explaining why (2^16 = 65536 overflows uint16_t). `Decode()` validates 
`log_vector_size <= 15` and returns `Status::Invalid` if exceeded.



##########
cpp/src/arrow/util/alp/alp_constants.h:
##########
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Constants and type traits for ALP compression
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace util {
+namespace alp {
+
+// ----------------------------------------------------------------------
+// AlpConstants
+
+/// \brief Constants used throughout ALP compression
+class AlpConstants {
+ public:
+  /// Number of elements compressed together as a unit. Fixed for 
compatibility.
+  static constexpr uint64_t kAlpVectorSize = 1024;
+
+  /// Number of elements to use when determining sampling parameters.
+  static constexpr uint64_t kSamplerVectorSize = 4096;
+
+  /// Total number of elements in a rowgroup for sampling purposes.
+  static constexpr uint64_t kSamplerRowgroupSize = 122880;
+
+  /// Number of samples to collect per vector during the sampling phase.
+  static constexpr uint64_t kSamplerSamplesPerVector = 256;
+
+  /// Number of sample vectors to collect per rowgroup.
+  static constexpr uint64_t kSamplerSampleVectorsPerRowgroup = 8;
+
+  /// Version number for the ALP compression format.
+  static constexpr uint8_t kAlpVersion = 1;
+
+  /// Type used to store exception positions within a compressed vector.
+  using PositionType = uint16_t;
+
+  /// Threshold for early exit during sampling when compression quality is 
poor.
+  static constexpr uint8_t kSamplingEarlyExitThreshold = 4;
+
+  /// Maximum number of exponent-factor combinations to try during compression.
+  static constexpr uint8_t kMaxCombinations = 5;

Review Comment:
   They serve different purposes: `kMaxCombinations` bounds how many 
exponent/factor pairs we store in the preset, while 
`kSamplingEarlyExitThreshold` bounds how many consecutive worse results we 
tolerate before early-exiting during per-vector selection. Deriving one from 
the other would couple them incorrectly. Added a `static_assert` so the 
early-exit path is always reachable, plus a comment explaining the relationship.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to