prtkgaur commented on code in PR #48345: URL: https://github.com/apache/arrow/pull/48345#discussion_r2600329903
########## cpp/src/arrow/util/alp/data/floatingpoint_data.tar.gz: ########## Review Comment: Makes sense. Thanks. https://github.com/apache/parquet-testing/pull/100 ########## cpp/src/parquet/encoding_alp_benchmark.cc: ########## @@ -0,0 +1,1823 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <chrono> +#include <cmath> +#include <cstring> +#include <filesystem> +#include <fstream> +#include <iostream> +#include <memory> +#include <random> +#include <sstream> +#include <unistd.h> +#include <unordered_set> +#include <vector> + +#include <benchmark/benchmark.h> + +#include "arrow/buffer.h" +#include "arrow/util/alp/AlpWrapper.h" +#include "arrow/util/compression.h" +#include "parquet/encoding.h" +#include "parquet/schema.h" +#include "parquet/types.h" + +// This file benchmarks multiple encoding schemes for floating point values in +// Parquet. Structure mirrors Snowflake's FloatComprBenchmark.cpp +// +// It evaluates: +// 1) Compression Ratio +// 2) Encoding Speed +// 3) Decoding Speed +// +// Encoding schemes: +// 1) ALP encoding +// 2) ByteStreamSplit encoding +// 3) ZSTD compression +// +// On synthetic datasets: +// 1) Constant Value +// 2) Increasing values +// 3) Small Range decimal +// 4) Range decimal +// 5) Large Range decimal +// 6) Random values +// +// And real-world datasets: +// 1) floatingpoint_spotify1.csv (9 columns) +// 2) floatingpoint_spotify2.csv (9 columns) +// 3) floatingpoint_citytemperature.csv (1 column) +// 4) floatingpoint_poi.csv (2 columns) +// 5) floatingpoint_birdmigration.csv (1 column) +// 6) floatingpoint_commongovernment.csv (3 columns) +// 7) floatingpoint_arade.csv (4 columns) +// 8) floatingpoint_num_brain.csv (1 column) +// 9) floatingpoint_num_comet.csv (1 column) +// 10) floatingpoint_num_control.csv (1 column) +// 11) floatingpoint_num_plasma.csv (1 column) +// 12) floatingpoint_obs_error.csv (1 column) +// 13) floatingpoint_obs_info.csv (1 column) +// 14) floatingpoint_obs_spitzer.csv (1 column) +// 15) floatingpoint_obs_temp.csv (1 column) +// 16) floatingpoint_msg_bt.csv (1 column) +// 17) floatingpoint_msg_lu.csv (1 column) +// 18) floatingpoint_msg_sp.csv (1 column) +// 19) floatingpoint_msg_sppm.csv (1 column) +// 20) floatingpoint_msg_sweep3d.csv (1 column) + +namespace parquet { + +using schema::PrimitiveNode; + +// Helper function matching Snowflake's pow10 +constexpr uint64_t Pow10(uint64_t exp) { + uint64_t result = 1; + for (uint64_t i = 0; i < exp; ++i) { + result *= 10; + } + return result; +} + +// Encoding type enum (matching Snowflake's ComprEngine pattern) +enum class EncodingType { + kALP, + kByteStreamSplit, + kZSTD, +}; + +// Helper to create column descriptor for float/double +template <typename DType> +std::shared_ptr<ColumnDescriptor> MakeColumnDescriptor() { + auto node = PrimitiveNode::Make("column", Repetition::REQUIRED, DType::type_num); + return std::make_shared<ColumnDescriptor>(node, false, false); +} + +// ============================================================================ +// Benchmark data base class +// ============================================================================ + +/// \brief Helper class to set up encoding benchmark data. +/// +/// Matches Snowflake's RealComprBenchmarkData<T> structure with encoding parameter. +template <typename T> +struct RealComprBenchmarkData { + std::vector<T> input_uncompressed; + std::shared_ptr<Buffer> encoded_data; + std::vector<T> output_uncompressed; + uint64_t encoded_size = 0; + Encoding::type current_encoding; + std::unique_ptr<::arrow::util::Codec> codec; // For ZSTD + + virtual ~RealComprBenchmarkData() = default; + + void PrepareBenchmarkData(uint64_t element_count, EncodingType encoding_type) { + FillUncompressedInput(element_count); + + using DType = + typename std::conditional<std::is_same<T, float>::value, FloatType, + DoubleType>::type; + auto descr = MakeColumnDescriptor<DType>(); + + // Select encoding based on type + switch (encoding_type) { + case EncodingType::kALP: + current_encoding = Encoding::ALP; + break; + case EncodingType::kByteStreamSplit: + current_encoding = Encoding::BYTE_STREAM_SPLIT; + codec = ::arrow::util::Codec::Create(::arrow::Compression::ZSTD).ValueOrDie(); + break; + case EncodingType::kZSTD: + // ZSTD uses PLAIN encoding + compression + current_encoding = Encoding::PLAIN; + codec = ::arrow::util::Codec::Create(::arrow::Compression::ZSTD).ValueOrDie(); + break; + } + + // Do initial encoding to size buffers + if (encoding_type == EncodingType::kALP) { + auto encoder = MakeTypedEncoder<DType>(Encoding::ALP, false, descr.get()); + encoder->Put(input_uncompressed.data(), + static_cast<int>(input_uncompressed.size())); + encoded_data = encoder->FlushValues(); + encoded_size = encoded_data->size(); + } else if (encoding_type == EncodingType::kZSTD) { + // For ZSTD: Plain encode then compress + auto encoder = MakeTypedEncoder<DType>(Encoding::PLAIN, false, descr.get()); + encoder->Put(input_uncompressed.data(), + static_cast<int>(input_uncompressed.size())); + auto plain_data = encoder->FlushValues(); + + // Compress with ZSTD - use AllocateBuffer to properly manage memory + int64_t max_compressed_len = + codec->MaxCompressedLen(plain_data->size(), plain_data->data()); + auto compressed_buffer = + ::arrow::AllocateResizableBuffer(max_compressed_len).ValueOrDie(); + int64_t actual_size = + codec + ->Compress(plain_data->size(), plain_data->data(), max_compressed_len, + compressed_buffer->mutable_data()) + .ValueOrDie(); + // Resize to actual compressed size and move to shared_ptr + (void)compressed_buffer->Resize(actual_size); // Resize can't fail for shrinking + encoded_data = std::shared_ptr<Buffer>(std::move(compressed_buffer)); + encoded_size = actual_size; + } else { + // For ByteStreamSplit: Direct encoding + auto encoder = MakeTypedEncoder<DType>(current_encoding, false, descr.get()); + encoder->Put(input_uncompressed.data(), + static_cast<int>(input_uncompressed.size())); + auto byte_stream_split_data = encoder->FlushValues(); + // Compress with ZSTD - use AllocateBuffer to properly manage memory + int64_t max_compressed_len = codec->MaxCompressedLen( + byte_stream_split_data->size(), byte_stream_split_data->data()); + auto compressed_buffer = + ::arrow::AllocateResizableBuffer(max_compressed_len).ValueOrDie(); + int64_t actual_size = + codec + ->Compress(byte_stream_split_data->size(), byte_stream_split_data->data(), + max_compressed_len, compressed_buffer->mutable_data()) + .ValueOrDie(); + // Resize to actual compressed size and move to shared_ptr + (void)compressed_buffer->Resize(actual_size); // Resize can't fail for shrinking + encoded_data = std::shared_ptr<Buffer>(std::move(compressed_buffer)); + encoded_size = actual_size; + } + + // Prepare output buffer + output_uncompressed.resize(input_uncompressed.size()); + } + + virtual void FillUncompressedInput(uint64_t element_count) = 0; +}; + +// ============================================================================ +// Synthetic Data Generators +// ============================================================================ + +template <typename T> +struct ConstantValues : public RealComprBenchmarkData<T> { + void FillUncompressedInput(uint64_t element_count) override { + const T value = static_cast<T>(1.1); + this->input_uncompressed = std::vector<T>(element_count, value); + } +}; + +template <typename T> +struct IncreasingValues : public RealComprBenchmarkData<T> { + void FillUncompressedInput(uint64_t element_count) override { + this->input_uncompressed.resize(element_count); + T current_value = 0.0; + for (uint64_t i = 0; i < element_count; i++) { + this->input_uncompressed[i] = current_value; + current_value += 1.0; + } + } +}; + +template <typename T> +struct DecimalSmallRange : public RealComprBenchmarkData<T> { + void FillUncompressedInput(uint64_t element_count) override { + this->input_uncompressed.resize(element_count); + const uint64_t min_val = 100; + const uint64_t max_val = 1000; + const uint64_t decimal_places = 2; + const uint64_t mult = Pow10(decimal_places); + + std::uniform_int_distribution<uint64_t> unif(min_val * mult, max_val * mult); + std::default_random_engine re; + for (uint64_t i = 0; i < element_count; i++) { + this->input_uncompressed[i] = unif(re) * 1.0 / mult; + } + } +}; + +template <typename T> +struct DecimalRange : public RealComprBenchmarkData<T> { + void FillUncompressedInput(uint64_t element_count) override { + this->input_uncompressed.resize(element_count); + const uint64_t min_val = 1000; + const uint64_t max_val = 100000; + const uint64_t decimal_places = 6; + const uint64_t mult = Pow10(decimal_places); + + std::uniform_int_distribution<uint64_t> unif(min_val * mult, max_val * mult); + std::default_random_engine re; + for (uint64_t i = 0; i < element_count; i++) { + this->input_uncompressed[i] = unif(re) * 1.0 / mult; + } + } +}; + +template <typename T> +struct DecimalLargeRange : public RealComprBenchmarkData<T> { + void FillUncompressedInput(uint64_t element_count) override { + this->input_uncompressed.resize(element_count); + const uint64_t min_val = 1000; + const uint64_t max_val = 1000000; + const uint64_t decimal_places = 6; + const uint64_t mult = Pow10(decimal_places); + + std::uniform_int_distribution<uint64_t> unif(min_val * mult, max_val * mult); + std::default_random_engine re; + for (uint64_t i = 0; i < element_count; i++) { + this->input_uncompressed[i] = unif(re) * 1.0 / mult; + } + } +}; + +template <typename T> +struct RandomValues : public RealComprBenchmarkData<T> { + void FillUncompressedInput(uint64_t element_count) override { + this->input_uncompressed.resize(element_count); + std::uniform_real_distribution<T> unif(std::numeric_limits<T>::min(), + std::numeric_limits<T>::max()); + std::default_random_engine re; + for (uint64_t i = 0; i < element_count; i++) { + this->input_uncompressed[i] = unif(re); + } + } +}; + +// ============================================================================ +// CSV Loading Infrastructure (for real-world datasets) +// ============================================================================ + +// Extract tarball once and return the data directory path +std::string GetDataDirectory() { + static std::string data_dir; + static bool initialized = false; + + if (!initialized) { + // Find the tarball location relative to this source file + std::string tarball_path = std::string(__FILE__); + tarball_path = tarball_path.substr(0, tarball_path.find_last_of("/\\")); + tarball_path = tarball_path.substr(0, tarball_path.find_last_of("/\\")); + tarball_path += "/arrow/cpp/submodules/parquet-testing/data/floatingpoint_data.tar.gz"; Review Comment: @reviewer the data sits in the parquet-testing submodule https://github.com/apache/parquet-testing/pull/100 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
