This is an automated email from the ASF dual-hosted git repository.
pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new a364e4a16d GH-39978: [C++][Parquet] Expand BYTE_STREAM_SPLIT to
support FIXED_LEN_BYTE_ARRAY, INT32 and INT64 (#40094)
a364e4a16d is described below
commit a364e4a16d596f7fcccd76ad8fc27092c59e5e74
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Mar 19 12:06:28 2024 +0100
GH-39978: [C++][Parquet] Expand BYTE_STREAM_SPLIT to support
FIXED_LEN_BYTE_ARRAY, INT32 and INT64 (#40094)
### What changes are included in this PR?
Implement the format addition described in
https://issues.apache.org/jira/browse/PARQUET-2414 .
### Are these changes tested?
Yes.
### Are there any user-facing changes?
Yes (additional types supported for Parquet encoding).
* GitHub Issue: #39978
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/util/byte_stream_split_internal.h | 126 +++++--
cpp/src/arrow/util/byte_stream_split_test.cc | 72 ++--
cpp/src/parquet/column_io_benchmark.cc | 103 +++++-
cpp/src/parquet/column_writer_test.cc | 16 +-
cpp/src/parquet/encoding.cc | 445 +++++++++++++++---------
cpp/src/parquet/encoding_benchmark.cc | 115 ++++--
cpp/src/parquet/encoding_test.cc | 207 +++++++----
cpp/src/parquet/reader_test.cc | 92 ++++-
cpp/submodules/parquet-testing | 2 +-
python/pyarrow/tests/parquet/test_basic.py | 42 ++-
10 files changed, 849 insertions(+), 371 deletions(-)
diff --git a/cpp/src/arrow/util/byte_stream_split_internal.h
b/cpp/src/arrow/util/byte_stream_split_internal.h
index 77284d695b..8bca0d442c 100644
--- a/cpp/src/arrow/util/byte_stream_split_internal.h
+++ b/cpp/src/arrow/util/byte_stream_split_internal.h
@@ -19,11 +19,14 @@
#include "arrow/util/endian.h"
#include "arrow/util/simd.h"
+#include "arrow/util/small_vector.h"
#include "arrow/util/ubsan.h"
#include <algorithm>
+#include <array>
#include <cassert>
#include <cstdint>
+#include <cstring>
#if defined(ARROW_HAVE_NEON) || defined(ARROW_HAVE_SSE4_2)
#include <xsimd/xsimd.hpp>
@@ -38,10 +41,11 @@ namespace arrow::util::internal {
#if defined(ARROW_HAVE_NEON) || defined(ARROW_HAVE_SSE4_2)
template <int kNumStreams>
-void ByteStreamSplitDecodeSimd128(const uint8_t* data, int64_t num_values,
int64_t stride,
- uint8_t* out) {
+void ByteStreamSplitDecodeSimd128(const uint8_t* data, int width, int64_t
num_values,
+ int64_t stride, uint8_t* out) {
using simd_batch = xsimd::make_sized_batch_t<int8_t, 16>;
+ assert(width == kNumStreams);
static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of
streams.");
constexpr int kNumStreamsLog2 = (kNumStreams == 8 ? 3 : 2);
constexpr int64_t kBlockSize = sizeof(simd_batch) * kNumStreams;
@@ -92,10 +96,11 @@ void ByteStreamSplitDecodeSimd128(const uint8_t* data,
int64_t num_values, int64
}
template <int kNumStreams>
-void ByteStreamSplitEncodeSimd128(const uint8_t* raw_values, const int64_t
num_values,
- uint8_t* output_buffer_raw) {
+void ByteStreamSplitEncodeSimd128(const uint8_t* raw_values, int width,
+ const int64_t num_values, uint8_t*
output_buffer_raw) {
using simd_batch = xsimd::make_sized_batch_t<int8_t, 16>;
+ assert(width == kNumStreams);
static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of
streams.");
constexpr int kBlockSize = sizeof(simd_batch) * kNumStreams;
@@ -215,15 +220,17 @@ void ByteStreamSplitEncodeSimd128(const uint8_t*
raw_values, const int64_t num_v
#if defined(ARROW_HAVE_AVX2)
template <int kNumStreams>
-void ByteStreamSplitDecodeAvx2(const uint8_t* data, int64_t num_values,
int64_t stride,
- uint8_t* out) {
+void ByteStreamSplitDecodeAvx2(const uint8_t* data, int width, int64_t
num_values,
+ int64_t stride, uint8_t* out) {
+ assert(width == kNumStreams);
static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of
streams.");
constexpr int kNumStreamsLog2 = (kNumStreams == 8 ? 3 : 2);
constexpr int64_t kBlockSize = sizeof(__m256i) * kNumStreams;
const int64_t size = num_values * kNumStreams;
if (size < kBlockSize) // Back to SSE for small size
- return ByteStreamSplitDecodeSimd128<kNumStreams>(data, num_values, stride,
out);
+ return ByteStreamSplitDecodeSimd128<kNumStreams>(data, width, num_values,
stride,
+ out);
const int64_t num_blocks = size / kBlockSize;
// First handle suffix.
@@ -299,18 +306,19 @@ void ByteStreamSplitDecodeAvx2(const uint8_t* data,
int64_t num_values, int64_t
}
template <int kNumStreams>
-void ByteStreamSplitEncodeAvx2(const uint8_t* raw_values, const int64_t
num_values,
- uint8_t* output_buffer_raw) {
+void ByteStreamSplitEncodeAvx2(const uint8_t* raw_values, int width,
+ const int64_t num_values, uint8_t*
output_buffer_raw) {
+ assert(width == kNumStreams);
static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of
streams.");
constexpr int kBlockSize = sizeof(__m256i) * kNumStreams;
if constexpr (kNumStreams == 8) // Back to SSE, currently no path for
double.
- return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, num_values,
+ return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, width,
num_values,
output_buffer_raw);
const int64_t size = num_values * kNumStreams;
if (size < kBlockSize) // Back to SSE for small size
- return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, num_values,
+ return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, width,
num_values,
output_buffer_raw);
const int64_t num_blocks = size / kBlockSize;
const __m256i* raw_values_simd = reinterpret_cast<const
__m256i*>(raw_values);
@@ -373,25 +381,26 @@ void ByteStreamSplitEncodeAvx2(const uint8_t* raw_values,
const int64_t num_valu
#if defined(ARROW_HAVE_SIMD_SPLIT)
template <int kNumStreams>
-void inline ByteStreamSplitDecodeSimd(const uint8_t* data, int64_t num_values,
+void inline ByteStreamSplitDecodeSimd(const uint8_t* data, int width, int64_t
num_values,
int64_t stride, uint8_t* out) {
#if defined(ARROW_HAVE_AVX2)
- return ByteStreamSplitDecodeAvx2<kNumStreams>(data, num_values, stride, out);
+ return ByteStreamSplitDecodeAvx2<kNumStreams>(data, width, num_values,
stride, out);
#elif defined(ARROW_HAVE_SSE4_2) || defined(ARROW_HAVE_NEON)
- return ByteStreamSplitDecodeSimd128<kNumStreams>(data, num_values, stride,
out);
+ return ByteStreamSplitDecodeSimd128<kNumStreams>(data, width, num_values,
stride, out);
#else
#error "ByteStreamSplitDecodeSimd not implemented"
#endif
}
template <int kNumStreams>
-void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const int64_t
num_values,
+void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, int width,
+ const int64_t num_values,
uint8_t* output_buffer_raw) {
#if defined(ARROW_HAVE_AVX2)
- return ByteStreamSplitEncodeAvx2<kNumStreams>(raw_values, num_values,
+ return ByteStreamSplitEncodeAvx2<kNumStreams>(raw_values, width, num_values,
output_buffer_raw);
#elif defined(ARROW_HAVE_SSE4_2) || defined(ARROW_HAVE_NEON)
- return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, num_values,
+ return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, width,
num_values,
output_buffer_raw);
#else
#error "ByteStreamSplitEncodeSimd not implemented"
@@ -492,18 +501,30 @@ inline void DoMergeStreams(const uint8_t** src_streams,
int width, int64_t nvalu
}
template <int kNumStreams>
-void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const int64_t
num_values,
- uint8_t* output_buffer_raw) {
+void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, int width,
+ const int64_t num_values, uint8_t* out) {
+ assert(width == kNumStreams);
std::array<uint8_t*, kNumStreams> dest_streams;
for (int stream = 0; stream < kNumStreams; ++stream) {
- dest_streams[stream] = &output_buffer_raw[stream * num_values];
+ dest_streams[stream] = &out[stream * num_values];
}
DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data());
}
+inline void ByteStreamSplitEncodeScalarDynamic(const uint8_t* raw_values, int
width,
+ const int64_t num_values,
uint8_t* out) {
+ ::arrow::internal::SmallVector<uint8_t*, 16> dest_streams;
+ dest_streams.resize(width);
+ for (int stream = 0; stream < width; ++stream) {
+ dest_streams[stream] = &out[stream * num_values];
+ }
+ DoSplitStreams(raw_values, width, num_values, dest_streams.data());
+}
+
template <int kNumStreams>
-void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values,
int64_t stride,
- uint8_t* out) {
+void ByteStreamSplitDecodeScalar(const uint8_t* data, int width, int64_t
num_values,
+ int64_t stride, uint8_t* out) {
+ assert(width == kNumStreams);
std::array<const uint8_t*, kNumStreams> src_streams;
for (int stream = 0; stream < kNumStreams; ++stream) {
src_streams[stream] = &data[stream * stride];
@@ -511,26 +532,63 @@ void ByteStreamSplitDecodeScalar(const uint8_t* data,
int64_t num_values, int64_
DoMergeStreams(src_streams.data(), kNumStreams, num_values, out);
}
-template <int kNumStreams>
-void inline ByteStreamSplitEncode(const uint8_t* raw_values, const int64_t
num_values,
- uint8_t* output_buffer_raw) {
+inline void ByteStreamSplitDecodeScalarDynamic(const uint8_t* data, int width,
+ int64_t num_values, int64_t
stride,
+ uint8_t* out) {
+ ::arrow::internal::SmallVector<const uint8_t*, 16> src_streams;
+ src_streams.resize(width);
+ for (int stream = 0; stream < width; ++stream) {
+ src_streams[stream] = &data[stream * stride];
+ }
+ DoMergeStreams(src_streams.data(), width, num_values, out);
+}
+
+inline void ByteStreamSplitEncode(const uint8_t* raw_values, int width,
+ const int64_t num_values, uint8_t* out) {
#if defined(ARROW_HAVE_SIMD_SPLIT)
- return ByteStreamSplitEncodeSimd<kNumStreams>(raw_values, num_values,
- output_buffer_raw);
+#define ByteStreamSplitEncodePerhapsSimd ByteStreamSplitEncodeSimd
#else
- return ByteStreamSplitEncodeScalar<kNumStreams>(raw_values, num_values,
- output_buffer_raw);
+#define ByteStreamSplitEncodePerhapsSimd ByteStreamSplitEncodeScalar
#endif
+ switch (width) {
+ case 1:
+ memcpy(out, raw_values, num_values);
+ return;
+ case 2:
+ return ByteStreamSplitEncodeScalar<2>(raw_values, width, num_values,
out);
+ case 4:
+ return ByteStreamSplitEncodePerhapsSimd<4>(raw_values, width,
num_values, out);
+ case 8:
+ return ByteStreamSplitEncodePerhapsSimd<8>(raw_values, width,
num_values, out);
+ case 16:
+ return ByteStreamSplitEncodeScalar<16>(raw_values, width, num_values,
out);
+ }
+ return ByteStreamSplitEncodeScalarDynamic(raw_values, width, num_values,
out);
+#undef ByteStreamSplitEncodePerhapsSimd
}
-template <int kNumStreams>
-void inline ByteStreamSplitDecode(const uint8_t* data, int64_t num_values,
int64_t stride,
- uint8_t* out) {
+inline void ByteStreamSplitDecode(const uint8_t* data, int width, int64_t
num_values,
+ int64_t stride, uint8_t* out) {
#if defined(ARROW_HAVE_SIMD_SPLIT)
- return ByteStreamSplitDecodeSimd<kNumStreams>(data, num_values, stride, out);
+#define ByteStreamSplitDecodePerhapsSimd ByteStreamSplitDecodeSimd
#else
- return ByteStreamSplitDecodeScalar<kNumStreams>(data, num_values, stride,
out);
+#define ByteStreamSplitDecodePerhapsSimd ByteStreamSplitDecodeScalar
#endif
+ switch (width) {
+ case 1:
+ memcpy(out, data, num_values);
+ return;
+ case 2:
+ return ByteStreamSplitDecodeScalar<2>(data, width, num_values, stride,
out);
+ case 4:
+ return ByteStreamSplitDecodePerhapsSimd<4>(data, width, num_values,
stride, out);
+ case 8:
+ return ByteStreamSplitDecodePerhapsSimd<8>(data, width, num_values,
stride, out);
+ case 16:
+ return ByteStreamSplitDecodeScalar<16>(data, width, num_values, stride,
out);
+ }
+ return ByteStreamSplitDecodeScalarDynamic(data, width, num_values, stride,
out);
+#undef ByteStreamSplitDecodePerhapsSimd
}
} // namespace arrow::util::internal
diff --git a/cpp/src/arrow/util/byte_stream_split_test.cc
b/cpp/src/arrow/util/byte_stream_split_test.cc
index 83ed8c9ba5..3a537725b0 100644
--- a/cpp/src/arrow/util/byte_stream_split_test.cc
+++ b/cpp/src/arrow/util/byte_stream_split_test.cc
@@ -16,6 +16,7 @@
// under the License.
#include <algorithm>
+#include <array>
#include <cmath>
#include <cstddef>
#include <functional>
@@ -35,7 +36,8 @@
namespace arrow::util::internal {
-using ByteStreamSplitTypes = ::testing::Types<float, double>;
+using ByteStreamSplitTypes =
+ ::testing::Types<int8_t, int16_t, int32_t, int64_t, std::array<uint8_t,
3>>;
template <typename Func>
struct NamedFunc {
@@ -63,23 +65,12 @@ class TestByteStreamSplitSpecialized : public
::testing::Test {
public:
static constexpr int kWidth = static_cast<int>(sizeof(T));
- using EncodeFunc =
NamedFunc<std::function<decltype(ByteStreamSplitEncode<kWidth>)>>;
- using DecodeFunc =
NamedFunc<std::function<decltype(ByteStreamSplitDecode<kWidth>)>>;
+ using EncodeFunc = NamedFunc<std::function<decltype(ByteStreamSplitEncode)>>;
+ using DecodeFunc = NamedFunc<std::function<decltype(ByteStreamSplitDecode)>>;
void SetUp() override {
- encode_funcs_.push_back({"reference", &ReferenceEncode});
- encode_funcs_.push_back({"scalar", &ByteStreamSplitEncodeScalar<kWidth>});
- decode_funcs_.push_back({"scalar", &ByteStreamSplitDecodeScalar<kWidth>});
-#if defined(ARROW_HAVE_SIMD_SPLIT)
- encode_funcs_.push_back({"simd", &ByteStreamSplitEncodeSimd<kWidth>});
- decode_funcs_.push_back({"simd", &ByteStreamSplitDecodeSimd<kWidth>});
- encode_funcs_.push_back({"simd128",
&ByteStreamSplitEncodeSimd128<kWidth>});
- decode_funcs_.push_back({"simd128",
&ByteStreamSplitDecodeSimd128<kWidth>});
-#endif
-#if defined(ARROW_HAVE_AVX2)
- encode_funcs_.push_back({"avx2", &ByteStreamSplitEncodeAvx2<kWidth>});
- decode_funcs_.push_back({"avx2", &ByteStreamSplitDecodeAvx2<kWidth>});
-#endif
+ decode_funcs_ = MakeDecodeFuncs();
+ encode_funcs_ = MakeEncodeFuncs();
}
void TestRoundtrip(int64_t num_values) {
@@ -92,12 +83,12 @@ class TestByteStreamSplitSpecialized : public
::testing::Test {
for (const auto& encode_func : encode_funcs_) {
ARROW_SCOPED_TRACE("encode_func = ", encode_func);
encoded.assign(encoded.size(), 0);
- encode_func.func(reinterpret_cast<const uint8_t*>(input.data()),
num_values,
+ encode_func.func(reinterpret_cast<const uint8_t*>(input.data()), kWidth,
num_values,
encoded.data());
for (const auto& decode_func : decode_funcs_) {
ARROW_SCOPED_TRACE("decode_func = ", decode_func);
decoded.assign(decoded.size(), T{});
- decode_func.func(encoded.data(), num_values, /*stride=*/num_values,
+ decode_func.func(encoded.data(), kWidth, num_values,
/*stride=*/num_values,
reinterpret_cast<uint8_t*>(decoded.data()));
ASSERT_EQ(decoded, input);
}
@@ -123,7 +114,8 @@ class TestByteStreamSplitSpecialized : public
::testing::Test {
int64_t offset = 0;
while (offset < num_values) {
auto chunk_size = std::min<int64_t>(num_values - offset,
chunk_size_dist(gen));
- decode_func.func(encoded.data() + offset, chunk_size,
/*stride=*/num_values,
+ decode_func.func(encoded.data() + offset, kWidth, chunk_size,
+ /*stride=*/num_values,
reinterpret_cast<uint8_t*>(decoded.data() + offset));
offset += chunk_size;
}
@@ -141,20 +133,48 @@ class TestByteStreamSplitSpecialized : public
::testing::Test {
static std::vector<T> MakeRandomInput(int64_t num_values) {
std::vector<T> input(num_values);
random_bytes(kWidth * num_values, seed_++,
reinterpret_cast<uint8_t*>(input.data()));
- // Avoid NaNs to ease comparison
- for (auto& value : input) {
- if (std::isnan(value)) {
- value = nan_replacement_++;
- }
- }
return input;
}
+ template <bool kSimdImplemented = (kWidth == 4 || kWidth == 8)>
+ static std::vector<DecodeFunc> MakeDecodeFuncs() {
+ std::vector<DecodeFunc> funcs;
+ funcs.push_back({"scalar_dynamic", &ByteStreamSplitDecodeScalarDynamic});
+ funcs.push_back({"scalar", &ByteStreamSplitDecodeScalar<kWidth>});
+#if defined(ARROW_HAVE_SIMD_SPLIT)
+ if constexpr (kSimdImplemented) {
+ funcs.push_back({"simd", &ByteStreamSplitDecodeSimd<kWidth>});
+ funcs.push_back({"simd128", &ByteStreamSplitDecodeSimd128<kWidth>});
+#if defined(ARROW_HAVE_AVX2)
+ funcs.push_back({"avx2", &ByteStreamSplitDecodeAvx2<kWidth>});
+#endif
+ }
+#endif // defined(ARROW_HAVE_SIMD_SPLIT)
+ return funcs;
+ }
+
+ template <bool kSimdImplemented = (kWidth == 4 || kWidth == 8)>
+ static std::vector<EncodeFunc> MakeEncodeFuncs() {
+ std::vector<EncodeFunc> funcs;
+ funcs.push_back({"reference", &ReferenceByteStreamSplitEncode});
+ funcs.push_back({"scalar_dynamic", &ByteStreamSplitEncodeScalarDynamic});
+ funcs.push_back({"scalar", &ByteStreamSplitEncodeScalar<kWidth>});
+#if defined(ARROW_HAVE_SIMD_SPLIT)
+ if constexpr (kSimdImplemented) {
+ funcs.push_back({"simd", &ByteStreamSplitEncodeSimd<kWidth>});
+ funcs.push_back({"simd128", &ByteStreamSplitEncodeSimd128<kWidth>});
+#if defined(ARROW_HAVE_AVX2)
+ funcs.push_back({"avx2", &ByteStreamSplitEncodeAvx2<kWidth>});
+#endif
+ }
+#endif // defined(ARROW_HAVE_SIMD_SPLIT)
+ return funcs;
+ }
+
std::vector<EncodeFunc> encode_funcs_;
std::vector<DecodeFunc> decode_funcs_;
static inline uint32_t seed_ = 42;
- static inline T nan_replacement_ = 0;
};
TYPED_TEST_SUITE(TestByteStreamSplitSpecialized, ByteStreamSplitTypes);
diff --git a/cpp/src/parquet/column_io_benchmark.cc
b/cpp/src/parquet/column_io_benchmark.cc
index 593765dcd4..b003b4eede 100644
--- a/cpp/src/parquet/column_io_benchmark.cc
+++ b/cpp/src/parquet/column_io_benchmark.cc
@@ -54,45 +54,57 @@ std::shared_ptr<ColumnDescriptor>
Int64Schema(Repetition::type repetition) {
}
void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition)
{
- int64_t bytes_processed = state.iterations() * state.range(0) *
sizeof(int64_t);
+ int64_t num_values = state.iterations() * state.range(0);
+ int64_t bytes_processed = num_values * sizeof(int64_t);
if (repetition != Repetition::REQUIRED) {
- bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
+ bytes_processed += num_values * sizeof(int16_t);
}
if (repetition == Repetition::REPEATED) {
- bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
+ bytes_processed += num_values * sizeof(int16_t);
}
state.SetBytesProcessed(bytes_processed);
+ state.SetItemsProcessed(num_values);
}
-template <Repetition::type repetition,
- Compression::type codec = Compression::UNCOMPRESSED>
-static void BM_WriteInt64Column(::benchmark::State& state) {
+static void BM_WriteInt64Column(::benchmark::State& state, Repetition::type
repetition,
+ Compression::type codec, Encoding::type
encoding) {
format::ColumnChunk thrift_metadata;
::arrow::random::RandomArrayGenerator rgen(1337);
auto values = rgen.Int64(state.range(0), 0, 1000000, 0);
- const auto& i8_values = static_cast<const ::arrow::Int64Array&>(*values);
+ const auto& int64_values = static_cast<const ::arrow::Int64Array&>(*values);
std::vector<int16_t> definition_levels(state.range(0), 1);
std::vector<int16_t> repetition_levels(state.range(0), 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
.compression(codec)
-
->encoding(Encoding::PLAIN)
+ ->encoding(encoding)
->disable_dictionary()
->build();
auto metadata = ColumnChunkMetaDataBuilder::Make(
properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
- while (state.KeepRunning()) {
+ int64_t data_size = values->length() * sizeof(int64_t);
+ int64_t stream_size = 0;
+ for (auto _ : state) {
auto stream = CreateOutputStream();
std::shared_ptr<Int64Writer> writer = BuildWriter(
state.range(0), stream, metadata.get(), schema.get(),
properties.get(), codec);
- writer->WriteBatch(i8_values.length(), definition_levels.data(),
- repetition_levels.data(), i8_values.raw_values());
+ writer->WriteBatch(int64_values.length(), definition_levels.data(),
+ repetition_levels.data(), int64_values.raw_values());
writer->Close();
+ stream_size = stream->Tell().ValueOrDie();
}
SetBytesProcessed(state, repetition);
+ state.counters["compression_ratio"] = static_cast<double>(data_size) /
stream_size;
+}
+
+template <Repetition::type repetition,
+ Compression::type codec = Compression::UNCOMPRESSED,
+ Encoding::type encoding = Encoding::PLAIN>
+static void BM_WriteInt64Column(::benchmark::State& state) {
+ BM_WriteInt64Column(state, repetition, codec, encoding);
}
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED)->Arg(1 << 20);
@@ -106,6 +118,12 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column,
Repetition::OPTIONAL, Compression::SNAPP
->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED,
Compression::SNAPPY)
->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED,
Compression::SNAPPY,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL,
Compression::SNAPPY,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Arg(1 << 20);
#endif
#ifdef ARROW_WITH_LZ4
@@ -115,6 +133,12 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column,
Repetition::OPTIONAL, Compression::LZ4)
->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::LZ4)
->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::LZ4,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::LZ4,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Arg(1 << 20);
#endif
#ifdef ARROW_WITH_ZSTD
@@ -124,6 +148,12 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column,
Repetition::OPTIONAL, Compression::ZSTD)
->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED,
Compression::ZSTD)
->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED,
Compression::ZSTD,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL,
Compression::ZSTD,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Arg(1 << 20);
#endif
std::shared_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
@@ -135,17 +165,20 @@ std::shared_ptr<Int64Reader>
BuildReader(std::shared_ptr<Buffer>& buffer,
ColumnReader::Make(schema, std::move(page_reader)));
}
-template <Repetition::type repetition,
- Compression::type codec = Compression::UNCOMPRESSED>
-static void BM_ReadInt64Column(::benchmark::State& state) {
+static void BM_ReadInt64Column(::benchmark::State& state, Repetition::type
repetition,
+ Compression::type codec, Encoding::type
encoding) {
format::ColumnChunk thrift_metadata;
- std::vector<int64_t> values(state.range(0), 128);
+
+ ::arrow::random::RandomArrayGenerator rgen(1337);
+ auto values = rgen.Int64(state.range(0), 0, 1000000, 0);
+ const auto& int64_values = static_cast<const ::arrow::Int64Array&>(*values);
+
std::vector<int16_t> definition_levels(state.range(0), 1);
std::vector<int16_t> repetition_levels(state.range(0), 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
.compression(codec)
-
->encoding(Encoding::PLAIN)
+ ->encoding(encoding)
->disable_dictionary()
->build();
@@ -155,11 +188,14 @@ static void BM_ReadInt64Column(::benchmark::State& state)
{
auto stream = CreateOutputStream();
std::shared_ptr<Int64Writer> writer = BuildWriter(
state.range(0), stream, metadata.get(), schema.get(), properties.get(),
codec);
- writer->WriteBatch(values.size(), definition_levels.data(),
repetition_levels.data(),
- values.data());
+ writer->WriteBatch(int64_values.length(), definition_levels.data(),
+ repetition_levels.data(), int64_values.raw_values());
writer->Close();
PARQUET_ASSIGN_OR_THROW(auto src, stream->Finish());
+ int64_t stream_size = src->size();
+ int64_t data_size = int64_values.length() * sizeof(int64_t);
+
std::vector<int64_t> values_out(state.range(1));
std::vector<int16_t> definition_levels_out(state.range(1));
std::vector<int16_t> repetition_levels_out(state.range(1));
@@ -167,12 +203,20 @@ static void BM_ReadInt64Column(::benchmark::State& state)
{
std::shared_ptr<Int64Reader> reader =
BuildReader(src, state.range(1), codec, schema.get());
int64_t values_read = 0;
- for (size_t i = 0; i < values.size(); i += values_read) {
+ for (int64_t i = 0; i < int64_values.length(); i += values_read) {
reader->ReadBatch(values_out.size(), definition_levels_out.data(),
repetition_levels_out.data(), values_out.data(),
&values_read);
}
}
SetBytesProcessed(state, repetition);
+ state.counters["compression_ratio"] = static_cast<double>(data_size) /
stream_size;
+}
+
+template <Repetition::type repetition,
+ Compression::type codec = Compression::UNCOMPRESSED,
+ Encoding::type encoding = Encoding::PLAIN>
+static void BM_ReadInt64Column(::benchmark::State& state) {
+ BM_ReadInt64Column(state, repetition, codec, encoding);
}
void ReadColumnSetArgs(::benchmark::internal::Benchmark* bench) {
@@ -197,6 +241,13 @@ BENCHMARK_TEMPLATE(BM_ReadInt64Column,
Repetition::OPTIONAL, Compression::SNAPPY
->Apply(ReadColumnSetArgs);
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED,
Compression::SNAPPY)
->Apply(ReadColumnSetArgs);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED,
Compression::SNAPPY,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Apply(ReadColumnSetArgs);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL,
Compression::SNAPPY,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Apply(ReadColumnSetArgs);
#endif
#ifdef ARROW_WITH_LZ4
@@ -206,6 +257,13 @@ BENCHMARK_TEMPLATE(BM_ReadInt64Column,
Repetition::OPTIONAL, Compression::LZ4)
->Apply(ReadColumnSetArgs);
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::LZ4)
->Apply(ReadColumnSetArgs);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::LZ4,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Apply(ReadColumnSetArgs);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::LZ4,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Apply(ReadColumnSetArgs);
#endif
#ifdef ARROW_WITH_ZSTD
@@ -215,6 +273,13 @@ BENCHMARK_TEMPLATE(BM_ReadInt64Column,
Repetition::OPTIONAL, Compression::ZSTD)
->Apply(ReadColumnSetArgs);
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::ZSTD)
->Apply(ReadColumnSetArgs);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::ZSTD,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Apply(ReadColumnSetArgs);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::ZSTD,
+ Encoding::BYTE_STREAM_SPLIT)
+ ->Apply(ReadColumnSetArgs);
#endif
static void BM_RleEncoding(::benchmark::State& state) {
diff --git a/cpp/src/parquet/column_writer_test.cc
b/cpp/src/parquet/column_writer_test.cc
index a8519a0f56..c99efd1796 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -507,23 +507,33 @@ TEST_F(TestValuesWriterInt32Type,
RequiredDeltaBinaryPacked) {
this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
}
+TEST_F(TestValuesWriterInt32Type, RequiredByteStreamSplit) {
+ this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT);
+}
+
TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) {
this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
}
+TEST_F(TestValuesWriterInt64Type, RequiredByteStreamSplit) {
+ this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT);
+}
+
TEST_F(TestByteArrayValuesWriter, RequiredDeltaLengthByteArray) {
this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
}
-/*
-TYPED_TEST(TestByteArrayValuesWriter, RequiredDeltaByteArray) {
+TEST_F(TestByteArrayValuesWriter, RequiredDeltaByteArray) {
this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
}
TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredDeltaByteArray) {
this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
}
-*/
+
+TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredByteStreamSplit) {
+ this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT);
+}
TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index a3d1746536..3eed88f08b 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -73,6 +73,10 @@ namespace {
// unsigned, but the Java implementation uses signed ints.
constexpr size_t kMaxByteArraySize = std::numeric_limits<int32_t>::max();
+// ----------------------------------------------------------------------
+// Encoders
+// ----------------------------------------------------------------------
+
class EncoderImpl : virtual public Encoder {
public:
EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding,
MemoryPool* pool)
@@ -92,7 +96,7 @@ class EncoderImpl : virtual public Encoder {
MemoryPool* pool_;
/// Type length from descr
- int type_length_;
+ const int type_length_;
};
// ----------------------------------------------------------------------
@@ -262,8 +266,7 @@ inline void PlainEncoder<ByteArrayType>::Put(const
::arrow::Array& values) {
}
void AssertFixedSizeBinary(const ::arrow::Array& values, int type_length) {
- if (values.type_id() != ::arrow::Type::FIXED_SIZE_BINARY &&
- values.type_id() != ::arrow::Type::DECIMAL) {
+ if (!::arrow::is_fixed_size_binary(values.type_id())) {
throw ParquetException("Only FixedSizeBinaryArray and subclasses
supported");
}
if (checked_cast<const
::arrow::FixedSizeBinaryType&>(*values.type()).byte_width() !=
@@ -464,8 +467,6 @@ class DictEncoderImpl : public EncoderImpl, virtual public
DictEncoder<DType> {
return kDataPageBitWidthBytes + encoder.len();
}
- void set_type_length(int type_length) { this->type_length_ = type_length; }
-
/// Returns a conservative estimate of the number of bytes needed to encode
the buffered
/// indices. Used to size the buffer passed to WriteIndices().
int64_t EstimatedDataEncodedSize() override {
@@ -801,98 +802,154 @@ void DictEncoderImpl<ByteArrayType>::PutDictionary(const
::arrow::Array& values)
// ----------------------------------------------------------------------
// ByteStreamSplitEncoder<T> implementations
+// Common base class for all types
+
template <typename DType>
-class ByteStreamSplitEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+class ByteStreamSplitEncoderBase : public EncoderImpl,
+ virtual public TypedEncoder<DType> {
public:
using T = typename DType::c_type;
using TypedEncoder<DType>::Put;
- explicit ByteStreamSplitEncoder(
- const ColumnDescriptor* descr,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+ ByteStreamSplitEncoderBase(const ColumnDescriptor* descr, int byte_width,
+ ::arrow::MemoryPool* pool)
+ : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool),
+ sink_{pool},
+ byte_width_(byte_width),
+ num_values_in_buffer_{0} {}
- int64_t EstimatedDataEncodedSize() override;
- std::shared_ptr<Buffer> FlushValues() override;
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
- void Put(const T* buffer, int num_values) override;
- void Put(const ::arrow::Array& values) override;
- void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
- int64_t valid_bits_offset) override;
+ std::shared_ptr<Buffer> FlushValues() override {
+ if (byte_width_ == 1) {
+ // Special-cased fast path
+ PARQUET_ASSIGN_OR_THROW(auto buf, sink_.Finish());
+ return buf;
+ }
+ auto output_buffer = AllocateBuffer(this->memory_pool(),
EstimatedDataEncodedSize());
+ uint8_t* output_buffer_raw = output_buffer->mutable_data();
+ const uint8_t* raw_values = sink_.data();
+ ::arrow::util::internal::ByteStreamSplitEncode(
+ raw_values, /*width=*/byte_width_, num_values_in_buffer_,
output_buffer_raw);
+ sink_.Reset();
+ num_values_in_buffer_ = 0;
+ return output_buffer;
+ }
- protected:
- template <typename ArrowType>
- void PutImpl(const ::arrow::Array& values) {
- if (values.type_id() != ArrowType::type_id) {
- throw ParquetException(std::string() + "direct put to " +
ArrowType::type_name() +
- " from " + values.type()->ToString() + " not
supported");
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values
* sizeof(T),
+
this->memory_pool()));
+ T* data = buffer->template mutable_data_as<T>();
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
}
- const auto& data = *values.data();
- PutSpaced(data.GetValues<typename ArrowType::c_type>(1),
- static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0),
data.offset);
}
+ protected:
::arrow::BufferBuilder sink_;
+ // Required because type_length_ is only filled in for FLBA
+ const int byte_width_;
int64_t num_values_in_buffer_;
};
-template <typename DType>
-ByteStreamSplitEncoder<DType>::ByteStreamSplitEncoder(const ColumnDescriptor*
descr,
- ::arrow::MemoryPool*
pool)
- : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool),
- sink_{pool},
- num_values_in_buffer_{0} {}
+// BYTE_STREAM_SPLIT encoder implementation for FLOAT, DOUBLE, INT32, INT64
template <typename DType>
-int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
- return sink_.length();
-}
+class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase<DType> {
+ public:
+ using T = typename DType::c_type;
+ using ArrowType = typename EncodingTraits<DType>::ArrowType;
-template <typename DType>
-std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
- std::shared_ptr<ResizableBuffer> output_buffer =
- AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
- uint8_t* output_buffer_raw = output_buffer->mutable_data();
- const uint8_t* raw_values = sink_.data();
- ::arrow::util::internal::ByteStreamSplitEncode<sizeof(T)>(
- raw_values, num_values_in_buffer_, output_buffer_raw);
- sink_.Reset();
- num_values_in_buffer_ = 0;
- return std::move(output_buffer);
-}
+ ByteStreamSplitEncoder(const ColumnDescriptor* descr,
+ ::arrow::MemoryPool* pool =
::arrow::default_memory_pool())
+ : ByteStreamSplitEncoderBase<DType>(descr,
+
/*byte_width=*/static_cast<int>(sizeof(T)),
+ pool) {}
-template <typename DType>
-void ByteStreamSplitEncoder<DType>::Put(const T* buffer, int num_values) {
- if (num_values > 0) {
- PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
- num_values_in_buffer_ += num_values;
+ // Inherit Put(const std::vector<T>&...)
+ using TypedEncoder<DType>::Put;
+
+ void Put(const T* buffer, int num_values) override {
+ if (num_values > 0) {
+ PARQUET_THROW_NOT_OK(
+ this->sink_.Append(reinterpret_cast<const uint8_t*>(buffer),
+ num_values * static_cast<int64_t>(sizeof(T))));
+ this->num_values_in_buffer_ += num_values;
+ }
}
-}
-template <>
-void ByteStreamSplitEncoder<FloatType>::Put(const ::arrow::Array& values) {
- PutImpl<::arrow::FloatType>(values);
-}
+ void Put(const ::arrow::Array& values) override {
+ if (values.type_id() != ArrowType::type_id) {
+ throw ParquetException(std::string() + "direct put from " +
+ values.type()->ToString() + " not supported");
+ }
+ const auto& data = *values.data();
+ this->PutSpaced(data.GetValues<typename ArrowType::c_type>(1),
+ static_cast<int>(data.length), data.GetValues<uint8_t>(0,
0),
+ data.offset);
+ }
+};
+
+// BYTE_STREAM_SPLIT encoder implementation for FLBA
template <>
-void ByteStreamSplitEncoder<DoubleType>::Put(const ::arrow::Array& values) {
- PutImpl<::arrow::DoubleType>(values);
-}
+class ByteStreamSplitEncoder<FLBAType> : public
ByteStreamSplitEncoderBase<FLBAType> {
+ public:
+ using DType = FLBAType;
+ using T = FixedLenByteArray;
+ using ArrowType = ::arrow::FixedSizeBinaryArray;
-template <typename DType>
-void ByteStreamSplitEncoder<DType>::PutSpaced(const T* src, int num_values,
- const uint8_t* valid_bits,
- int64_t valid_bits_offset) {
- if (valid_bits != NULLPTR) {
- PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values *
sizeof(T),
-
this->memory_pool()));
- T* data = buffer->template mutable_data_as<T>();
- int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
- src, num_values, valid_bits, valid_bits_offset, data);
- Put(data, num_valid_values);
- } else {
- Put(src, num_values);
+ ByteStreamSplitEncoder(const ColumnDescriptor* descr,
+ ::arrow::MemoryPool* pool =
::arrow::default_memory_pool())
+ : ByteStreamSplitEncoderBase<DType>(descr,
+ /*byte_width=*/descr->type_length(),
pool) {}
+
+ // Inherit Put(const std::vector<T>&...)
+ using TypedEncoder<DType>::Put;
+
+ void Put(const T* buffer, int num_values) override {
+ if (byte_width_ > 0) {
+ const int64_t total_bytes = static_cast<int64_t>(num_values) *
byte_width_;
+ PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
+ for (int i = 0; i < num_values; ++i) {
+ // Write the result to the output stream
+ DCHECK(buffer[i].ptr != nullptr) << "Value ptr cannot be NULL";
+ sink_.UnsafeAppend(buffer[i].ptr, byte_width_);
+ }
+ }
+ this->num_values_in_buffer_ += num_values;
}
-}
+
+ void Put(const ::arrow::Array& values) override {
+ AssertFixedSizeBinary(values, byte_width_);
+ const auto& data = checked_cast<const
::arrow::FixedSizeBinaryArray&>(values);
+ if (data.null_count() == 0) {
+ // no nulls, just buffer the data
+ PARQUET_THROW_NOT_OK(sink_.Append(data.raw_values(), data.length() *
byte_width_));
+ this->num_values_in_buffer_ += data.length();
+ } else {
+ const int64_t num_values = data.length() - data.null_count();
+ const int64_t total_bytes = num_values * byte_width_;
+ PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
+ // TODO use VisitSetBitRunsVoid
+ for (int64_t i = 0; i < data.length(); i++) {
+ if (data.IsValid(i)) {
+ sink_.UnsafeAppend(data.Value(i), byte_width_);
+ }
+ }
+ this->num_values_in_buffer_ += num_values;
+ }
+ }
+};
+
+// ----------------------------------------------------------------------
+// Decoders
+// ----------------------------------------------------------------------
class DecoderImpl : virtual public Decoder {
public:
@@ -3559,136 +3616,162 @@ class DeltaByteArrayFLBADecoder : public
DeltaByteArrayDecoderImpl<FLBAType>,
};
// ----------------------------------------------------------------------
-// BYTE_STREAM_SPLIT
+// BYTE_STREAM_SPLIT decoders
template <typename DType>
-class ByteStreamSplitDecoder : public DecoderImpl, virtual public
TypedDecoder<DType> {
+class ByteStreamSplitDecoderBase : public DecoderImpl,
+ virtual public TypedDecoder<DType> {
public:
using T = typename DType::c_type;
- explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr);
- int Decode(T* buffer, int max_values) override;
+ ByteStreamSplitDecoderBase(const ColumnDescriptor* descr, int byte_width)
+ : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT),
byte_width_(byte_width) {}
- int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
- int64_t valid_bits_offset,
- typename EncodingTraits<DType>::Accumulator* builder)
override;
+ void SetData(int num_values, const uint8_t* data, int len) override {
+ if (static_cast<int64_t>(num_values) * byte_width_ != len) {
+ throw ParquetException("Data size (" + std::to_string(len) +
+                             ") does not match number of values in BYTE_STREAM_SPLIT (" +
+ std::to_string(num_values) + ")");
+ }
+ DecoderImpl::SetData(num_values, data, len);
+ stride_ = num_values_;
+ }
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
- typename EncodingTraits<DType>::DictAccumulator* builder)
override;
-
- void SetData(int num_values, const uint8_t* data, int len) override;
+ typename EncodingTraits<DType>::DictAccumulator* builder)
override {
+ ParquetException::NYI("DecodeArrow to DictAccumulator for
BYTE_STREAM_SPLIT");
+ }
- T* EnsureDecodeBuffer(int64_t min_values) {
- const int64_t size = sizeof(T) * min_values;
+ protected:
+ int DecodeRaw(uint8_t* out_buffer, int max_values) {
+ const int values_to_decode = std::min(num_values_, max_values);
+ ::arrow::util::internal::ByteStreamSplitDecode(data_, byte_width_,
values_to_decode,
+ stride_, out_buffer);
+ data_ += values_to_decode;
+ num_values_ -= values_to_decode;
+ len_ -= byte_width_ * values_to_decode;
+ return values_to_decode;
+ }
+
+ uint8_t* EnsureDecodeBuffer(int64_t min_values) {
+ const int64_t size = byte_width_ * min_values;
if (!decode_buffer_ || decode_buffer_->size() < size) {
- PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(size));
+ const auto alloc_size = ::arrow::bit_util::NextPower2(size);
+ PARQUET_ASSIGN_OR_THROW(decode_buffer_,
::arrow::AllocateBuffer(alloc_size));
}
- return decode_buffer_->mutable_data_as<T>();
+ return decode_buffer_->mutable_data();
}
- private:
- int num_values_in_buffer_{0};
+ const int byte_width_;
+ int stride_{0};
std::shared_ptr<Buffer> decode_buffer_;
-
- static constexpr int kNumStreams = sizeof(T);
};
-template <typename DType>
-ByteStreamSplitDecoder<DType>::ByteStreamSplitDecoder(const ColumnDescriptor*
descr)
- : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {}
+// BYTE_STREAM_SPLIT decoder for FLOAT, DOUBLE, INT32, INT64
template <typename DType>
-void ByteStreamSplitDecoder<DType>::SetData(int num_values, const uint8_t*
data,
- int len) {
- if (num_values * static_cast<int64_t>(sizeof(T)) < len) {
- throw ParquetException(
- "Data size too large for number of values (padding in byte stream
split data "
- "page?)");
- }
- if (len % sizeof(T) != 0) {
- throw ParquetException("ByteStreamSplit data size " + std::to_string(len) +
- " not aligned with type " +
TypeToString(DType::type_num));
+class ByteStreamSplitDecoder : public ByteStreamSplitDecoderBase<DType> {
+ public:
+ using T = typename DType::c_type;
+
+ explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr)
+ : ByteStreamSplitDecoderBase<DType>(descr, static_cast<int>(sizeof(T)))
{}
+
+ int Decode(T* buffer, int max_values) override {
+ return this->DecodeRaw(reinterpret_cast<uint8_t*>(buffer), max_values);
}
- num_values = len / sizeof(T);
- DecoderImpl::SetData(num_values, data, len);
- num_values_in_buffer_ = num_values_;
-}
-template <typename DType>
-int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
- const int values_to_decode = std::min(num_values_, max_values);
- const int num_decoded_previously = num_values_in_buffer_ - num_values_;
- const uint8_t* data = data_ + num_decoded_previously;
+ using ByteStreamSplitDecoderBase<DType>::DecodeArrow;
- ::arrow::util::internal::ByteStreamSplitDecode<kNumStreams>(
- data, values_to_decode, num_values_in_buffer_,
reinterpret_cast<uint8_t*>(buffer));
- num_values_ -= values_to_decode;
- len_ -= sizeof(T) * values_to_decode;
- return values_to_decode;
-}
+ int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+ int64_t valid_bits_offset,
+ typename EncodingTraits<DType>::Accumulator* builder)
override {
+ const int values_to_decode = num_values - null_count;
+ if (ARROW_PREDICT_FALSE(this->num_values_ < values_to_decode)) {
+ ParquetException::EofException();
+ }
-template <typename DType>
-int ByteStreamSplitDecoder<DType>::DecodeArrow(
- int num_values, int null_count, const uint8_t* valid_bits, int64_t
valid_bits_offset,
- typename EncodingTraits<DType>::Accumulator* builder) {
- constexpr int value_size = kNumStreams;
- int values_decoded = num_values - null_count;
- if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
- ParquetException::EofException();
+ PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+ // Decode into intermediate buffer.
+ T* decode_out =
reinterpret_cast<T*>(this->EnsureDecodeBuffer(values_to_decode));
+ const int num_decoded = Decode(decode_out, values_to_decode);
+ DCHECK_EQ(num_decoded, values_to_decode);
+
+ // If null_count is 0, we could append in bulk or decode directly into
+ // builder. We could also decode in chunks, or use SpacedExpand. We don't
+ // bother currently, because DecodeArrow methods are only called for
ByteArray.
+ int64_t offset = 0;
+ VisitNullBitmapInline(
+ valid_bits, valid_bits_offset, num_values, null_count,
+ [&]() {
+ builder->UnsafeAppend(decode_out[offset]);
+ ++offset;
+ },
+ [&]() { builder->UnsafeAppendNull(); });
+
+ return values_to_decode;
}
+};
- PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+// BYTE_STREAM_SPLIT decoder for FIXED_LEN_BYTE_ARRAY
- const int num_decoded_previously = num_values_in_buffer_ - num_values_;
- const uint8_t* data = data_ + num_decoded_previously;
- int offset = 0;
+template <>
+class ByteStreamSplitDecoder<FLBAType> : public
ByteStreamSplitDecoderBase<FLBAType>,
+ virtual public FLBADecoder {
+ public:
+ using DType = FLBAType;
+ using T = FixedLenByteArray;
-#if defined(ARROW_HAVE_SIMD_SPLIT)
- // Use fast decoding into intermediate buffer. This will also decode
- // some null values, but it's fast enough that we don't care.
- T* decode_out = EnsureDecodeBuffer(values_decoded);
- ::arrow::util::internal::ByteStreamSplitDecode<kNumStreams>(
- data, values_decoded, num_values_in_buffer_,
- reinterpret_cast<uint8_t*>(decode_out));
+ explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr)
+ : ByteStreamSplitDecoderBase<DType>(descr, descr->type_length()) {}
- // XXX If null_count is 0, we could even append in bulk or decode directly
into
- // builder
- VisitNullBitmapInline(
- valid_bits, valid_bits_offset, num_values, null_count,
- [&]() {
- builder->UnsafeAppend(decode_out[offset]);
- ++offset;
- },
- [&]() { builder->UnsafeAppendNull(); });
+ int Decode(T* buffer, int max_values) override {
+ // Decode into intermediate buffer.
+ max_values = std::min(max_values, this->num_values_);
+ uint8_t* decode_out = this->EnsureDecodeBuffer(max_values);
+ const int num_decoded = this->DecodeRaw(decode_out, max_values);
+ DCHECK_EQ(num_decoded, max_values);
-#else
- // XXX should operate over runs of 0s / 1s
- VisitNullBitmapInline(
- valid_bits, valid_bits_offset, num_values, null_count,
- [&]() {
- uint8_t gathered_byte_data[kNumStreams];
- for (int b = 0; b < kNumStreams; ++b) {
- const int64_t byte_index = b * num_values_in_buffer_ + offset;
- gathered_byte_data[b] = data[byte_index];
- }
- builder->UnsafeAppend(SafeLoadAs<T>(&gathered_byte_data[0]));
- ++offset;
- },
- [&]() { builder->UnsafeAppendNull(); });
-#endif
+ for (int i = 0; i < num_decoded; ++i) {
+ buffer[i] = FixedLenByteArray(decode_out +
static_cast<int64_t>(byte_width_) * i);
+ }
+ return num_decoded;
+ }
- num_values_ -= values_decoded;
- len_ -= sizeof(T) * values_decoded;
- return values_decoded;
-}
+ using ByteStreamSplitDecoderBase<DType>::DecodeArrow;
-template <typename DType>
-int ByteStreamSplitDecoder<DType>::DecodeArrow(
- int num_values, int null_count, const uint8_t* valid_bits, int64_t
valid_bits_offset,
- typename EncodingTraits<DType>::DictAccumulator* builder) {
- ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
-}
+ int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+ int64_t valid_bits_offset,
+ typename EncodingTraits<DType>::Accumulator* builder)
override {
+ const int values_to_decode = num_values - null_count;
+ if (ARROW_PREDICT_FALSE(this->num_values_ < values_to_decode)) {
+ ParquetException::EofException();
+ }
+
+ PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+ // Decode into intermediate buffer.
+ uint8_t* decode_out = this->EnsureDecodeBuffer(values_to_decode);
+ const int num_decoded = this->DecodeRaw(decode_out, values_to_decode);
+ DCHECK_EQ(num_decoded, values_to_decode);
+
+ // If null_count is 0, we could append in bulk or decode directly into
+ // builder. We could also decode in chunks, or use SpacedExpand. We don't
+ // bother currently, because DecodeArrow methods are only called for
ByteArray.
+ int64_t offset = 0;
+ VisitNullBitmapInline(
+ valid_bits, valid_bits_offset, num_values, null_count,
+ [&]() {
+ builder->UnsafeAppend(decode_out + offset *
static_cast<int64_t>(byte_width_));
+ ++offset;
+ },
+ [&]() { builder->UnsafeAppendNull(); });
+
+ return values_to_decode;
+ }
+};
} // namespace
@@ -3742,12 +3825,20 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type
type_num, Encoding::type encodin
}
} else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
switch (type_num) {
+ case Type::INT32:
+ return std::make_unique<ByteStreamSplitEncoder<Int32Type>>(descr,
pool);
+ case Type::INT64:
+ return std::make_unique<ByteStreamSplitEncoder<Int64Type>>(descr,
pool);
case Type::FLOAT:
return std::make_unique<ByteStreamSplitEncoder<FloatType>>(descr,
pool);
case Type::DOUBLE:
return std::make_unique<ByteStreamSplitEncoder<DoubleType>>(descr,
pool);
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::make_unique<ByteStreamSplitEncoder<FLBAType>>(descr, pool);
default:
-        throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
+ throw ParquetException(
+ "BYTE_STREAM_SPLIT only supports FLOAT, DOUBLE, INT32, INT64 "
+ "and FIXED_LEN_BYTE_ARRAY");
}
} else if (encoding == Encoding::DELTA_BINARY_PACKED) {
switch (type_num) {
@@ -3816,12 +3907,20 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type
type_num, Encoding::type encodin
}
} else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
switch (type_num) {
+ case Type::INT32:
+ return std::make_unique<ByteStreamSplitDecoder<Int32Type>>(descr);
+ case Type::INT64:
+ return std::make_unique<ByteStreamSplitDecoder<Int64Type>>(descr);
case Type::FLOAT:
return std::make_unique<ByteStreamSplitDecoder<FloatType>>(descr);
case Type::DOUBLE:
return std::make_unique<ByteStreamSplitDecoder<DoubleType>>(descr);
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::make_unique<ByteStreamSplitDecoder<FLBAType>>(descr);
default:
-        throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
+ throw ParquetException(
+ "BYTE_STREAM_SPLIT only supports FLOAT, DOUBLE, INT32, INT64 "
+ "and FIXED_LEN_BYTE_ARRAY");
}
} else if (encoding == Encoding::DELTA_BINARY_PACKED) {
switch (type_num) {
diff --git a/cpp/src/parquet/encoding_benchmark.cc
b/cpp/src/parquet/encoding_benchmark.cc
index 3069e8c905..61959b659f 100644
--- a/cpp/src/parquet/encoding_benchmark.cc
+++ b/cpp/src/parquet/encoding_benchmark.cc
@@ -17,6 +17,11 @@
#include "benchmark/benchmark.h"
+#include <array>
+#include <cmath>
+#include <limits>
+#include <random>
+
#include "arrow/array.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_dict.h"
@@ -31,10 +36,6 @@
#include "parquet/platform.h"
#include "parquet/schema.h"
-#include <cmath>
-#include <limits>
-#include <random>
-
using arrow::default_memory_pool;
using arrow::MemoryPool;
@@ -361,32 +362,81 @@ static void
BM_PlainDecodingSpacedDouble(benchmark::State& state) {
}
BENCHMARK(BM_PlainDecodingSpacedDouble)->Apply(BM_SpacedArgs);
+template <typename T>
+struct ByteStreamSplitDummyValue {
+ static constexpr T value() { return static_cast<T>(42); }
+};
+
+template <typename T, size_t N>
+struct ByteStreamSplitDummyValue<std::array<T, N>> {
+ using Array = std::array<T, N>;
+
+ static constexpr Array value() {
+ Array array;
+ array.fill(ByteStreamSplitDummyValue<T>::value());
+ return array;
+ }
+};
+
template <typename T, typename DecodeFunc>
static void BM_ByteStreamSplitDecode(benchmark::State& state, DecodeFunc&&
decode_func) {
- std::vector<T> values(state.range(0), 64.0);
+ const std::vector<T> values(state.range(0),
ByteStreamSplitDummyValue<T>::value());
const uint8_t* values_raw = reinterpret_cast<const uint8_t*>(values.data());
- std::vector<T> output(state.range(0), 0);
+ std::vector<T> output(state.range(0));
for (auto _ : state) {
- decode_func(values_raw, static_cast<int64_t>(values.size()),
- static_cast<int64_t>(values.size()),
+ decode_func(values_raw,
+ /*width=*/static_cast<int>(sizeof(T)),
+ /*num_values=*/static_cast<int64_t>(values.size()),
+ /*stride=*/static_cast<int64_t>(values.size()),
reinterpret_cast<uint8_t*>(output.data()));
benchmark::ClobberMemory();
}
state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T));
+ state.SetItemsProcessed(state.iterations() * values.size());
}
template <typename T, typename EncodeFunc>
static void BM_ByteStreamSplitEncode(benchmark::State& state, EncodeFunc&&
encode_func) {
- std::vector<T> values(state.range(0), 64.0);
+ const std::vector<T> values(state.range(0),
ByteStreamSplitDummyValue<T>::value());
const uint8_t* values_raw = reinterpret_cast<const uint8_t*>(values.data());
- std::vector<uint8_t> output(state.range(0) * sizeof(T), 0);
+ std::vector<uint8_t> output(state.range(0) * sizeof(T));
for (auto _ : state) {
- encode_func(values_raw, values.size(), output.data());
+ encode_func(values_raw, /*width=*/static_cast<int>(sizeof(T)),
values.size(),
+ output.data());
benchmark::ClobberMemory();
}
state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T));
+ state.SetItemsProcessed(state.iterations() * values.size());
+}
+
+static void BM_ByteStreamSplitDecode_Float_Generic(benchmark::State& state) {
+ BM_ByteStreamSplitDecode<float>(state,
::arrow::util::internal::ByteStreamSplitDecode);
+}
+
+static void BM_ByteStreamSplitDecode_Double_Generic(benchmark::State& state) {
+ BM_ByteStreamSplitDecode<double>(state,
::arrow::util::internal::ByteStreamSplitDecode);
+}
+
+template <int N>
+static void BM_ByteStreamSplitDecode_FLBA_Generic(benchmark::State& state) {
+ BM_ByteStreamSplitDecode<std::array<int8_t, N>>(
+ state, ::arrow::util::internal::ByteStreamSplitDecode);
+}
+
+static void BM_ByteStreamSplitEncode_Float_Generic(benchmark::State& state) {
+ BM_ByteStreamSplitEncode<float>(state,
::arrow::util::internal::ByteStreamSplitEncode);
+}
+
+static void BM_ByteStreamSplitEncode_Double_Generic(benchmark::State& state) {
+ BM_ByteStreamSplitEncode<double>(state,
::arrow::util::internal::ByteStreamSplitEncode);
+}
+
+template <int N>
+static void BM_ByteStreamSplitEncode_FLBA_Generic(benchmark::State& state) {
+ BM_ByteStreamSplitEncode<std::array<int8_t, N>>(
+ state, ::arrow::util::internal::ByteStreamSplitEncode);
}
static void BM_ByteStreamSplitDecode_Float_Scalar(benchmark::State& state) {
@@ -409,10 +459,29 @@ static void
BM_ByteStreamSplitEncode_Double_Scalar(benchmark::State& state) {
state,
::arrow::util::internal::ByteStreamSplitEncodeScalar<sizeof(double)>);
}
-BENCHMARK(BM_ByteStreamSplitDecode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitDecode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE);
+static void ByteStreamSplitApply(::benchmark::internal::Benchmark* bench) {
+ // Reduce the number of variations by only testing the two range ends.
+ bench->Arg(MIN_RANGE)->Arg(MAX_RANGE);
+}
+
+BENCHMARK(BM_ByteStreamSplitDecode_Float_Generic)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitDecode_Double_Generic)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic,
2)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic,
7)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic, 16)
+ ->Apply(ByteStreamSplitApply);
+
+BENCHMARK(BM_ByteStreamSplitEncode_Float_Generic)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_Generic)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic,
2)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic,
7)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic, 16)
+ ->Apply(ByteStreamSplitApply);
+
+BENCHMARK(BM_ByteStreamSplitDecode_Float_Scalar)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitDecode_Double_Scalar)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Float_Scalar)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_Scalar)->Apply(ByteStreamSplitApply);
#if defined(ARROW_HAVE_SSE4_2)
static void BM_ByteStreamSplitDecode_Float_Sse2(benchmark::State& state) {
@@ -435,10 +504,10 @@ static void
BM_ByteStreamSplitEncode_Double_Sse2(benchmark::State& state) {
state,
::arrow::util::internal::ByteStreamSplitEncodeSimd128<sizeof(double)>);
}
-BENCHMARK(BM_ByteStreamSplitDecode_Float_Sse2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitDecode_Double_Sse2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Float_Sse2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Double_Sse2)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitDecode_Float_Sse2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitDecode_Double_Sse2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Float_Sse2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_Sse2)->Apply(ByteStreamSplitApply);
#endif
#if defined(ARROW_HAVE_AVX2)
@@ -462,10 +531,10 @@ static void
BM_ByteStreamSplitEncode_Double_Avx2(benchmark::State& state) {
state,
::arrow::util::internal::ByteStreamSplitEncodeAvx2<sizeof(double)>);
}
-BENCHMARK(BM_ByteStreamSplitDecode_Float_Avx2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitDecode_Double_Avx2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Float_Avx2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Double_Avx2)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitDecode_Float_Avx2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitDecode_Double_Avx2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Float_Avx2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_Avx2)->Apply(ByteStreamSplitApply);
#endif
#if defined(ARROW_HAVE_NEON)
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index ee581622c8..ea0029f4c7 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -47,6 +47,7 @@
using arrow::default_memory_pool;
using arrow::MemoryPool;
using arrow::internal::checked_cast;
+using arrow::util::span;
namespace bit_util = arrow::bit_util;
@@ -711,8 +712,11 @@ class EncodingAdHocTyped : public ::testing::Test {
}
void ByteStreamSplit(int seed) {
- if (!std::is_same<ParquetType, FloatType>::value &&
- !std::is_same<ParquetType, DoubleType>::value) {
+ if constexpr (!std::is_same_v<ParquetType, FloatType> &&
+ !std::is_same_v<ParquetType, DoubleType> &&
+ !std::is_same_v<ParquetType, Int32Type> &&
+ !std::is_same_v<ParquetType, Int64Type> &&
+ !std::is_same_v<ParquetType, FLBAType>) {
return;
}
auto values = GetValues(seed);
@@ -1234,7 +1238,8 @@ class TestByteStreamSplitEncoding : public
TestEncodingBase<Type> {
encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
encode_buffer_ = encoder->FlushValues();
- decoder->SetData(num_values_, encode_buffer_->data(),
+ ASSERT_EQ(encode_buffer_->size(), physical_byte_width() * (num_values_ -
null_count));
+ decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
static_cast<int>(encode_buffer_->size()));
auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_,
null_count,
valid_bits, valid_bits_offset);
@@ -1249,33 +1254,58 @@ class TestByteStreamSplitEncoding : public
TestEncodingBase<Type> {
protected:
USING_BASE_MEMBERS();
- void CheckDecode(const uint8_t* encoded_data, const int64_t
encoded_data_size,
- const c_type* expected_decoded_data, const int
num_elements) {
+ template <typename U>
+ void CheckDecode(span<const uint8_t> encoded_data, span<const U>
expected_decoded_data,
+ const ColumnDescriptor* descr = nullptr) {
+ static_assert(sizeof(U) == sizeof(c_type));
+ static_assert(std::is_same_v<U, FLBA> == std::is_same_v<c_type, FLBA>);
+
std::unique_ptr<TypedDecoder<Type>> decoder =
- MakeTypedDecoder<Type>(Encoding::BYTE_STREAM_SPLIT);
- decoder->SetData(num_elements, encoded_data,
static_cast<int>(encoded_data_size));
- std::vector<c_type> decoded_data(num_elements);
- int num_decoded_elements = decoder->Decode(decoded_data.data(),
num_elements);
+ MakeTypedDecoder<Type>(Encoding::BYTE_STREAM_SPLIT, descr);
+ int num_elements = static_cast<int>(expected_decoded_data.size());
+ decoder->SetData(num_elements, encoded_data.data(),
+ static_cast<int>(encoded_data.size()));
+ std::vector<U> decoded_data(num_elements);
+ int num_decoded_elements =
+ decoder->Decode(reinterpret_cast<c_type*>(decoded_data.data()),
num_elements);
ASSERT_EQ(num_elements, num_decoded_elements);
- for (size_t i = 0U; i < decoded_data.size(); ++i) {
- ASSERT_EQ(expected_decoded_data[i], decoded_data[i]);
+ // Compare to expected values
+ if constexpr (std::is_same_v<c_type, FLBA>) {
+ auto type_length = descr->type_length();
+ for (int i = 0; i < num_elements; ++i) {
+ ASSERT_EQ(span<const uint8_t>(expected_decoded_data[i].ptr,
type_length),
+ span<const uint8_t>(decoded_data[i].ptr, type_length));
+ }
+ } else {
+ for (int i = 0; i < num_elements; ++i) {
+ ASSERT_EQ(expected_decoded_data[i], decoded_data[i]);
+ }
}
ASSERT_EQ(0, decoder->values_left());
}
- void CheckEncode(const c_type* data, const int num_elements,
- const uint8_t* expected_encoded_data,
- const int64_t encoded_data_size) {
- std::unique_ptr<TypedEncoder<Type>> encoder =
- MakeTypedEncoder<Type>(Encoding::BYTE_STREAM_SPLIT);
- encoder->Put(data, num_elements);
+ template <typename U>
+ void CheckEncode(span<const U> data, span<const uint8_t>
expected_encoded_data,
+ const ColumnDescriptor* descr = nullptr) {
+ static_assert(sizeof(U) == sizeof(c_type));
+ static_assert(std::is_same_v<U, FLBA> == std::is_same_v<c_type, FLBA>);
+
+ std::unique_ptr<TypedEncoder<Type>> encoder = MakeTypedEncoder<Type>(
+ Encoding::BYTE_STREAM_SPLIT, /*use_dictionary=*/false, descr);
+ int num_elements = static_cast<int>(data.size());
+ encoder->Put(reinterpret_cast<const c_type*>(data.data()), num_elements);
auto encoded_data = encoder->FlushValues();
- ASSERT_EQ(encoded_data_size, encoded_data->size());
+ ASSERT_EQ(expected_encoded_data.size(), encoded_data->size());
const uint8_t* encoded_data_raw = encoded_data->data();
for (int64_t i = 0; i < encoded_data->size(); ++i) {
ASSERT_EQ(expected_encoded_data[i], encoded_data_raw[i]);
}
}
+
+ int physical_byte_width() const {
+ return std::is_same_v<c_type, FLBA> ? descr_->type_length()
+ : static_cast<int>(sizeof(c_type));
+ }
};
template <typename c_type>
@@ -1287,54 +1317,97 @@ static std::vector<c_type> ToLittleEndian(const
std::vector<c_type>& input) {
return data;
}
-static_assert(sizeof(float) == sizeof(uint32_t),
- "BYTE_STREAM_SPLIT encoding tests assume float / uint32_t type
sizes");
-static_assert(sizeof(double) == sizeof(uint64_t),
- "BYTE_STREAM_SPLIT encoding tests assume double / uint64_t type
sizes");
-
-template <>
-void TestByteStreamSplitEncoding<FloatType>::CheckDecode() {
- const uint8_t data[] = {0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
- 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC};
- const auto expected_output =
- ToLittleEndian<uint32_t>({0xAA774411U, 0xBB885522U, 0xCC996633U});
- CheckDecode(data, static_cast<int64_t>(sizeof(data)),
- reinterpret_cast<const float*>(expected_output.data()),
- static_cast<int>(sizeof(data) / sizeof(float)));
-}
-
-template <>
-void TestByteStreamSplitEncoding<DoubleType>::CheckDecode() {
- const uint8_t data[] = {0xDE, 0xC0, 0x37, 0x13, 0x11, 0x22, 0x33, 0x44,
- 0xAA, 0xBB, 0xCC, 0xDD, 0x55, 0x66, 0x77, 0x88};
- const auto expected_output =
- ToLittleEndian<uint64_t>({0x7755CCAA331137DEULL, 0x8866DDBB442213C0ULL});
- CheckDecode(data, static_cast<int64_t>(sizeof(data)),
- reinterpret_cast<const double*>(expected_output.data()),
- static_cast<int>(sizeof(data) / sizeof(double)));
-}
-
-template <>
-void TestByteStreamSplitEncoding<DoubleType>::CheckEncode() {
- const auto data = ToLittleEndian<uint64_t>(
- {0x4142434445464748ULL, 0x0102030405060708ULL, 0xb1b2b3b4b5b6b7b8ULL});
- const uint8_t expected_output[24] = {
- 0x48, 0x08, 0xb8, 0x47, 0x07, 0xb7, 0x46, 0x06, 0xb6, 0x45, 0x05, 0xb5,
- 0x44, 0x04, 0xb4, 0x43, 0x03, 0xb3, 0x42, 0x02, 0xb2, 0x41, 0x01, 0xb1,
- };
- CheckEncode(reinterpret_cast<const double*>(data.data()),
static_cast<int>(data.size()),
- expected_output, sizeof(expected_output));
+std::shared_ptr<ColumnDescriptor> FLBAColumnDescriptor(int type_length) {
+ auto node =
+ schema::PrimitiveNode::Make("", Repetition::REQUIRED,
Type::FIXED_LEN_BYTE_ARRAY,
+ ConvertedType::NONE, type_length);
+ return std::make_shared<ColumnDescriptor>(std::move(node),
/*max_definition_level=*/0,
+ /*max_repetition_level=*/0);
}
-template <>
-void TestByteStreamSplitEncoding<FloatType>::CheckEncode() {
- const auto data = ToLittleEndian<uint32_t>({0xaabbccdd, 0x11223344});
- const uint8_t expected_output[8] = {0xdd, 0x44, 0xcc, 0x33, 0xbb, 0x22,
0xaa, 0x11};
- CheckEncode(reinterpret_cast<const float*>(data.data()),
static_cast<int>(data.size()),
- expected_output, sizeof(expected_output));
+template <typename Type>
+void TestByteStreamSplitEncoding<Type>::CheckDecode() {
+ if constexpr (std::is_same_v<c_type, FLBA>) {
+ // FIXED_LEN_BYTE_ARRAY
+ // - type_length = 3
+ {
+ const std::vector<uint8_t> data{0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
+ 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC};
+ const std::vector<uint8_t> raw_expected_output{0x11, 0x55, 0x99, 0x22,
0x66, 0xAA,
+ 0x33, 0x77, 0xBB, 0x44,
0x88, 0xCC};
+ const std::vector<FLBA> expected_output{
+ FLBA{&raw_expected_output[0]}, FLBA{&raw_expected_output[3]},
+ FLBA{&raw_expected_output[6]}, FLBA{&raw_expected_output[9]}};
+ CheckDecode(span{data}, span{expected_output},
FLBAColumnDescriptor(3).get());
+ }
+ // - type_length = 1
+ {
+ const std::vector<uint8_t> data{0x11, 0x22, 0x33};
+ const std::vector<uint8_t> raw_expected_output{0x11, 0x22, 0x33};
+ const std::vector<FLBA> expected_output{FLBA{&raw_expected_output[0]},
+ FLBA{&raw_expected_output[1]},
+ FLBA{&raw_expected_output[2]}};
+ CheckDecode(span{data}, span{expected_output},
FLBAColumnDescriptor(1).get());
+ }
+ } else if constexpr (sizeof(c_type) == 4) {
+ // INT32, FLOAT
+ const std::vector<uint8_t> data{0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
+ 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC};
+ const auto expected_output =
+ ToLittleEndian<uint32_t>({0xAA774411U, 0xBB885522U, 0xCC996633U});
+ CheckDecode(span{data}, span{expected_output});
+ } else {
+ // INT64, DOUBLE
+ const std::vector<uint8_t> data{0xDE, 0xC0, 0x37, 0x13, 0x11, 0x22, 0x33,
0x44,
+ 0xAA, 0xBB, 0xCC, 0xDD, 0x55, 0x66, 0x77,
0x88};
+ const auto expected_output =
+ ToLittleEndian<uint64_t>({0x7755CCAA331137DEULL,
0x8866DDBB442213C0ULL});
+ CheckDecode(span{data}, span{expected_output});
+ }
}
-typedef ::testing::Types<FloatType, DoubleType> ByteStreamSplitTypes;
+template <typename Type>
+void TestByteStreamSplitEncoding<Type>::CheckEncode() {
+ if constexpr (std::is_same_v<c_type, FLBA>) {
+ // FIXED_LEN_BYTE_ARRAY
+ // - type_length = 3
+ {
+ const std::vector<uint8_t> raw_data{0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
+ 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC};
+ const std::vector<FLBA> data{FLBA{&raw_data[0]}, FLBA{&raw_data[3]},
+ FLBA{&raw_data[6]}, FLBA{&raw_data[9]}};
+ const std::vector<uint8_t> expected_output{0x11, 0x44, 0x77, 0xAA, 0x22,
0x55,
+ 0x88, 0xBB, 0x33, 0x66, 0x99,
0xCC};
+ CheckEncode(span{data}, span{expected_output},
FLBAColumnDescriptor(3).get());
+ }
+ // - type_length = 1
+ {
+ const std::vector<uint8_t> raw_data{0x11, 0x22, 0x33};
+ const std::vector<FLBA> data{FLBA{&raw_data[0]}, FLBA{&raw_data[1]},
+ FLBA{&raw_data[2]}};
+ const std::vector<uint8_t> expected_output{0x11, 0x22, 0x33};
+ CheckEncode(span{data}, span{expected_output},
FLBAColumnDescriptor(1).get());
+ }
+ } else if constexpr (sizeof(c_type) == 4) {
+ // INT32, FLOAT
+ const auto data = ToLittleEndian<uint32_t>({0xaabbccddUL, 0x11223344UL});
+ const std::vector<uint8_t> expected_output{0xdd, 0x44, 0xcc, 0x33,
+ 0xbb, 0x22, 0xaa, 0x11};
+ CheckEncode(span{data}, span{expected_output});
+ } else {
+ // INT64, DOUBLE
+ const auto data = ToLittleEndian<uint64_t>(
+ {0x4142434445464748ULL, 0x0102030405060708ULL, 0xb1b2b3b4b5b6b7b8ULL});
+ const std::vector<uint8_t> expected_output{
+ 0x48, 0x08, 0xb8, 0x47, 0x07, 0xb7, 0x46, 0x06, 0xb6, 0x45, 0x05, 0xb5,
+ 0x44, 0x04, 0xb4, 0x43, 0x03, 0xb3, 0x42, 0x02, 0xb2, 0x41, 0x01, 0xb1,
+ };
+ CheckEncode(span{data}, span{expected_output});
+ }
+}
+
+using ByteStreamSplitTypes =
+ ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType, FLBAType>;
TYPED_TEST_SUITE(TestByteStreamSplitEncoding, ByteStreamSplitTypes);
TYPED_TEST(TestByteStreamSplitEncoding, BasicRoundTrip) {
@@ -1397,30 +1470,20 @@ TYPED_TEST(TestByteStreamSplitEncoding,
CheckOnlyEncode) {
TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
// First check encoders.
- ASSERT_THROW(MakeTypedEncoder<Int32Type>(Encoding::BYTE_STREAM_SPLIT),
- ParquetException);
- ASSERT_THROW(MakeTypedEncoder<Int64Type>(Encoding::BYTE_STREAM_SPLIT),
- ParquetException);
ASSERT_THROW(MakeTypedEncoder<Int96Type>(Encoding::BYTE_STREAM_SPLIT),
ParquetException);
ASSERT_THROW(MakeTypedEncoder<BooleanType>(Encoding::BYTE_STREAM_SPLIT),
ParquetException);
ASSERT_THROW(MakeTypedEncoder<ByteArrayType>(Encoding::BYTE_STREAM_SPLIT),
ParquetException);
- ASSERT_THROW(MakeTypedEncoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT),
ParquetException);
// Then check decoders.
- ASSERT_THROW(MakeTypedDecoder<Int32Type>(Encoding::BYTE_STREAM_SPLIT),
- ParquetException);
- ASSERT_THROW(MakeTypedDecoder<Int64Type>(Encoding::BYTE_STREAM_SPLIT),
- ParquetException);
ASSERT_THROW(MakeTypedDecoder<Int96Type>(Encoding::BYTE_STREAM_SPLIT),
ParquetException);
ASSERT_THROW(MakeTypedDecoder<BooleanType>(Encoding::BYTE_STREAM_SPLIT),
ParquetException);
ASSERT_THROW(MakeTypedDecoder<ByteArrayType>(Encoding::BYTE_STREAM_SPLIT),
ParquetException);
- ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT),
ParquetException);
}
// ----------------------------------------------------------------------
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index f9c2e06873..7b2169e247 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -21,6 +21,8 @@
#include <iostream>
#include <memory>
#include <string>
+#include <type_traits>
+#include <utility>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
@@ -33,6 +35,7 @@
#include "arrow/testing/random.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/config.h"
+#include "arrow/util/range.h"
#include "parquet/column_reader.h"
#include "parquet/column_scanner.h"
@@ -45,6 +48,7 @@
#include "parquet/test_util.h"
using arrow::internal::checked_pointer_cast;
+using arrow::internal::Zip;
namespace parquet {
using schema::GroupNode;
@@ -123,6 +127,10 @@ std::string concatenated_gzip_members() {
std::string byte_stream_split() { return
data_file("byte_stream_split.zstd.parquet"); }
+std::string byte_stream_split_extended() {
+ return data_file("byte_stream_split_extended.gzip.parquet");
+}
+
template <typename DType, typename ValueType = typename DType::c_type>
std::vector<ValueType> ReadColumnValues(ParquetFileReader* file_reader, int
row_group,
int column, int64_t
expected_values_read) {
@@ -137,6 +145,23 @@ std::vector<ValueType> ReadColumnValues(ParquetFileReader*
file_reader, int row_
return values;
}
+template <typename ValueType>
+void AssertColumnValuesEqual(const ColumnDescriptor* descr,
+ const std::vector<ValueType>& left_values,
+ const std::vector<ValueType>& right_values) {
+ if constexpr (std::is_same_v<ValueType, FLBA>) {
+ // operator== for FLBA in test_util.h is unusable (it hard-codes length to
12)
+ const auto length = descr->type_length();
+ for (const auto& [left, right] : Zip(left_values, right_values)) {
+ std::string_view left_view(reinterpret_cast<const char*>(left.ptr),
length);
+ std::string_view right_view(reinterpret_cast<const char*>(right.ptr),
length);
+ ASSERT_EQ(left_view, right_view);
+ }
+ } else {
+ ASSERT_EQ(left_values, right_values);
+ }
+}
+
// TODO: Assert on definition and repetition levels
template <typename DType, typename ValueType = typename DType::c_type>
void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t
batch_size,
@@ -149,9 +174,46 @@ void
AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t b
auto levels_read =
col->ReadBatch(batch_size, nullptr, nullptr, values.data(),
&values_read);
ASSERT_EQ(expected_levels_read, levels_read);
+ ASSERT_EQ(expected_values_read, values_read);
+ AssertColumnValuesEqual(col->descr(), expected_values, values);
+}
+
+template <typename DType, typename ValueType = typename DType::c_type>
+void AssertColumnValuesEqual(std::shared_ptr<TypedColumnReader<DType>>
left_col,
+ std::shared_ptr<TypedColumnReader<DType>>
right_col,
+ int64_t batch_size, int64_t expected_levels_read,
+ int64_t expected_values_read) {
+ std::vector<ValueType> left_values(batch_size);
+ std::vector<ValueType> right_values(batch_size);
+ int64_t values_read, levels_read;
+
+ levels_read =
+ left_col->ReadBatch(batch_size, nullptr, nullptr, left_values.data(),
&values_read);
+ ASSERT_EQ(expected_levels_read, levels_read);
+ ASSERT_EQ(expected_values_read, values_read);
- ASSERT_EQ(expected_values, values);
+ levels_read = right_col->ReadBatch(batch_size, nullptr, nullptr,
right_values.data(),
+ &values_read);
+ ASSERT_EQ(expected_levels_read, levels_read);
ASSERT_EQ(expected_values_read, values_read);
+
+ AssertColumnValuesEqual(left_col->descr(), left_values, right_values);
+}
+
+template <typename DType, typename ValueType = typename DType::c_type>
+void AssertColumnValuesEqual(ParquetFileReader* file_reader, const
std::string& left_col,
+ const std::string& right_col, int64_t num_rows,
+ int row_group = 0) {
+ ARROW_SCOPED_TRACE("left_col = '", left_col, "', right_col = '", right_col,
"'");
+
+ auto left_col_index =
file_reader->metadata()->schema()->ColumnIndex(left_col);
+ auto right_col_index =
file_reader->metadata()->schema()->ColumnIndex(right_col);
+ auto row_group_reader = file_reader->RowGroup(row_group);
+ auto left_reader = checked_pointer_cast<TypedColumnReader<DType>>(
+ row_group_reader->Column(left_col_index));
+ auto right_reader = checked_pointer_cast<TypedColumnReader<DType>>(
+ row_group_reader->Column(right_col_index));
+ AssertColumnValuesEqual(left_reader, right_reader, num_rows, num_rows,
num_rows);
}
void CheckRowGroupMetadata(const RowGroupMetaData* rg_metadata,
@@ -1522,6 +1584,34 @@ TEST(TestByteStreamSplit, FloatIntegrationFile) {
}
#endif // ARROW_WITH_ZSTD
+#ifdef ARROW_WITH_ZLIB
+TEST(TestByteStreamSplit, ExtendedIntegrationFile) {
+ auto file_path = byte_stream_split_extended();
+ auto file = ParquetFileReader::OpenFile(file_path);
+
+ const int64_t kNumRows = 200;
+
+ ASSERT_EQ(kNumRows, file->metadata()->num_rows());
+ ASSERT_EQ(14, file->metadata()->num_columns());
+ ASSERT_EQ(1, file->metadata()->num_row_groups());
+
+ AssertColumnValuesEqual<FloatType>(file.get(), "float_plain",
"float_byte_stream_split",
+ kNumRows);
+ AssertColumnValuesEqual<DoubleType>(file.get(), "double_plain",
+ "double_byte_stream_split", kNumRows);
+ AssertColumnValuesEqual<Int32Type>(file.get(), "int32_plain",
"int32_byte_stream_split",
+ kNumRows);
+ AssertColumnValuesEqual<Int64Type>(file.get(), "int64_plain",
"int64_byte_stream_split",
+ kNumRows);
+ AssertColumnValuesEqual<FLBAType>(file.get(), "float16_plain",
+ "float16_byte_stream_split", kNumRows);
+ AssertColumnValuesEqual<FLBAType>(file.get(), "flba5_plain",
"flba5_byte_stream_split",
+ kNumRows);
+ AssertColumnValuesEqual<FLBAType>(file.get(), "decimal_plain",
+ "decimal_byte_stream_split", kNumRows);
+}
+#endif // ARROW_WITH_ZLIB
+
struct PageIndexReaderParam {
std::vector<int32_t> row_group_indices;
std::vector<int32_t> column_indices;
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index 4cb3cff24c..74278bc4a1 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit 4cb3cff24c965fb329cdae763eabce47395a68a0
+Subproject commit 74278bc4a1122d74945969e6dec405abd1533ec3
diff --git a/python/pyarrow/tests/parquet/test_basic.py
b/python/pyarrow/tests/parquet/test_basic.py
index bc21d709ec..56b967a059 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -323,6 +323,7 @@ def test_byte_stream_split():
# This is only a smoke test.
arr_float = pa.array(list(map(float, range(100))))
arr_int = pa.array(list(map(int, range(100))))
+ arr_bool = pa.array([True, False] * 50)
data_float = [arr_float, arr_float]
table = pa.Table.from_arrays(data_float, names=['a', 'b'])
@@ -342,16 +343,16 @@ def test_byte_stream_split():
use_byte_stream_split=['a', 'b'])
# Check with mixed column types.
- mixed_table = pa.Table.from_arrays([arr_float, arr_int],
- names=['a', 'b'])
+ mixed_table = pa.Table.from_arrays([arr_float, arr_float, arr_int,
arr_int],
+ names=['a', 'b', 'c', 'd'])
_check_roundtrip(mixed_table, expected=mixed_table,
- use_dictionary=['b'],
- use_byte_stream_split=['a'])
+ use_dictionary=['b', 'd'],
+ use_byte_stream_split=['a', 'c'])
# Try to use the wrong data type with the byte_stream_split encoding.
# This should throw an exception.
- table = pa.Table.from_arrays([arr_int], names=['tmp'])
- with pytest.raises(IOError):
+ table = pa.Table.from_arrays([arr_bool], names=['tmp'])
+ with pytest.raises(IOError, match='BYTE_STREAM_SPLIT only supports'):
_check_roundtrip(table, expected=table, use_byte_stream_split=True,
use_dictionary=False)
@@ -367,12 +368,13 @@ def test_column_encoding():
[arr_float, arr_int, arr_bin, arr_flba, arr_bool],
names=['a', 'b', 'c', 'd', 'e'])
- # Check "BYTE_STREAM_SPLIT" for column 'a' and "PLAIN" column_encoding for
- # column 'b' and 'c'.
+ # Check "BYTE_STREAM_SPLIT" for columns 'a', 'b', 'd'
+ # and "PLAIN" column_encoding for column 'c'.
_check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False,
column_encoding={'a': "BYTE_STREAM_SPLIT",
- 'b': "PLAIN",
- 'c': "PLAIN"})
+ 'b': "BYTE_STREAM_SPLIT",
+ 'c': "PLAIN",
+ 'd': "BYTE_STREAM_SPLIT"})
# Check "PLAIN" for all columns.
_check_roundtrip(mixed_table, expected=mixed_table,
@@ -406,20 +408,20 @@ def test_column_encoding():
use_dictionary=False,
column_encoding={'e': "RLE"})
- # Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'.
- # This should throw an error as it is only supports FLOAT and DOUBLE.
+ # Try to pass "BYTE_STREAM_SPLIT" column encoding for boolean column 'e'.
+ # This should throw an error as it does not support BOOLEAN.
with pytest.raises(IOError,
- match="BYTE_STREAM_SPLIT only supports FLOAT and"
- " DOUBLE"):
+ match="BYTE_STREAM_SPLIT only supports"):
_check_roundtrip(mixed_table, expected=mixed_table,
use_dictionary=False,
column_encoding={'a': "PLAIN",
- 'b': "BYTE_STREAM_SPLIT",
- 'c': "PLAIN"})
+ 'c': "PLAIN",
+ 'e': "BYTE_STREAM_SPLIT"})
# Try to pass use "DELTA_BINARY_PACKED" encoding on float column.
# This should throw an error as only integers are supported.
- with pytest.raises(OSError):
+ with pytest.raises(OSError,
+ match="DELTA_BINARY_PACKED encoder only supports"):
_check_roundtrip(mixed_table, expected=mixed_table,
use_dictionary=False,
column_encoding={'a': "DELTA_BINARY_PACKED",
@@ -429,13 +431,15 @@ def test_column_encoding():
# Try to pass "RLE_DICTIONARY".
# This should throw an error as dictionary encoding is already used by
# default and not supported to be specified as "fallback" encoding
- with pytest.raises(ValueError):
+ with pytest.raises(ValueError,
+ match="'RLE_DICTIONARY' is already used by default"):
_check_roundtrip(mixed_table, expected=mixed_table,
use_dictionary=False,
column_encoding="RLE_DICTIONARY")
# Try to pass unsupported encoding.
- with pytest.raises(ValueError):
+ with pytest.raises(ValueError,
+ match="Unsupported column encoding:
'MADE_UP_ENCODING'"):
_check_roundtrip(mixed_table, expected=mixed_table,
use_dictionary=False,
column_encoding={'a': "MADE_UP_ENCODING"})