This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new a364e4a16d GH-39978: [C++][Parquet] Expand BYTE_STREAM_SPLIT to support FIXED_LEN_BYTE_ARRAY, INT32 and INT64 (#40094)
a364e4a16d is described below

commit a364e4a16d596f7fcccd76ad8fc27092c59e5e74
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Mar 19 12:06:28 2024 +0100

    GH-39978: [C++][Parquet] Expand BYTE_STREAM_SPLIT to support FIXED_LEN_BYTE_ARRAY, INT32 and INT64 (#40094)
    
    ### What changes are included in this PR?
    
    Implement the format addition described in https://issues.apache.org/jira/browse/PARQUET-2414 .
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    Yes (additional types supported for Parquet encoding).
    
    * GitHub Issue: #39978
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/util/byte_stream_split_internal.h | 126 +++++--
 cpp/src/arrow/util/byte_stream_split_test.cc    |  72 ++--
 cpp/src/parquet/column_io_benchmark.cc          | 103 +++++-
 cpp/src/parquet/column_writer_test.cc           |  16 +-
 cpp/src/parquet/encoding.cc                     | 445 +++++++++++++++---------
 cpp/src/parquet/encoding_benchmark.cc           | 115 ++++--
 cpp/src/parquet/encoding_test.cc                | 207 +++++++----
 cpp/src/parquet/reader_test.cc                  |  92 ++++-
 cpp/submodules/parquet-testing                  |   2 +-
 python/pyarrow/tests/parquet/test_basic.py      |  42 ++-
 10 files changed, 849 insertions(+), 371 deletions(-)

diff --git a/cpp/src/arrow/util/byte_stream_split_internal.h 
b/cpp/src/arrow/util/byte_stream_split_internal.h
index 77284d695b..8bca0d442c 100644
--- a/cpp/src/arrow/util/byte_stream_split_internal.h
+++ b/cpp/src/arrow/util/byte_stream_split_internal.h
@@ -19,11 +19,14 @@
 
 #include "arrow/util/endian.h"
 #include "arrow/util/simd.h"
+#include "arrow/util/small_vector.h"
 #include "arrow/util/ubsan.h"
 
 #include <algorithm>
+#include <array>
 #include <cassert>
 #include <cstdint>
+#include <cstring>
 
 #if defined(ARROW_HAVE_NEON) || defined(ARROW_HAVE_SSE4_2)
 #include <xsimd/xsimd.hpp>
@@ -38,10 +41,11 @@ namespace arrow::util::internal {
 
 #if defined(ARROW_HAVE_NEON) || defined(ARROW_HAVE_SSE4_2)
 template <int kNumStreams>
-void ByteStreamSplitDecodeSimd128(const uint8_t* data, int64_t num_values, 
int64_t stride,
-                                  uint8_t* out) {
+void ByteStreamSplitDecodeSimd128(const uint8_t* data, int width, int64_t 
num_values,
+                                  int64_t stride, uint8_t* out) {
   using simd_batch = xsimd::make_sized_batch_t<int8_t, 16>;
 
+  assert(width == kNumStreams);
   static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of 
streams.");
   constexpr int kNumStreamsLog2 = (kNumStreams == 8 ? 3 : 2);
   constexpr int64_t kBlockSize = sizeof(simd_batch) * kNumStreams;
@@ -92,10 +96,11 @@ void ByteStreamSplitDecodeSimd128(const uint8_t* data, 
int64_t num_values, int64
 }
 
 template <int kNumStreams>
-void ByteStreamSplitEncodeSimd128(const uint8_t* raw_values, const int64_t 
num_values,
-                                  uint8_t* output_buffer_raw) {
+void ByteStreamSplitEncodeSimd128(const uint8_t* raw_values, int width,
+                                  const int64_t num_values, uint8_t* 
output_buffer_raw) {
   using simd_batch = xsimd::make_sized_batch_t<int8_t, 16>;
 
+  assert(width == kNumStreams);
   static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of 
streams.");
   constexpr int kBlockSize = sizeof(simd_batch) * kNumStreams;
 
@@ -215,15 +220,17 @@ void ByteStreamSplitEncodeSimd128(const uint8_t* 
raw_values, const int64_t num_v
 
 #if defined(ARROW_HAVE_AVX2)
 template <int kNumStreams>
-void ByteStreamSplitDecodeAvx2(const uint8_t* data, int64_t num_values, 
int64_t stride,
-                               uint8_t* out) {
+void ByteStreamSplitDecodeAvx2(const uint8_t* data, int width, int64_t 
num_values,
+                               int64_t stride, uint8_t* out) {
+  assert(width == kNumStreams);
   static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of 
streams.");
   constexpr int kNumStreamsLog2 = (kNumStreams == 8 ? 3 : 2);
   constexpr int64_t kBlockSize = sizeof(__m256i) * kNumStreams;
 
   const int64_t size = num_values * kNumStreams;
   if (size < kBlockSize)  // Back to SSE for small size
-    return ByteStreamSplitDecodeSimd128<kNumStreams>(data, num_values, stride, 
out);
+    return ByteStreamSplitDecodeSimd128<kNumStreams>(data, width, num_values, 
stride,
+                                                     out);
   const int64_t num_blocks = size / kBlockSize;
 
   // First handle suffix.
@@ -299,18 +306,19 @@ void ByteStreamSplitDecodeAvx2(const uint8_t* data, 
int64_t num_values, int64_t
 }
 
 template <int kNumStreams>
-void ByteStreamSplitEncodeAvx2(const uint8_t* raw_values, const int64_t 
num_values,
-                               uint8_t* output_buffer_raw) {
+void ByteStreamSplitEncodeAvx2(const uint8_t* raw_values, int width,
+                               const int64_t num_values, uint8_t* 
output_buffer_raw) {
+  assert(width == kNumStreams);
   static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of 
streams.");
   constexpr int kBlockSize = sizeof(__m256i) * kNumStreams;
 
   if constexpr (kNumStreams == 8)  // Back to SSE, currently no path for 
double.
-    return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, num_values,
+    return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, width, 
num_values,
                                                      output_buffer_raw);
 
   const int64_t size = num_values * kNumStreams;
   if (size < kBlockSize)  // Back to SSE for small size
-    return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, num_values,
+    return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, width, 
num_values,
                                                      output_buffer_raw);
   const int64_t num_blocks = size / kBlockSize;
   const __m256i* raw_values_simd = reinterpret_cast<const 
__m256i*>(raw_values);
@@ -373,25 +381,26 @@ void ByteStreamSplitEncodeAvx2(const uint8_t* raw_values, 
const int64_t num_valu
 
 #if defined(ARROW_HAVE_SIMD_SPLIT)
 template <int kNumStreams>
-void inline ByteStreamSplitDecodeSimd(const uint8_t* data, int64_t num_values,
+void inline ByteStreamSplitDecodeSimd(const uint8_t* data, int width, int64_t 
num_values,
                                       int64_t stride, uint8_t* out) {
 #if defined(ARROW_HAVE_AVX2)
-  return ByteStreamSplitDecodeAvx2<kNumStreams>(data, num_values, stride, out);
+  return ByteStreamSplitDecodeAvx2<kNumStreams>(data, width, num_values, 
stride, out);
 #elif defined(ARROW_HAVE_SSE4_2) || defined(ARROW_HAVE_NEON)
-  return ByteStreamSplitDecodeSimd128<kNumStreams>(data, num_values, stride, 
out);
+  return ByteStreamSplitDecodeSimd128<kNumStreams>(data, width, num_values, 
stride, out);
 #else
 #error "ByteStreamSplitDecodeSimd not implemented"
 #endif
 }
 
 template <int kNumStreams>
-void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const int64_t 
num_values,
+void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, int width,
+                                      const int64_t num_values,
                                       uint8_t* output_buffer_raw) {
 #if defined(ARROW_HAVE_AVX2)
-  return ByteStreamSplitEncodeAvx2<kNumStreams>(raw_values, num_values,
+  return ByteStreamSplitEncodeAvx2<kNumStreams>(raw_values, width, num_values,
                                                 output_buffer_raw);
 #elif defined(ARROW_HAVE_SSE4_2) || defined(ARROW_HAVE_NEON)
-  return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, num_values,
+  return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, width, 
num_values,
                                                    output_buffer_raw);
 #else
 #error "ByteStreamSplitEncodeSimd not implemented"
@@ -492,18 +501,30 @@ inline void DoMergeStreams(const uint8_t** src_streams, 
int width, int64_t nvalu
 }
 
 template <int kNumStreams>
-void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const int64_t 
num_values,
-                                 uint8_t* output_buffer_raw) {
+void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, int width,
+                                 const int64_t num_values, uint8_t* out) {
+  assert(width == kNumStreams);
   std::array<uint8_t*, kNumStreams> dest_streams;
   for (int stream = 0; stream < kNumStreams; ++stream) {
-    dest_streams[stream] = &output_buffer_raw[stream * num_values];
+    dest_streams[stream] = &out[stream * num_values];
   }
   DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data());
 }
 
+inline void ByteStreamSplitEncodeScalarDynamic(const uint8_t* raw_values, int 
width,
+                                               const int64_t num_values, 
uint8_t* out) {
+  ::arrow::internal::SmallVector<uint8_t*, 16> dest_streams;
+  dest_streams.resize(width);
+  for (int stream = 0; stream < width; ++stream) {
+    dest_streams[stream] = &out[stream * num_values];
+  }
+  DoSplitStreams(raw_values, width, num_values, dest_streams.data());
+}
+
 template <int kNumStreams>
-void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, 
int64_t stride,
-                                 uint8_t* out) {
+void ByteStreamSplitDecodeScalar(const uint8_t* data, int width, int64_t 
num_values,
+                                 int64_t stride, uint8_t* out) {
+  assert(width == kNumStreams);
   std::array<const uint8_t*, kNumStreams> src_streams;
   for (int stream = 0; stream < kNumStreams; ++stream) {
     src_streams[stream] = &data[stream * stride];
@@ -511,26 +532,63 @@ void ByteStreamSplitDecodeScalar(const uint8_t* data, 
int64_t num_values, int64_
   DoMergeStreams(src_streams.data(), kNumStreams, num_values, out);
 }
 
-template <int kNumStreams>
-void inline ByteStreamSplitEncode(const uint8_t* raw_values, const int64_t 
num_values,
-                                  uint8_t* output_buffer_raw) {
+inline void ByteStreamSplitDecodeScalarDynamic(const uint8_t* data, int width,
+                                               int64_t num_values, int64_t 
stride,
+                                               uint8_t* out) {
+  ::arrow::internal::SmallVector<const uint8_t*, 16> src_streams;
+  src_streams.resize(width);
+  for (int stream = 0; stream < width; ++stream) {
+    src_streams[stream] = &data[stream * stride];
+  }
+  DoMergeStreams(src_streams.data(), width, num_values, out);
+}
+
+inline void ByteStreamSplitEncode(const uint8_t* raw_values, int width,
+                                  const int64_t num_values, uint8_t* out) {
 #if defined(ARROW_HAVE_SIMD_SPLIT)
-  return ByteStreamSplitEncodeSimd<kNumStreams>(raw_values, num_values,
-                                                output_buffer_raw);
+#define ByteStreamSplitEncodePerhapsSimd ByteStreamSplitEncodeSimd
 #else
-  return ByteStreamSplitEncodeScalar<kNumStreams>(raw_values, num_values,
-                                                  output_buffer_raw);
+#define ByteStreamSplitEncodePerhapsSimd ByteStreamSplitEncodeScalar
 #endif
+  switch (width) {
+    case 1:
+      memcpy(out, raw_values, num_values);
+      return;
+    case 2:
+      return ByteStreamSplitEncodeScalar<2>(raw_values, width, num_values, 
out);
+    case 4:
+      return ByteStreamSplitEncodePerhapsSimd<4>(raw_values, width, 
num_values, out);
+    case 8:
+      return ByteStreamSplitEncodePerhapsSimd<8>(raw_values, width, 
num_values, out);
+    case 16:
+      return ByteStreamSplitEncodeScalar<16>(raw_values, width, num_values, 
out);
+  }
+  return ByteStreamSplitEncodeScalarDynamic(raw_values, width, num_values, 
out);
+#undef ByteStreamSplitEncodePerhapsSimd
 }
 
-template <int kNumStreams>
-void inline ByteStreamSplitDecode(const uint8_t* data, int64_t num_values, 
int64_t stride,
-                                  uint8_t* out) {
+inline void ByteStreamSplitDecode(const uint8_t* data, int width, int64_t 
num_values,
+                                  int64_t stride, uint8_t* out) {
 #if defined(ARROW_HAVE_SIMD_SPLIT)
-  return ByteStreamSplitDecodeSimd<kNumStreams>(data, num_values, stride, out);
+#define ByteStreamSplitDecodePerhapsSimd ByteStreamSplitDecodeSimd
 #else
-  return ByteStreamSplitDecodeScalar<kNumStreams>(data, num_values, stride, 
out);
+#define ByteStreamSplitDecodePerhapsSimd ByteStreamSplitDecodeScalar
 #endif
+  switch (width) {
+    case 1:
+      memcpy(out, data, num_values);
+      return;
+    case 2:
+      return ByteStreamSplitDecodeScalar<2>(data, width, num_values, stride, 
out);
+    case 4:
+      return ByteStreamSplitDecodePerhapsSimd<4>(data, width, num_values, 
stride, out);
+    case 8:
+      return ByteStreamSplitDecodePerhapsSimd<8>(data, width, num_values, 
stride, out);
+    case 16:
+      return ByteStreamSplitDecodeScalar<16>(data, width, num_values, stride, 
out);
+  }
+  return ByteStreamSplitDecodeScalarDynamic(data, width, num_values, stride, 
out);
+#undef ByteStreamSplitDecodePerhapsSimd
 }
 
 }  // namespace arrow::util::internal
diff --git a/cpp/src/arrow/util/byte_stream_split_test.cc 
b/cpp/src/arrow/util/byte_stream_split_test.cc
index 83ed8c9ba5..3a537725b0 100644
--- a/cpp/src/arrow/util/byte_stream_split_test.cc
+++ b/cpp/src/arrow/util/byte_stream_split_test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <algorithm>
+#include <array>
 #include <cmath>
 #include <cstddef>
 #include <functional>
@@ -35,7 +36,8 @@
 
 namespace arrow::util::internal {
 
-using ByteStreamSplitTypes = ::testing::Types<float, double>;
+using ByteStreamSplitTypes =
+    ::testing::Types<int8_t, int16_t, int32_t, int64_t, std::array<uint8_t, 
3>>;
 
 template <typename Func>
 struct NamedFunc {
@@ -63,23 +65,12 @@ class TestByteStreamSplitSpecialized : public 
::testing::Test {
  public:
   static constexpr int kWidth = static_cast<int>(sizeof(T));
 
-  using EncodeFunc = 
NamedFunc<std::function<decltype(ByteStreamSplitEncode<kWidth>)>>;
-  using DecodeFunc = 
NamedFunc<std::function<decltype(ByteStreamSplitDecode<kWidth>)>>;
+  using EncodeFunc = NamedFunc<std::function<decltype(ByteStreamSplitEncode)>>;
+  using DecodeFunc = NamedFunc<std::function<decltype(ByteStreamSplitDecode)>>;
 
   void SetUp() override {
-    encode_funcs_.push_back({"reference", &ReferenceEncode});
-    encode_funcs_.push_back({"scalar", &ByteStreamSplitEncodeScalar<kWidth>});
-    decode_funcs_.push_back({"scalar", &ByteStreamSplitDecodeScalar<kWidth>});
-#if defined(ARROW_HAVE_SIMD_SPLIT)
-    encode_funcs_.push_back({"simd", &ByteStreamSplitEncodeSimd<kWidth>});
-    decode_funcs_.push_back({"simd", &ByteStreamSplitDecodeSimd<kWidth>});
-    encode_funcs_.push_back({"simd128", 
&ByteStreamSplitEncodeSimd128<kWidth>});
-    decode_funcs_.push_back({"simd128", 
&ByteStreamSplitDecodeSimd128<kWidth>});
-#endif
-#if defined(ARROW_HAVE_AVX2)
-    encode_funcs_.push_back({"avx2", &ByteStreamSplitEncodeAvx2<kWidth>});
-    decode_funcs_.push_back({"avx2", &ByteStreamSplitDecodeAvx2<kWidth>});
-#endif
+    decode_funcs_ = MakeDecodeFuncs();
+    encode_funcs_ = MakeEncodeFuncs();
   }
 
   void TestRoundtrip(int64_t num_values) {
@@ -92,12 +83,12 @@ class TestByteStreamSplitSpecialized : public 
::testing::Test {
     for (const auto& encode_func : encode_funcs_) {
       ARROW_SCOPED_TRACE("encode_func = ", encode_func);
       encoded.assign(encoded.size(), 0);
-      encode_func.func(reinterpret_cast<const uint8_t*>(input.data()), 
num_values,
+      encode_func.func(reinterpret_cast<const uint8_t*>(input.data()), kWidth, 
num_values,
                        encoded.data());
       for (const auto& decode_func : decode_funcs_) {
         ARROW_SCOPED_TRACE("decode_func = ", decode_func);
         decoded.assign(decoded.size(), T{});
-        decode_func.func(encoded.data(), num_values, /*stride=*/num_values,
+        decode_func.func(encoded.data(), kWidth, num_values, 
/*stride=*/num_values,
                          reinterpret_cast<uint8_t*>(decoded.data()));
         ASSERT_EQ(decoded, input);
       }
@@ -123,7 +114,8 @@ class TestByteStreamSplitSpecialized : public 
::testing::Test {
       int64_t offset = 0;
       while (offset < num_values) {
         auto chunk_size = std::min<int64_t>(num_values - offset, 
chunk_size_dist(gen));
-        decode_func.func(encoded.data() + offset, chunk_size, 
/*stride=*/num_values,
+        decode_func.func(encoded.data() + offset, kWidth, chunk_size,
+                         /*stride=*/num_values,
                          reinterpret_cast<uint8_t*>(decoded.data() + offset));
         offset += chunk_size;
       }
@@ -141,20 +133,48 @@ class TestByteStreamSplitSpecialized : public 
::testing::Test {
   static std::vector<T> MakeRandomInput(int64_t num_values) {
     std::vector<T> input(num_values);
     random_bytes(kWidth * num_values, seed_++, 
reinterpret_cast<uint8_t*>(input.data()));
-    // Avoid NaNs to ease comparison
-    for (auto& value : input) {
-      if (std::isnan(value)) {
-        value = nan_replacement_++;
-      }
-    }
     return input;
   }
 
+  template <bool kSimdImplemented = (kWidth == 4 || kWidth == 8)>
+  static std::vector<DecodeFunc> MakeDecodeFuncs() {
+    std::vector<DecodeFunc> funcs;
+    funcs.push_back({"scalar_dynamic", &ByteStreamSplitDecodeScalarDynamic});
+    funcs.push_back({"scalar", &ByteStreamSplitDecodeScalar<kWidth>});
+#if defined(ARROW_HAVE_SIMD_SPLIT)
+    if constexpr (kSimdImplemented) {
+      funcs.push_back({"simd", &ByteStreamSplitDecodeSimd<kWidth>});
+      funcs.push_back({"simd128", &ByteStreamSplitDecodeSimd128<kWidth>});
+#if defined(ARROW_HAVE_AVX2)
+      funcs.push_back({"avx2", &ByteStreamSplitDecodeAvx2<kWidth>});
+#endif
+    }
+#endif  // defined(ARROW_HAVE_SIMD_SPLIT)
+    return funcs;
+  }
+
+  template <bool kSimdImplemented = (kWidth == 4 || kWidth == 8)>
+  static std::vector<EncodeFunc> MakeEncodeFuncs() {
+    std::vector<EncodeFunc> funcs;
+    funcs.push_back({"reference", &ReferenceByteStreamSplitEncode});
+    funcs.push_back({"scalar_dynamic", &ByteStreamSplitEncodeScalarDynamic});
+    funcs.push_back({"scalar", &ByteStreamSplitEncodeScalar<kWidth>});
+#if defined(ARROW_HAVE_SIMD_SPLIT)
+    if constexpr (kSimdImplemented) {
+      funcs.push_back({"simd", &ByteStreamSplitEncodeSimd<kWidth>});
+      funcs.push_back({"simd128", &ByteStreamSplitEncodeSimd128<kWidth>});
+#if defined(ARROW_HAVE_AVX2)
+      funcs.push_back({"avx2", &ByteStreamSplitEncodeAvx2<kWidth>});
+#endif
+    }
+#endif  // defined(ARROW_HAVE_SIMD_SPLIT)
+    return funcs;
+  }
+
   std::vector<EncodeFunc> encode_funcs_;
   std::vector<DecodeFunc> decode_funcs_;
 
   static inline uint32_t seed_ = 42;
-  static inline T nan_replacement_ = 0;
 };
 
 TYPED_TEST_SUITE(TestByteStreamSplitSpecialized, ByteStreamSplitTypes);
diff --git a/cpp/src/parquet/column_io_benchmark.cc 
b/cpp/src/parquet/column_io_benchmark.cc
index 593765dcd4..b003b4eede 100644
--- a/cpp/src/parquet/column_io_benchmark.cc
+++ b/cpp/src/parquet/column_io_benchmark.cc
@@ -54,45 +54,57 @@ std::shared_ptr<ColumnDescriptor> 
Int64Schema(Repetition::type repetition) {
 }
 
 void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) 
{
-  int64_t bytes_processed = state.iterations() * state.range(0) * 
sizeof(int64_t);
+  int64_t num_values = state.iterations() * state.range(0);
+  int64_t bytes_processed = num_values * sizeof(int64_t);
   if (repetition != Repetition::REQUIRED) {
-    bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
+    bytes_processed += num_values * sizeof(int16_t);
   }
   if (repetition == Repetition::REPEATED) {
-    bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
+    bytes_processed += num_values * sizeof(int16_t);
   }
   state.SetBytesProcessed(bytes_processed);
+  state.SetItemsProcessed(num_values);
 }
 
-template <Repetition::type repetition,
-          Compression::type codec = Compression::UNCOMPRESSED>
-static void BM_WriteInt64Column(::benchmark::State& state) {
+static void BM_WriteInt64Column(::benchmark::State& state, Repetition::type 
repetition,
+                                Compression::type codec, Encoding::type 
encoding) {
   format::ColumnChunk thrift_metadata;
 
   ::arrow::random::RandomArrayGenerator rgen(1337);
   auto values = rgen.Int64(state.range(0), 0, 1000000, 0);
-  const auto& i8_values = static_cast<const ::arrow::Int64Array&>(*values);
+  const auto& int64_values = static_cast<const ::arrow::Int64Array&>(*values);
 
   std::vector<int16_t> definition_levels(state.range(0), 1);
   std::vector<int16_t> repetition_levels(state.range(0), 0);
   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
   std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
                                                      .compression(codec)
-                                                     
->encoding(Encoding::PLAIN)
+                                                     ->encoding(encoding)
                                                      ->disable_dictionary()
                                                      ->build();
   auto metadata = ColumnChunkMetaDataBuilder::Make(
       properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
 
-  while (state.KeepRunning()) {
+  int64_t data_size = values->length() * sizeof(int64_t);
+  int64_t stream_size = 0;
+  for (auto _ : state) {
     auto stream = CreateOutputStream();
     std::shared_ptr<Int64Writer> writer = BuildWriter(
         state.range(0), stream, metadata.get(), schema.get(), 
properties.get(), codec);
-    writer->WriteBatch(i8_values.length(), definition_levels.data(),
-                       repetition_levels.data(), i8_values.raw_values());
+    writer->WriteBatch(int64_values.length(), definition_levels.data(),
+                       repetition_levels.data(), int64_values.raw_values());
     writer->Close();
+    stream_size = stream->Tell().ValueOrDie();
   }
   SetBytesProcessed(state, repetition);
+  state.counters["compression_ratio"] = static_cast<double>(data_size) / 
stream_size;
+}
+
+template <Repetition::type repetition,
+          Compression::type codec = Compression::UNCOMPRESSED,
+          Encoding::type encoding = Encoding::PLAIN>
+static void BM_WriteInt64Column(::benchmark::State& state) {
+  BM_WriteInt64Column(state, repetition, codec, encoding);
 }
 
 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED)->Arg(1 << 20);
@@ -106,6 +118,12 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column, 
Repetition::OPTIONAL, Compression::SNAPP
     ->Arg(1 << 20);
 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, 
Compression::SNAPPY)
     ->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, 
Compression::SNAPPY,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, 
Compression::SNAPPY,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Arg(1 << 20);
 #endif
 
 #ifdef ARROW_WITH_LZ4
@@ -115,6 +133,12 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column, 
Repetition::OPTIONAL, Compression::LZ4)
     ->Arg(1 << 20);
 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::LZ4)
     ->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::LZ4,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::LZ4,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Arg(1 << 20);
 #endif
 
 #ifdef ARROW_WITH_ZSTD
@@ -124,6 +148,12 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column, 
Repetition::OPTIONAL, Compression::ZSTD)
     ->Arg(1 << 20);
 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, 
Compression::ZSTD)
     ->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, 
Compression::ZSTD,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Arg(1 << 20);
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, 
Compression::ZSTD,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Arg(1 << 20);
 #endif
 
 std::shared_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
@@ -135,17 +165,20 @@ std::shared_ptr<Int64Reader> 
BuildReader(std::shared_ptr<Buffer>& buffer,
       ColumnReader::Make(schema, std::move(page_reader)));
 }
 
-template <Repetition::type repetition,
-          Compression::type codec = Compression::UNCOMPRESSED>
-static void BM_ReadInt64Column(::benchmark::State& state) {
+static void BM_ReadInt64Column(::benchmark::State& state, Repetition::type 
repetition,
+                               Compression::type codec, Encoding::type 
encoding) {
   format::ColumnChunk thrift_metadata;
-  std::vector<int64_t> values(state.range(0), 128);
+
+  ::arrow::random::RandomArrayGenerator rgen(1337);
+  auto values = rgen.Int64(state.range(0), 0, 1000000, 0);
+  const auto& int64_values = static_cast<const ::arrow::Int64Array&>(*values);
+
   std::vector<int16_t> definition_levels(state.range(0), 1);
   std::vector<int16_t> repetition_levels(state.range(0), 0);
   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
   std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
                                                      .compression(codec)
-                                                     
->encoding(Encoding::PLAIN)
+                                                     ->encoding(encoding)
                                                      ->disable_dictionary()
                                                      ->build();
 
@@ -155,11 +188,14 @@ static void BM_ReadInt64Column(::benchmark::State& state) 
{
   auto stream = CreateOutputStream();
   std::shared_ptr<Int64Writer> writer = BuildWriter(
       state.range(0), stream, metadata.get(), schema.get(), properties.get(), 
codec);
-  writer->WriteBatch(values.size(), definition_levels.data(), 
repetition_levels.data(),
-                     values.data());
+  writer->WriteBatch(int64_values.length(), definition_levels.data(),
+                     repetition_levels.data(), int64_values.raw_values());
   writer->Close();
 
   PARQUET_ASSIGN_OR_THROW(auto src, stream->Finish());
+  int64_t stream_size = src->size();
+  int64_t data_size = int64_values.length() * sizeof(int64_t);
+
   std::vector<int64_t> values_out(state.range(1));
   std::vector<int16_t> definition_levels_out(state.range(1));
   std::vector<int16_t> repetition_levels_out(state.range(1));
@@ -167,12 +203,20 @@ static void BM_ReadInt64Column(::benchmark::State& state) 
{
     std::shared_ptr<Int64Reader> reader =
         BuildReader(src, state.range(1), codec, schema.get());
     int64_t values_read = 0;
-    for (size_t i = 0; i < values.size(); i += values_read) {
+    for (int64_t i = 0; i < int64_values.length(); i += values_read) {
       reader->ReadBatch(values_out.size(), definition_levels_out.data(),
                         repetition_levels_out.data(), values_out.data(), 
&values_read);
     }
   }
   SetBytesProcessed(state, repetition);
+  state.counters["compression_ratio"] = static_cast<double>(data_size) / 
stream_size;
+}
+
+template <Repetition::type repetition,
+          Compression::type codec = Compression::UNCOMPRESSED,
+          Encoding::type encoding = Encoding::PLAIN>
+static void BM_ReadInt64Column(::benchmark::State& state) {
+  BM_ReadInt64Column(state, repetition, codec, encoding);
 }
 
 void ReadColumnSetArgs(::benchmark::internal::Benchmark* bench) {
@@ -197,6 +241,13 @@ BENCHMARK_TEMPLATE(BM_ReadInt64Column, 
Repetition::OPTIONAL, Compression::SNAPPY
     ->Apply(ReadColumnSetArgs);
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, 
Compression::SNAPPY)
     ->Apply(ReadColumnSetArgs);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, 
Compression::SNAPPY,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Apply(ReadColumnSetArgs);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, 
Compression::SNAPPY,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Apply(ReadColumnSetArgs);
 #endif
 
 #ifdef ARROW_WITH_LZ4
@@ -206,6 +257,13 @@ BENCHMARK_TEMPLATE(BM_ReadInt64Column, 
Repetition::OPTIONAL, Compression::LZ4)
     ->Apply(ReadColumnSetArgs);
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::LZ4)
     ->Apply(ReadColumnSetArgs);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::LZ4,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Apply(ReadColumnSetArgs);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::LZ4,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Apply(ReadColumnSetArgs);
 #endif
 
 #ifdef ARROW_WITH_ZSTD
@@ -215,6 +273,13 @@ BENCHMARK_TEMPLATE(BM_ReadInt64Column, 
Repetition::OPTIONAL, Compression::ZSTD)
     ->Apply(ReadColumnSetArgs);
 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::ZSTD)
     ->Apply(ReadColumnSetArgs);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::ZSTD,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Apply(ReadColumnSetArgs);
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::ZSTD,
+                   Encoding::BYTE_STREAM_SPLIT)
+    ->Apply(ReadColumnSetArgs);
 #endif
 
 static void BM_RleEncoding(::benchmark::State& state) {
diff --git a/cpp/src/parquet/column_writer_test.cc 
b/cpp/src/parquet/column_writer_test.cc
index a8519a0f56..c99efd1796 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -507,23 +507,33 @@ TEST_F(TestValuesWriterInt32Type, 
RequiredDeltaBinaryPacked) {
   this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
 }
 
+TEST_F(TestValuesWriterInt32Type, RequiredByteStreamSplit) {
+  this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT);
+}
+
 TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) {
   this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
 }
 
+TEST_F(TestValuesWriterInt64Type, RequiredByteStreamSplit) {
+  this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT);
+}
+
 TEST_F(TestByteArrayValuesWriter, RequiredDeltaLengthByteArray) {
   this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
 }
 
-/*
-TYPED_TEST(TestByteArrayValuesWriter, RequiredDeltaByteArray) {
+TEST_F(TestByteArrayValuesWriter, RequiredDeltaByteArray) {
   this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
 }
 
 TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredDeltaByteArray) {
   this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
 }
-*/
+
+TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredByteStreamSplit) {
+  this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT);
+}
 
 TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
   this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index a3d1746536..3eed88f08b 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -73,6 +73,10 @@ namespace {
 // unsigned, but the Java implementation uses signed ints.
 constexpr size_t kMaxByteArraySize = std::numeric_limits<int32_t>::max();
 
+// ----------------------------------------------------------------------
+// Encoders
+// ----------------------------------------------------------------------
+
 class EncoderImpl : virtual public Encoder {
  public:
   EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, 
MemoryPool* pool)
@@ -92,7 +96,7 @@ class EncoderImpl : virtual public Encoder {
   MemoryPool* pool_;
 
   /// Type length from descr
-  int type_length_;
+  const int type_length_;
 };
 
 // ----------------------------------------------------------------------
@@ -262,8 +266,7 @@ inline void PlainEncoder<ByteArrayType>::Put(const 
::arrow::Array& values) {
 }
 
 void AssertFixedSizeBinary(const ::arrow::Array& values, int type_length) {
-  if (values.type_id() != ::arrow::Type::FIXED_SIZE_BINARY &&
-      values.type_id() != ::arrow::Type::DECIMAL) {
+  if (!::arrow::is_fixed_size_binary(values.type_id())) {
     throw ParquetException("Only FixedSizeBinaryArray and subclasses 
supported");
   }
   if (checked_cast<const 
::arrow::FixedSizeBinaryType&>(*values.type()).byte_width() !=
@@ -464,8 +467,6 @@ class DictEncoderImpl : public EncoderImpl, virtual public 
DictEncoder<DType> {
     return kDataPageBitWidthBytes + encoder.len();
   }
 
-  void set_type_length(int type_length) { this->type_length_ = type_length; }
-
   /// Returns a conservative estimate of the number of bytes needed to encode 
the buffered
   /// indices. Used to size the buffer passed to WriteIndices().
   int64_t EstimatedDataEncodedSize() override {
@@ -801,98 +802,154 @@ void DictEncoderImpl<ByteArrayType>::PutDictionary(const 
::arrow::Array& values)
 // ----------------------------------------------------------------------
 // ByteStreamSplitEncoder<T> implementations
 
+// Common base class for all types
+
 template <typename DType>
-class ByteStreamSplitEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+class ByteStreamSplitEncoderBase : public EncoderImpl,
+                                   virtual public TypedEncoder<DType> {
  public:
   using T = typename DType::c_type;
   using TypedEncoder<DType>::Put;
 
-  explicit ByteStreamSplitEncoder(
-      const ColumnDescriptor* descr,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+  ByteStreamSplitEncoderBase(const ColumnDescriptor* descr, int byte_width,
+                             ::arrow::MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool),
+        sink_{pool},
+        byte_width_(byte_width),
+        num_values_in_buffer_{0} {}
 
-  int64_t EstimatedDataEncodedSize() override;
-  std::shared_ptr<Buffer> FlushValues() override;
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
 
-  void Put(const T* buffer, int num_values) override;
-  void Put(const ::arrow::Array& values) override;
-  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
-                 int64_t valid_bits_offset) override;
+  std::shared_ptr<Buffer> FlushValues() override {
+    if (byte_width_ == 1) {
+      // Special-cased fast path
+      PARQUET_ASSIGN_OR_THROW(auto buf, sink_.Finish());
+      return buf;
+    }
+    auto output_buffer = AllocateBuffer(this->memory_pool(), 
EstimatedDataEncodedSize());
+    uint8_t* output_buffer_raw = output_buffer->mutable_data();
+    const uint8_t* raw_values = sink_.data();
+    ::arrow::util::internal::ByteStreamSplitEncode(
+        raw_values, /*width=*/byte_width_, num_values_in_buffer_, 
output_buffer_raw);
+    sink_.Reset();
+    num_values_in_buffer_ = 0;
+    return output_buffer;
+  }
 
- protected:
-  template <typename ArrowType>
-  void PutImpl(const ::arrow::Array& values) {
-    if (values.type_id() != ArrowType::type_id) {
-      throw ParquetException(std::string() + "direct put to " + 
ArrowType::type_name() +
-                             " from " + values.type()->ToString() + " not 
supported");
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = buffer->template mutable_data_as<T>();
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
     }
-    const auto& data = *values.data();
-    PutSpaced(data.GetValues<typename ArrowType::c_type>(1),
-              static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0), 
data.offset);
   }
 
+ protected:
   ::arrow::BufferBuilder sink_;
+  // Required because type_length_ is only filled in for FLBA
+  const int byte_width_;
   int64_t num_values_in_buffer_;
 };
 
-template <typename DType>
-ByteStreamSplitEncoder<DType>::ByteStreamSplitEncoder(const ColumnDescriptor* 
descr,
-                                                      ::arrow::MemoryPool* 
pool)
-    : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool),
-      sink_{pool},
-      num_values_in_buffer_{0} {}
+// BYTE_STREAM_SPLIT encoder implementation for FLOAT, DOUBLE, INT32, INT64
 
 template <typename DType>
-int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
-  return sink_.length();
-}
+class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase<DType> {
+ public:
+  using T = typename DType::c_type;
+  using ArrowType = typename EncodingTraits<DType>::ArrowType;
 
-template <typename DType>
-std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
-  std::shared_ptr<ResizableBuffer> output_buffer =
-      AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
-  uint8_t* output_buffer_raw = output_buffer->mutable_data();
-  const uint8_t* raw_values = sink_.data();
-  ::arrow::util::internal::ByteStreamSplitEncode<sizeof(T)>(
-      raw_values, num_values_in_buffer_, output_buffer_raw);
-  sink_.Reset();
-  num_values_in_buffer_ = 0;
-  return std::move(output_buffer);
-}
+  ByteStreamSplitEncoder(const ColumnDescriptor* descr,
+                         ::arrow::MemoryPool* pool = 
::arrow::default_memory_pool())
+      : ByteStreamSplitEncoderBase<DType>(descr,
+                                          
/*byte_width=*/static_cast<int>(sizeof(T)),
+                                          pool) {}
 
-template <typename DType>
-void ByteStreamSplitEncoder<DType>::Put(const T* buffer, int num_values) {
-  if (num_values > 0) {
-    PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
-    num_values_in_buffer_ += num_values;
+  // Inherit Put(const std::vector<T>&...)
+  using TypedEncoder<DType>::Put;
+
+  void Put(const T* buffer, int num_values) override {
+    if (num_values > 0) {
+      PARQUET_THROW_NOT_OK(
+          this->sink_.Append(reinterpret_cast<const uint8_t*>(buffer),
+                             num_values * static_cast<int64_t>(sizeof(T))));
+      this->num_values_in_buffer_ += num_values;
+    }
   }
-}
 
-template <>
-void ByteStreamSplitEncoder<FloatType>::Put(const ::arrow::Array& values) {
-  PutImpl<::arrow::FloatType>(values);
-}
+  void Put(const ::arrow::Array& values) override {
+    if (values.type_id() != ArrowType::type_id) {
+      throw ParquetException(std::string() + "direct put from " +
+                             values.type()->ToString() + " not supported");
+    }
+    const auto& data = *values.data();
+    this->PutSpaced(data.GetValues<typename ArrowType::c_type>(1),
+                    static_cast<int>(data.length), data.GetValues<uint8_t>(0, 
0),
+                    data.offset);
+  }
+};
+
+// BYTE_STREAM_SPLIT encoder implementation for FLBA
 
 template <>
-void ByteStreamSplitEncoder<DoubleType>::Put(const ::arrow::Array& values) {
-  PutImpl<::arrow::DoubleType>(values);
-}
+class ByteStreamSplitEncoder<FLBAType> : public 
ByteStreamSplitEncoderBase<FLBAType> {
+ public:
+  using DType = FLBAType;
+  using T = FixedLenByteArray;
+  using ArrowType = ::arrow::FixedSizeBinaryArray;
 
-template <typename DType>
-void ByteStreamSplitEncoder<DType>::PutSpaced(const T* src, int num_values,
-                                              const uint8_t* valid_bits,
-                                              int64_t valid_bits_offset) {
-  if (valid_bits != NULLPTR) {
-    PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * 
sizeof(T),
-                                                                 
this->memory_pool()));
-    T* data = buffer->template mutable_data_as<T>();
-    int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
-        src, num_values, valid_bits, valid_bits_offset, data);
-    Put(data, num_valid_values);
-  } else {
-    Put(src, num_values);
+  ByteStreamSplitEncoder(const ColumnDescriptor* descr,
+                         ::arrow::MemoryPool* pool = 
::arrow::default_memory_pool())
+      : ByteStreamSplitEncoderBase<DType>(descr,
+                                          /*byte_width=*/descr->type_length(), 
pool) {}
+
+  // Inherit Put(const std::vector<T>&...)
+  using TypedEncoder<DType>::Put;
+
+  void Put(const T* buffer, int num_values) override {
+    if (byte_width_ > 0) {
+      const int64_t total_bytes = static_cast<int64_t>(num_values) * 
byte_width_;
+      PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
+      for (int i = 0; i < num_values; ++i) {
+        // Write the result to the output stream
+        DCHECK(buffer[i].ptr != nullptr) << "Value ptr cannot be NULL";
+        sink_.UnsafeAppend(buffer[i].ptr, byte_width_);
+      }
+    }
+    this->num_values_in_buffer_ += num_values;
   }
-}
+
+  void Put(const ::arrow::Array& values) override {
+    AssertFixedSizeBinary(values, byte_width_);
+    const auto& data = checked_cast<const 
::arrow::FixedSizeBinaryArray&>(values);
+    if (data.null_count() == 0) {
+      // no nulls, just buffer the data
+      PARQUET_THROW_NOT_OK(sink_.Append(data.raw_values(), data.length() * 
byte_width_));
+      this->num_values_in_buffer_ += data.length();
+    } else {
+      const int64_t num_values = data.length() - data.null_count();
+      const int64_t total_bytes = num_values * byte_width_;
+      PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
+      // TODO use VisitSetBitRunsVoid
+      for (int64_t i = 0; i < data.length(); i++) {
+        if (data.IsValid(i)) {
+          sink_.UnsafeAppend(data.Value(i), byte_width_);
+        }
+      }
+      this->num_values_in_buffer_ += num_values;
+    }
+  }
+};
+
+// ----------------------------------------------------------------------
+// Decoders
+// ----------------------------------------------------------------------
 
 class DecoderImpl : virtual public Decoder {
  public:
@@ -3559,136 +3616,162 @@ class DeltaByteArrayFLBADecoder : public 
DeltaByteArrayDecoderImpl<FLBAType>,
 };
 
 // ----------------------------------------------------------------------
-// BYTE_STREAM_SPLIT
+// BYTE_STREAM_SPLIT decoders
 
 template <typename DType>
-class ByteStreamSplitDecoder : public DecoderImpl, virtual public 
TypedDecoder<DType> {
+class ByteStreamSplitDecoderBase : public DecoderImpl,
+                                   virtual public TypedDecoder<DType> {
  public:
   using T = typename DType::c_type;
-  explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr);
 
-  int Decode(T* buffer, int max_values) override;
+  ByteStreamSplitDecoderBase(const ColumnDescriptor* descr, int byte_width)
+      : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT), 
byte_width_(byte_width) {}
 
-  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
-                  int64_t valid_bits_offset,
-                  typename EncodingTraits<DType>::Accumulator* builder) 
override;
+  void SetData(int num_values, const uint8_t* data, int len) override {
+    if (static_cast<int64_t>(num_values) * byte_width_ != len) {
+      throw ParquetException("Data size (" + std::to_string(len) +
+                             ") does not match number of values in 
BYTE_STREAM_SPLIT (" +
+                             std::to_string(num_values) + ")");
+    }
+    DecoderImpl::SetData(num_values, data, len);
+    stride_ = num_values_;
+  }
 
   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
-                  typename EncodingTraits<DType>::DictAccumulator* builder) 
override;
-
-  void SetData(int num_values, const uint8_t* data, int len) override;
+                  typename EncodingTraits<DType>::DictAccumulator* builder) 
override {
+    ParquetException::NYI("DecodeArrow to DictAccumulator for 
BYTE_STREAM_SPLIT");
+  }
 
-  T* EnsureDecodeBuffer(int64_t min_values) {
-    const int64_t size = sizeof(T) * min_values;
+ protected:
+  int DecodeRaw(uint8_t* out_buffer, int max_values) {
+    const int values_to_decode = std::min(num_values_, max_values);
+    ::arrow::util::internal::ByteStreamSplitDecode(data_, byte_width_, 
values_to_decode,
+                                                   stride_, out_buffer);
+    data_ += values_to_decode;
+    num_values_ -= values_to_decode;
+    len_ -= byte_width_ * values_to_decode;
+    return values_to_decode;
+  }
+
+  uint8_t* EnsureDecodeBuffer(int64_t min_values) {
+    const int64_t size = byte_width_ * min_values;
     if (!decode_buffer_ || decode_buffer_->size() < size) {
-      PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(size));
+      const auto alloc_size = ::arrow::bit_util::NextPower2(size);
+      PARQUET_ASSIGN_OR_THROW(decode_buffer_, 
::arrow::AllocateBuffer(alloc_size));
     }
-    return decode_buffer_->mutable_data_as<T>();
+    return decode_buffer_->mutable_data();
   }
 
- private:
-  int num_values_in_buffer_{0};
+  const int byte_width_;
+  int stride_{0};
   std::shared_ptr<Buffer> decode_buffer_;
-
-  static constexpr int kNumStreams = sizeof(T);
 };
 
-template <typename DType>
-ByteStreamSplitDecoder<DType>::ByteStreamSplitDecoder(const ColumnDescriptor* 
descr)
-    : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {}
+// BYTE_STREAM_SPLIT decoder for FLOAT, DOUBLE, INT32, INT64
 
 template <typename DType>
-void ByteStreamSplitDecoder<DType>::SetData(int num_values, const uint8_t* 
data,
-                                            int len) {
-  if (num_values * static_cast<int64_t>(sizeof(T)) < len) {
-    throw ParquetException(
-        "Data size too large for number of values (padding in byte stream 
split data "
-        "page?)");
-  }
-  if (len % sizeof(T) != 0) {
-    throw ParquetException("ByteStreamSplit data size " + std::to_string(len) +
-                           " not aligned with type " + 
TypeToString(DType::type_num));
+class ByteStreamSplitDecoder : public ByteStreamSplitDecoderBase<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr)
+      : ByteStreamSplitDecoderBase<DType>(descr, static_cast<int>(sizeof(T))) 
{}
+
+  int Decode(T* buffer, int max_values) override {
+    return this->DecodeRaw(reinterpret_cast<uint8_t*>(buffer), max_values);
   }
-  num_values = len / sizeof(T);
-  DecoderImpl::SetData(num_values, data, len);
-  num_values_in_buffer_ = num_values_;
-}
 
-template <typename DType>
-int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
-  const int values_to_decode = std::min(num_values_, max_values);
-  const int num_decoded_previously = num_values_in_buffer_ - num_values_;
-  const uint8_t* data = data_ + num_decoded_previously;
+  using ByteStreamSplitDecoderBase<DType>::DecodeArrow;
 
-  ::arrow::util::internal::ByteStreamSplitDecode<kNumStreams>(
-      data, values_to_decode, num_values_in_buffer_, 
reinterpret_cast<uint8_t*>(buffer));
-  num_values_ -= values_to_decode;
-  len_ -= sizeof(T) * values_to_decode;
-  return values_to_decode;
-}
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<DType>::Accumulator* builder) 
override {
+    const int values_to_decode = num_values - null_count;
+    if (ARROW_PREDICT_FALSE(this->num_values_ < values_to_decode)) {
+      ParquetException::EofException();
+    }
 
-template <typename DType>
-int ByteStreamSplitDecoder<DType>::DecodeArrow(
-    int num_values, int null_count, const uint8_t* valid_bits, int64_t 
valid_bits_offset,
-    typename EncodingTraits<DType>::Accumulator* builder) {
-  constexpr int value_size = kNumStreams;
-  int values_decoded = num_values - null_count;
-  if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
-    ParquetException::EofException();
+    PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+    // Decode into intermediate buffer.
+    T* decode_out = 
reinterpret_cast<T*>(this->EnsureDecodeBuffer(values_to_decode));
+    const int num_decoded = Decode(decode_out, values_to_decode);
+    DCHECK_EQ(num_decoded, values_to_decode);
+
+    // If null_count is 0, we could append in bulk or decode directly into
+    // builder. We could also decode in chunks, or use SpacedExpand. We don't
+    // bother currently, because DecodeArrow methods are only called for 
ByteArray.
+    int64_t offset = 0;
+    VisitNullBitmapInline(
+        valid_bits, valid_bits_offset, num_values, null_count,
+        [&]() {
+          builder->UnsafeAppend(decode_out[offset]);
+          ++offset;
+        },
+        [&]() { builder->UnsafeAppendNull(); });
+
+    return values_to_decode;
   }
+};
 
-  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+// BYTE_STREAM_SPLIT decoder for FIXED_LEN_BYTE_ARRAY
 
-  const int num_decoded_previously = num_values_in_buffer_ - num_values_;
-  const uint8_t* data = data_ + num_decoded_previously;
-  int offset = 0;
+template <>
+class ByteStreamSplitDecoder<FLBAType> : public 
ByteStreamSplitDecoderBase<FLBAType>,
+                                         virtual public FLBADecoder {
+ public:
+  using DType = FLBAType;
+  using T = FixedLenByteArray;
 
-#if defined(ARROW_HAVE_SIMD_SPLIT)
-  // Use fast decoding into intermediate buffer.  This will also decode
-  // some null values, but it's fast enough that we don't care.
-  T* decode_out = EnsureDecodeBuffer(values_decoded);
-  ::arrow::util::internal::ByteStreamSplitDecode<kNumStreams>(
-      data, values_decoded, num_values_in_buffer_,
-      reinterpret_cast<uint8_t*>(decode_out));
+  explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr)
+      : ByteStreamSplitDecoderBase<DType>(descr, descr->type_length()) {}
 
-  // XXX If null_count is 0, we could even append in bulk or decode directly 
into
-  // builder
-  VisitNullBitmapInline(
-      valid_bits, valid_bits_offset, num_values, null_count,
-      [&]() {
-        builder->UnsafeAppend(decode_out[offset]);
-        ++offset;
-      },
-      [&]() { builder->UnsafeAppendNull(); });
+  int Decode(T* buffer, int max_values) override {
+    // Decode into intermediate buffer.
+    max_values = std::min(max_values, this->num_values_);
+    uint8_t* decode_out = this->EnsureDecodeBuffer(max_values);
+    const int num_decoded = this->DecodeRaw(decode_out, max_values);
+    DCHECK_EQ(num_decoded, max_values);
 
-#else
-  // XXX should operate over runs of 0s / 1s
-  VisitNullBitmapInline(
-      valid_bits, valid_bits_offset, num_values, null_count,
-      [&]() {
-        uint8_t gathered_byte_data[kNumStreams];
-        for (int b = 0; b < kNumStreams; ++b) {
-          const int64_t byte_index = b * num_values_in_buffer_ + offset;
-          gathered_byte_data[b] = data[byte_index];
-        }
-        builder->UnsafeAppend(SafeLoadAs<T>(&gathered_byte_data[0]));
-        ++offset;
-      },
-      [&]() { builder->UnsafeAppendNull(); });
-#endif
+    for (int i = 0; i < num_decoded; ++i) {
+      buffer[i] = FixedLenByteArray(decode_out + 
static_cast<int64_t>(byte_width_) * i);
+    }
+    return num_decoded;
+  }
 
-  num_values_ -= values_decoded;
-  len_ -= sizeof(T) * values_decoded;
-  return values_decoded;
-}
+  using ByteStreamSplitDecoderBase<DType>::DecodeArrow;
 
-template <typename DType>
-int ByteStreamSplitDecoder<DType>::DecodeArrow(
-    int num_values, int null_count, const uint8_t* valid_bits, int64_t 
valid_bits_offset,
-    typename EncodingTraits<DType>::DictAccumulator* builder) {
-  ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
-}
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<DType>::Accumulator* builder) 
override {
+    const int values_to_decode = num_values - null_count;
+    if (ARROW_PREDICT_FALSE(this->num_values_ < values_to_decode)) {
+      ParquetException::EofException();
+    }
+
+    PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+    // Decode into intermediate buffer.
+    uint8_t* decode_out = this->EnsureDecodeBuffer(values_to_decode);
+    const int num_decoded = this->DecodeRaw(decode_out, values_to_decode);
+    DCHECK_EQ(num_decoded, values_to_decode);
+
+    // If null_count is 0, we could append in bulk or decode directly into
+    // builder. We could also decode in chunks, or use SpacedExpand. We don't
+    // bother currently, because DecodeArrow methods are only called for 
ByteArray.
+    int64_t offset = 0;
+    VisitNullBitmapInline(
+        valid_bits, valid_bits_offset, num_values, null_count,
+        [&]() {
+          builder->UnsafeAppend(decode_out + offset * 
static_cast<int64_t>(byte_width_));
+          ++offset;
+        },
+        [&]() { builder->UnsafeAppendNull(); });
+
+    return values_to_decode;
+  }
+};
 
 }  // namespace
 
@@ -3742,12 +3825,20 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type 
type_num, Encoding::type encodin
     }
   } else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
     switch (type_num) {
+      case Type::INT32:
+        return std::make_unique<ByteStreamSplitEncoder<Int32Type>>(descr, 
pool);
+      case Type::INT64:
+        return std::make_unique<ByteStreamSplitEncoder<Int64Type>>(descr, 
pool);
       case Type::FLOAT:
         return std::make_unique<ByteStreamSplitEncoder<FloatType>>(descr, 
pool);
       case Type::DOUBLE:
         return std::make_unique<ByteStreamSplitEncoder<DoubleType>>(descr, 
pool);
+      case Type::FIXED_LEN_BYTE_ARRAY:
+        return std::make_unique<ByteStreamSplitEncoder<FLBAType>>(descr, pool);
       default:
-        throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and 
DOUBLE");
+        throw ParquetException(
+            "BYTE_STREAM_SPLIT only supports FLOAT, DOUBLE, INT32, INT64 "
+            "and FIXED_LEN_BYTE_ARRAY");
     }
   } else if (encoding == Encoding::DELTA_BINARY_PACKED) {
     switch (type_num) {
@@ -3816,12 +3907,20 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type 
type_num, Encoding::type encodin
     }
   } else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
     switch (type_num) {
+      case Type::INT32:
+        return std::make_unique<ByteStreamSplitDecoder<Int32Type>>(descr);
+      case Type::INT64:
+        return std::make_unique<ByteStreamSplitDecoder<Int64Type>>(descr);
       case Type::FLOAT:
         return std::make_unique<ByteStreamSplitDecoder<FloatType>>(descr);
       case Type::DOUBLE:
         return std::make_unique<ByteStreamSplitDecoder<DoubleType>>(descr);
+      case Type::FIXED_LEN_BYTE_ARRAY:
+        return std::make_unique<ByteStreamSplitDecoder<FLBAType>>(descr);
       default:
-        throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and 
DOUBLE");
+        throw ParquetException(
+            "BYTE_STREAM_SPLIT only supports FLOAT, DOUBLE, INT32, INT64 "
+            "and FIXED_LEN_BYTE_ARRAY");
     }
   } else if (encoding == Encoding::DELTA_BINARY_PACKED) {
     switch (type_num) {
diff --git a/cpp/src/parquet/encoding_benchmark.cc 
b/cpp/src/parquet/encoding_benchmark.cc
index 3069e8c905..61959b659f 100644
--- a/cpp/src/parquet/encoding_benchmark.cc
+++ b/cpp/src/parquet/encoding_benchmark.cc
@@ -17,6 +17,11 @@
 
 #include "benchmark/benchmark.h"
 
+#include <array>
+#include <cmath>
+#include <limits>
+#include <random>
+
 #include "arrow/array.h"
 #include "arrow/array/builder_binary.h"
 #include "arrow/array/builder_dict.h"
@@ -31,10 +36,6 @@
 #include "parquet/platform.h"
 #include "parquet/schema.h"
 
-#include <cmath>
-#include <limits>
-#include <random>
-
 using arrow::default_memory_pool;
 using arrow::MemoryPool;
 
@@ -361,32 +362,81 @@ static void 
BM_PlainDecodingSpacedDouble(benchmark::State& state) {
 }
 BENCHMARK(BM_PlainDecodingSpacedDouble)->Apply(BM_SpacedArgs);
 
+template <typename T>
+struct ByteStreamSplitDummyValue {
+  static constexpr T value() { return static_cast<T>(42); }
+};
+
+template <typename T, size_t N>
+struct ByteStreamSplitDummyValue<std::array<T, N>> {
+  using Array = std::array<T, N>;
+
+  static constexpr Array value() {
+    Array array;
+    array.fill(ByteStreamSplitDummyValue<T>::value());
+    return array;
+  }
+};
+
 template <typename T, typename DecodeFunc>
 static void BM_ByteStreamSplitDecode(benchmark::State& state, DecodeFunc&& 
decode_func) {
-  std::vector<T> values(state.range(0), 64.0);
+  const std::vector<T> values(state.range(0), 
ByteStreamSplitDummyValue<T>::value());
   const uint8_t* values_raw = reinterpret_cast<const uint8_t*>(values.data());
-  std::vector<T> output(state.range(0), 0);
+  std::vector<T> output(state.range(0));
 
   for (auto _ : state) {
-    decode_func(values_raw, static_cast<int64_t>(values.size()),
-                static_cast<int64_t>(values.size()),
+    decode_func(values_raw,
+                /*width=*/static_cast<int>(sizeof(T)),
+                /*num_values=*/static_cast<int64_t>(values.size()),
+                /*stride=*/static_cast<int64_t>(values.size()),
                 reinterpret_cast<uint8_t*>(output.data()));
     benchmark::ClobberMemory();
   }
   state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T));
+  state.SetItemsProcessed(state.iterations() * values.size());
 }
 
 template <typename T, typename EncodeFunc>
 static void BM_ByteStreamSplitEncode(benchmark::State& state, EncodeFunc&& 
encode_func) {
-  std::vector<T> values(state.range(0), 64.0);
+  const std::vector<T> values(state.range(0), 
ByteStreamSplitDummyValue<T>::value());
   const uint8_t* values_raw = reinterpret_cast<const uint8_t*>(values.data());
-  std::vector<uint8_t> output(state.range(0) * sizeof(T), 0);
+  std::vector<uint8_t> output(state.range(0) * sizeof(T));
 
   for (auto _ : state) {
-    encode_func(values_raw, values.size(), output.data());
+    encode_func(values_raw, /*width=*/static_cast<int>(sizeof(T)), 
values.size(),
+                output.data());
     benchmark::ClobberMemory();
   }
   state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T));
+  state.SetItemsProcessed(state.iterations() * values.size());
+}
+
+static void BM_ByteStreamSplitDecode_Float_Generic(benchmark::State& state) {
+  BM_ByteStreamSplitDecode<float>(state, 
::arrow::util::internal::ByteStreamSplitDecode);
+}
+
+static void BM_ByteStreamSplitDecode_Double_Generic(benchmark::State& state) {
+  BM_ByteStreamSplitDecode<double>(state, 
::arrow::util::internal::ByteStreamSplitDecode);
+}
+
+template <int N>
+static void BM_ByteStreamSplitDecode_FLBA_Generic(benchmark::State& state) {
+  BM_ByteStreamSplitDecode<std::array<int8_t, N>>(
+      state, ::arrow::util::internal::ByteStreamSplitDecode);
+}
+
+static void BM_ByteStreamSplitEncode_Float_Generic(benchmark::State& state) {
+  BM_ByteStreamSplitEncode<float>(state, 
::arrow::util::internal::ByteStreamSplitEncode);
+}
+
+static void BM_ByteStreamSplitEncode_Double_Generic(benchmark::State& state) {
+  BM_ByteStreamSplitEncode<double>(state, 
::arrow::util::internal::ByteStreamSplitEncode);
+}
+
+template <int N>
+static void BM_ByteStreamSplitEncode_FLBA_Generic(benchmark::State& state) {
+  BM_ByteStreamSplitEncode<std::array<int8_t, N>>(
+      state, ::arrow::util::internal::ByteStreamSplitEncode);
 }
 
 static void BM_ByteStreamSplitDecode_Float_Scalar(benchmark::State& state) {
@@ -409,10 +459,29 @@ static void 
BM_ByteStreamSplitEncode_Double_Scalar(benchmark::State& state) {
       state, 
::arrow::util::internal::ByteStreamSplitEncodeScalar<sizeof(double)>);
 }
 
-BENCHMARK(BM_ByteStreamSplitDecode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitDecode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE);
+static void ByteStreamSplitApply(::benchmark::internal::Benchmark* bench) {
+  // Reduce the number of variations by only testing the two range ends.
+  bench->Arg(MIN_RANGE)->Arg(MAX_RANGE);
+}
+
+BENCHMARK(BM_ByteStreamSplitDecode_Float_Generic)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitDecode_Double_Generic)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic, 
2)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic, 
7)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic, 16)
+    ->Apply(ByteStreamSplitApply);
+
+BENCHMARK(BM_ByteStreamSplitEncode_Float_Generic)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_Generic)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic, 
2)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic, 
7)->Apply(ByteStreamSplitApply);
+BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic, 16)
+    ->Apply(ByteStreamSplitApply);
+
+BENCHMARK(BM_ByteStreamSplitDecode_Float_Scalar)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitDecode_Double_Scalar)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Float_Scalar)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_Scalar)->Apply(ByteStreamSplitApply);
 
 #if defined(ARROW_HAVE_SSE4_2)
 static void BM_ByteStreamSplitDecode_Float_Sse2(benchmark::State& state) {
@@ -435,10 +504,10 @@ static void 
BM_ByteStreamSplitEncode_Double_Sse2(benchmark::State& state) {
       state, 
::arrow::util::internal::ByteStreamSplitEncodeSimd128<sizeof(double)>);
 }
 
-BENCHMARK(BM_ByteStreamSplitDecode_Float_Sse2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitDecode_Double_Sse2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Float_Sse2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Double_Sse2)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitDecode_Float_Sse2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitDecode_Double_Sse2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Float_Sse2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_Sse2)->Apply(ByteStreamSplitApply);
 #endif
 
 #if defined(ARROW_HAVE_AVX2)
@@ -462,10 +531,10 @@ static void 
BM_ByteStreamSplitEncode_Double_Avx2(benchmark::State& state) {
       state, 
::arrow::util::internal::ByteStreamSplitEncodeAvx2<sizeof(double)>);
 }
 
-BENCHMARK(BM_ByteStreamSplitDecode_Float_Avx2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitDecode_Double_Avx2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Float_Avx2)->Range(MIN_RANGE, MAX_RANGE);
-BENCHMARK(BM_ByteStreamSplitEncode_Double_Avx2)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitDecode_Float_Avx2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitDecode_Double_Avx2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Float_Avx2)->Apply(ByteStreamSplitApply);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_Avx2)->Apply(ByteStreamSplitApply);
 #endif
 
 #if defined(ARROW_HAVE_NEON)
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index ee581622c8..ea0029f4c7 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -47,6 +47,7 @@
 using arrow::default_memory_pool;
 using arrow::MemoryPool;
 using arrow::internal::checked_cast;
+using arrow::util::span;
 
 namespace bit_util = arrow::bit_util;
 
@@ -711,8 +712,11 @@ class EncodingAdHocTyped : public ::testing::Test {
   }
 
   void ByteStreamSplit(int seed) {
-    if (!std::is_same<ParquetType, FloatType>::value &&
-        !std::is_same<ParquetType, DoubleType>::value) {
+    if constexpr (!std::is_same_v<ParquetType, FloatType> &&
+                  !std::is_same_v<ParquetType, DoubleType> &&
+                  !std::is_same_v<ParquetType, Int32Type> &&
+                  !std::is_same_v<ParquetType, Int64Type> &&
+                  !std::is_same_v<ParquetType, FLBAType>) {
       return;
     }
     auto values = GetValues(seed);
@@ -1234,7 +1238,8 @@ class TestByteStreamSplitEncoding : public 
TestEncodingBase<Type> {
 
     encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
     encode_buffer_ = encoder->FlushValues();
-    decoder->SetData(num_values_, encode_buffer_->data(),
+    ASSERT_EQ(encode_buffer_->size(), physical_byte_width() * (num_values_ - null_count));
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
                      static_cast<int>(encode_buffer_->size()));
     auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
                                                 valid_bits, valid_bits_offset);
@@ -1249,33 +1254,58 @@ class TestByteStreamSplitEncoding : public 
TestEncodingBase<Type> {
  protected:
   USING_BASE_MEMBERS();
 
-  void CheckDecode(const uint8_t* encoded_data, const int64_t 
encoded_data_size,
-                   const c_type* expected_decoded_data, const int 
num_elements) {
+  template <typename U>
+  void CheckDecode(span<const uint8_t> encoded_data, span<const U> 
expected_decoded_data,
+                   const ColumnDescriptor* descr = nullptr) {
+    static_assert(sizeof(U) == sizeof(c_type));
+    static_assert(std::is_same_v<U, FLBA> == std::is_same_v<c_type, FLBA>);
+
     std::unique_ptr<TypedDecoder<Type>> decoder =
-        MakeTypedDecoder<Type>(Encoding::BYTE_STREAM_SPLIT);
-    decoder->SetData(num_elements, encoded_data, 
static_cast<int>(encoded_data_size));
-    std::vector<c_type> decoded_data(num_elements);
-    int num_decoded_elements = decoder->Decode(decoded_data.data(), 
num_elements);
+        MakeTypedDecoder<Type>(Encoding::BYTE_STREAM_SPLIT, descr);
+    int num_elements = static_cast<int>(expected_decoded_data.size());
+    decoder->SetData(num_elements, encoded_data.data(),
+                     static_cast<int>(encoded_data.size()));
+    std::vector<U> decoded_data(num_elements);
+    int num_decoded_elements =
+        decoder->Decode(reinterpret_cast<c_type*>(decoded_data.data()), 
num_elements);
     ASSERT_EQ(num_elements, num_decoded_elements);
-    for (size_t i = 0U; i < decoded_data.size(); ++i) {
-      ASSERT_EQ(expected_decoded_data[i], decoded_data[i]);
+    // Compare to expected values
+    if constexpr (std::is_same_v<c_type, FLBA>) {
+      auto type_length = descr->type_length();
+      for (int i = 0; i < num_elements; ++i) {
+        ASSERT_EQ(span<const uint8_t>(expected_decoded_data[i].ptr, 
type_length),
+                  span<const uint8_t>(decoded_data[i].ptr, type_length));
+      }
+    } else {
+      for (int i = 0; i < num_elements; ++i) {
+        ASSERT_EQ(expected_decoded_data[i], decoded_data[i]);
+      }
     }
     ASSERT_EQ(0, decoder->values_left());
   }
 
-  void CheckEncode(const c_type* data, const int num_elements,
-                   const uint8_t* expected_encoded_data,
-                   const int64_t encoded_data_size) {
-    std::unique_ptr<TypedEncoder<Type>> encoder =
-        MakeTypedEncoder<Type>(Encoding::BYTE_STREAM_SPLIT);
-    encoder->Put(data, num_elements);
+  template <typename U>
+  void CheckEncode(span<const U> data, span<const uint8_t> 
expected_encoded_data,
+                   const ColumnDescriptor* descr = nullptr) {
+    static_assert(sizeof(U) == sizeof(c_type));
+    static_assert(std::is_same_v<U, FLBA> == std::is_same_v<c_type, FLBA>);
+
+    std::unique_ptr<TypedEncoder<Type>> encoder = MakeTypedEncoder<Type>(
+        Encoding::BYTE_STREAM_SPLIT, /*use_dictionary=*/false, descr);
+    int num_elements = static_cast<int>(data.size());
+    encoder->Put(reinterpret_cast<const c_type*>(data.data()), num_elements);
     auto encoded_data = encoder->FlushValues();
-    ASSERT_EQ(encoded_data_size, encoded_data->size());
+    ASSERT_EQ(expected_encoded_data.size(), encoded_data->size());
     const uint8_t* encoded_data_raw = encoded_data->data();
     for (int64_t i = 0; i < encoded_data->size(); ++i) {
       ASSERT_EQ(expected_encoded_data[i], encoded_data_raw[i]);
     }
   }
+
+  int physical_byte_width() const {
+    return std::is_same_v<c_type, FLBA> ? descr_->type_length()
+                                        : static_cast<int>(sizeof(c_type));
+  }
 };
 
 template <typename c_type>
@@ -1287,54 +1317,97 @@ static std::vector<c_type> ToLittleEndian(const 
std::vector<c_type>& input) {
   return data;
 }
 
-static_assert(sizeof(float) == sizeof(uint32_t),
-              "BYTE_STREAM_SPLIT encoding tests assume float / uint32_t type 
sizes");
-static_assert(sizeof(double) == sizeof(uint64_t),
-              "BYTE_STREAM_SPLIT encoding tests assume double / uint64_t type 
sizes");
-
-template <>
-void TestByteStreamSplitEncoding<FloatType>::CheckDecode() {
-  const uint8_t data[] = {0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
-                          0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC};
-  const auto expected_output =
-      ToLittleEndian<uint32_t>({0xAA774411U, 0xBB885522U, 0xCC996633U});
-  CheckDecode(data, static_cast<int64_t>(sizeof(data)),
-              reinterpret_cast<const float*>(expected_output.data()),
-              static_cast<int>(sizeof(data) / sizeof(float)));
-}
-
-template <>
-void TestByteStreamSplitEncoding<DoubleType>::CheckDecode() {
-  const uint8_t data[] = {0xDE, 0xC0, 0x37, 0x13, 0x11, 0x22, 0x33, 0x44,
-                          0xAA, 0xBB, 0xCC, 0xDD, 0x55, 0x66, 0x77, 0x88};
-  const auto expected_output =
-      ToLittleEndian<uint64_t>({0x7755CCAA331137DEULL, 0x8866DDBB442213C0ULL});
-  CheckDecode(data, static_cast<int64_t>(sizeof(data)),
-              reinterpret_cast<const double*>(expected_output.data()),
-              static_cast<int>(sizeof(data) / sizeof(double)));
-}
-
-template <>
-void TestByteStreamSplitEncoding<DoubleType>::CheckEncode() {
-  const auto data = ToLittleEndian<uint64_t>(
-      {0x4142434445464748ULL, 0x0102030405060708ULL, 0xb1b2b3b4b5b6b7b8ULL});
-  const uint8_t expected_output[24] = {
-      0x48, 0x08, 0xb8, 0x47, 0x07, 0xb7, 0x46, 0x06, 0xb6, 0x45, 0x05, 0xb5,
-      0x44, 0x04, 0xb4, 0x43, 0x03, 0xb3, 0x42, 0x02, 0xb2, 0x41, 0x01, 0xb1,
-  };
-  CheckEncode(reinterpret_cast<const double*>(data.data()), 
static_cast<int>(data.size()),
-              expected_output, sizeof(expected_output));
+std::shared_ptr<ColumnDescriptor> FLBAColumnDescriptor(int type_length) {
+  auto node =
+      schema::PrimitiveNode::Make("", Repetition::REQUIRED, 
Type::FIXED_LEN_BYTE_ARRAY,
+                                  ConvertedType::NONE, type_length);
+  return std::make_shared<ColumnDescriptor>(std::move(node), 
/*max_definition_level=*/0,
+                                            /*max_repetition_level=*/0);
 }
 
-template <>
-void TestByteStreamSplitEncoding<FloatType>::CheckEncode() {
-  const auto data = ToLittleEndian<uint32_t>({0xaabbccdd, 0x11223344});
-  const uint8_t expected_output[8] = {0xdd, 0x44, 0xcc, 0x33, 0xbb, 0x22, 
0xaa, 0x11};
-  CheckEncode(reinterpret_cast<const float*>(data.data()), 
static_cast<int>(data.size()),
-              expected_output, sizeof(expected_output));
+template <typename Type>
+void TestByteStreamSplitEncoding<Type>::CheckDecode() {
+  if constexpr (std::is_same_v<c_type, FLBA>) {
+    // FIXED_LEN_BYTE_ARRAY
+    // - type_length = 3
+    {
+      const std::vector<uint8_t> data{0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
+                                      0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC};
+      const std::vector<uint8_t> raw_expected_output{0x11, 0x55, 0x99, 0x22, 
0x66, 0xAA,
+                                                     0x33, 0x77, 0xBB, 0x44, 
0x88, 0xCC};
+      const std::vector<FLBA> expected_output{
+          FLBA{&raw_expected_output[0]}, FLBA{&raw_expected_output[3]},
+          FLBA{&raw_expected_output[6]}, FLBA{&raw_expected_output[9]}};
+      CheckDecode(span{data}, span{expected_output}, 
FLBAColumnDescriptor(3).get());
+    }
+    // - type_length = 1
+    {
+      const std::vector<uint8_t> data{0x11, 0x22, 0x33};
+      const std::vector<uint8_t> raw_expected_output{0x11, 0x22, 0x33};
+      const std::vector<FLBA> expected_output{FLBA{&raw_expected_output[0]},
+                                              FLBA{&raw_expected_output[1]},
+                                              FLBA{&raw_expected_output[2]}};
+      CheckDecode(span{data}, span{expected_output}, 
FLBAColumnDescriptor(1).get());
+    }
+  } else if constexpr (sizeof(c_type) == 4) {
+    // INT32, FLOAT
+    const std::vector<uint8_t> data{0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
+                                    0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC};
+    const auto expected_output =
+        ToLittleEndian<uint32_t>({0xAA774411U, 0xBB885522U, 0xCC996633U});
+    CheckDecode(span{data}, span{expected_output});
+  } else {
+    // INT64, DOUBLE
+    const std::vector<uint8_t> data{0xDE, 0xC0, 0x37, 0x13, 0x11, 0x22, 0x33, 
0x44,
+                                    0xAA, 0xBB, 0xCC, 0xDD, 0x55, 0x66, 0x77, 
0x88};
+    const auto expected_output =
+        ToLittleEndian<uint64_t>({0x7755CCAA331137DEULL, 
0x8866DDBB442213C0ULL});
+    CheckDecode(span{data}, span{expected_output});
+  }
 }
 
-typedef ::testing::Types<FloatType, DoubleType> ByteStreamSplitTypes;
+template <typename Type>
+void TestByteStreamSplitEncoding<Type>::CheckEncode() {
+  if constexpr (std::is_same_v<c_type, FLBA>) {
+    // FIXED_LEN_BYTE_ARRAY
+    // - type_length = 3
+    {
+      const std::vector<uint8_t> raw_data{0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
+                                          0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC};
+      const std::vector<FLBA> data{FLBA{&raw_data[0]}, FLBA{&raw_data[3]},
+                                   FLBA{&raw_data[6]}, FLBA{&raw_data[9]}};
+      const std::vector<uint8_t> expected_output{0x11, 0x44, 0x77, 0xAA, 0x22, 
0x55,
+                                                 0x88, 0xBB, 0x33, 0x66, 0x99, 
0xCC};
+      CheckEncode(span{data}, span{expected_output}, 
FLBAColumnDescriptor(3).get());
+    }
+    // - type_length = 1
+    {
+      const std::vector<uint8_t> raw_data{0x11, 0x22, 0x33};
+      const std::vector<FLBA> data{FLBA{&raw_data[0]}, FLBA{&raw_data[1]},
+                                   FLBA{&raw_data[2]}};
+      const std::vector<uint8_t> expected_output{0x11, 0x22, 0x33};
+      CheckEncode(span{data}, span{expected_output}, 
FLBAColumnDescriptor(1).get());
+    }
+  } else if constexpr (sizeof(c_type) == 4) {
+    // INT32, FLOAT
+    const auto data = ToLittleEndian<uint32_t>({0xaabbccddUL, 0x11223344UL});
+    const std::vector<uint8_t> expected_output{0xdd, 0x44, 0xcc, 0x33,
+                                               0xbb, 0x22, 0xaa, 0x11};
+    CheckEncode(span{data}, span{expected_output});
+  } else {
+    // INT64, DOUBLE
+    const auto data = ToLittleEndian<uint64_t>(
+        {0x4142434445464748ULL, 0x0102030405060708ULL, 0xb1b2b3b4b5b6b7b8ULL});
+    const std::vector<uint8_t> expected_output{
+        0x48, 0x08, 0xb8, 0x47, 0x07, 0xb7, 0x46, 0x06, 0xb6, 0x45, 0x05, 0xb5,
+        0x44, 0x04, 0xb4, 0x43, 0x03, 0xb3, 0x42, 0x02, 0xb2, 0x41, 0x01, 0xb1,
+    };
+    CheckEncode(span{data}, span{expected_output});
+  }
+}
+
+using ByteStreamSplitTypes =
+    ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType, FLBAType>;
 TYPED_TEST_SUITE(TestByteStreamSplitEncoding, ByteStreamSplitTypes);
 
 TYPED_TEST(TestByteStreamSplitEncoding, BasicRoundTrip) {
@@ -1397,30 +1470,20 @@ TYPED_TEST(TestByteStreamSplitEncoding, 
CheckOnlyEncode) {
 
 TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   // First check encoders.
-  ASSERT_THROW(MakeTypedEncoder<Int32Type>(Encoding::BYTE_STREAM_SPLIT),
-               ParquetException);
-  ASSERT_THROW(MakeTypedEncoder<Int64Type>(Encoding::BYTE_STREAM_SPLIT),
-               ParquetException);
   ASSERT_THROW(MakeTypedEncoder<Int96Type>(Encoding::BYTE_STREAM_SPLIT),
                ParquetException);
   ASSERT_THROW(MakeTypedEncoder<BooleanType>(Encoding::BYTE_STREAM_SPLIT),
                ParquetException);
   ASSERT_THROW(MakeTypedEncoder<ByteArrayType>(Encoding::BYTE_STREAM_SPLIT),
                ParquetException);
-  ASSERT_THROW(MakeTypedEncoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), 
ParquetException);
 
   // Then check decoders.
-  ASSERT_THROW(MakeTypedDecoder<Int32Type>(Encoding::BYTE_STREAM_SPLIT),
-               ParquetException);
-  ASSERT_THROW(MakeTypedDecoder<Int64Type>(Encoding::BYTE_STREAM_SPLIT),
-               ParquetException);
   ASSERT_THROW(MakeTypedDecoder<Int96Type>(Encoding::BYTE_STREAM_SPLIT),
                ParquetException);
   ASSERT_THROW(MakeTypedDecoder<BooleanType>(Encoding::BYTE_STREAM_SPLIT),
                ParquetException);
   ASSERT_THROW(MakeTypedDecoder<ByteArrayType>(Encoding::BYTE_STREAM_SPLIT),
                ParquetException);
-  ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), 
ParquetException);
 }
 
 // ----------------------------------------------------------------------
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index f9c2e06873..7b2169e247 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -21,6 +21,8 @@
 #include <iostream>
 #include <memory>
 #include <string>
+#include <type_traits>
+#include <utility>
 
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
@@ -33,6 +35,7 @@
 #include "arrow/testing/random.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/config.h"
+#include "arrow/util/range.h"
 
 #include "parquet/column_reader.h"
 #include "parquet/column_scanner.h"
@@ -45,6 +48,7 @@
 #include "parquet/test_util.h"
 
 using arrow::internal::checked_pointer_cast;
+using arrow::internal::Zip;
 
 namespace parquet {
 using schema::GroupNode;
@@ -123,6 +127,10 @@ std::string concatenated_gzip_members() {
 
 std::string byte_stream_split() { return data_file("byte_stream_split.zstd.parquet"); }
 
+std::string byte_stream_split_extended() {
+  return data_file("byte_stream_split_extended.gzip.parquet");
+}
+
 template <typename DType, typename ValueType = typename DType::c_type>
 std::vector<ValueType> ReadColumnValues(ParquetFileReader* file_reader, int 
row_group,
                                         int column, int64_t 
expected_values_read) {
@@ -137,6 +145,23 @@ std::vector<ValueType> ReadColumnValues(ParquetFileReader* 
file_reader, int row_
   return values;
 }
 
+template <typename ValueType>
+void AssertColumnValuesEqual(const ColumnDescriptor* descr,
+                             const std::vector<ValueType>& left_values,
+                             const std::vector<ValueType>& right_values) {
+  if constexpr (std::is_same_v<ValueType, FLBA>) {
+    // operator== for FLBA in test_util.h is unusable (it hard-codes length to 12)
+    const auto length = descr->type_length();
+    for (const auto& [left, right] : Zip(left_values, right_values)) {
+      std::string_view left_view(reinterpret_cast<const char*>(left.ptr), 
length);
+      std::string_view right_view(reinterpret_cast<const char*>(right.ptr), 
length);
+      ASSERT_EQ(left_view, right_view);
+    }
+  } else {
+    ASSERT_EQ(left_values, right_values);
+  }
+}
+
 // TODO: Assert on definition and repetition levels
 template <typename DType, typename ValueType = typename DType::c_type>
 void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t 
batch_size,
@@ -149,9 +174,46 @@ void 
AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t b
   auto levels_read =
       col->ReadBatch(batch_size, nullptr, nullptr, values.data(), 
&values_read);
   ASSERT_EQ(expected_levels_read, levels_read);
+  ASSERT_EQ(expected_values_read, values_read);
+  AssertColumnValuesEqual(col->descr(), expected_values, values);
+}
+
+template <typename DType, typename ValueType = typename DType::c_type>
+void AssertColumnValuesEqual(std::shared_ptr<TypedColumnReader<DType>> 
left_col,
+                             std::shared_ptr<TypedColumnReader<DType>> 
right_col,
+                             int64_t batch_size, int64_t expected_levels_read,
+                             int64_t expected_values_read) {
+  std::vector<ValueType> left_values(batch_size);
+  std::vector<ValueType> right_values(batch_size);
+  int64_t values_read, levels_read;
+
+  levels_read =
+      left_col->ReadBatch(batch_size, nullptr, nullptr, left_values.data(), 
&values_read);
+  ASSERT_EQ(expected_levels_read, levels_read);
+  ASSERT_EQ(expected_values_read, values_read);
 
-  ASSERT_EQ(expected_values, values);
+  levels_read = right_col->ReadBatch(batch_size, nullptr, nullptr, 
right_values.data(),
+                                     &values_read);
+  ASSERT_EQ(expected_levels_read, levels_read);
   ASSERT_EQ(expected_values_read, values_read);
+
+  AssertColumnValuesEqual(left_col->descr(), left_values, right_values);
+}
+
+template <typename DType, typename ValueType = typename DType::c_type>
+void AssertColumnValuesEqual(ParquetFileReader* file_reader, const 
std::string& left_col,
+                             const std::string& right_col, int64_t num_rows,
+                             int row_group = 0) {
+  ARROW_SCOPED_TRACE("left_col = '", left_col, "', right_col = '", right_col, 
"'");
+
+  auto left_col_index = 
file_reader->metadata()->schema()->ColumnIndex(left_col);
+  auto right_col_index = 
file_reader->metadata()->schema()->ColumnIndex(right_col);
+  auto row_group_reader = file_reader->RowGroup(row_group);
+  auto left_reader = checked_pointer_cast<TypedColumnReader<DType>>(
+      row_group_reader->Column(left_col_index));
+  auto right_reader = checked_pointer_cast<TypedColumnReader<DType>>(
+      row_group_reader->Column(right_col_index));
+  AssertColumnValuesEqual(left_reader, right_reader, num_rows, num_rows, 
num_rows);
 }
 
 void CheckRowGroupMetadata(const RowGroupMetaData* rg_metadata,
@@ -1522,6 +1584,34 @@ TEST(TestByteStreamSplit, FloatIntegrationFile) {
 }
 #endif  // ARROW_WITH_ZSTD
 
+#ifdef ARROW_WITH_ZLIB
+TEST(TestByteStreamSplit, ExtendedIntegrationFile) {
+  auto file_path = byte_stream_split_extended();
+  auto file = ParquetFileReader::OpenFile(file_path);
+
+  const int64_t kNumRows = 200;
+
+  ASSERT_EQ(kNumRows, file->metadata()->num_rows());
+  ASSERT_EQ(14, file->metadata()->num_columns());
+  ASSERT_EQ(1, file->metadata()->num_row_groups());
+
+  AssertColumnValuesEqual<FloatType>(file.get(), "float_plain", 
"float_byte_stream_split",
+                                     kNumRows);
+  AssertColumnValuesEqual<DoubleType>(file.get(), "double_plain",
+                                      "double_byte_stream_split", kNumRows);
+  AssertColumnValuesEqual<Int32Type>(file.get(), "int32_plain", 
"int32_byte_stream_split",
+                                     kNumRows);
+  AssertColumnValuesEqual<Int64Type>(file.get(), "int64_plain", 
"int64_byte_stream_split",
+                                     kNumRows);
+  AssertColumnValuesEqual<FLBAType>(file.get(), "float16_plain",
+                                    "float16_byte_stream_split", kNumRows);
+  AssertColumnValuesEqual<FLBAType>(file.get(), "flba5_plain", 
"flba5_byte_stream_split",
+                                    kNumRows);
+  AssertColumnValuesEqual<FLBAType>(file.get(), "decimal_plain",
+                                    "decimal_byte_stream_split", kNumRows);
+}
+#endif  // ARROW_WITH_ZLIB
+
 struct PageIndexReaderParam {
   std::vector<int32_t> row_group_indices;
   std::vector<int32_t> column_indices;
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index 4cb3cff24c..74278bc4a1 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit 4cb3cff24c965fb329cdae763eabce47395a68a0
+Subproject commit 74278bc4a1122d74945969e6dec405abd1533ec3
diff --git a/python/pyarrow/tests/parquet/test_basic.py 
b/python/pyarrow/tests/parquet/test_basic.py
index bc21d709ec..56b967a059 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -323,6 +323,7 @@ def test_byte_stream_split():
     # This is only a smoke test.
     arr_float = pa.array(list(map(float, range(100))))
     arr_int = pa.array(list(map(int, range(100))))
+    arr_bool = pa.array([True, False] * 50)
     data_float = [arr_float, arr_float]
     table = pa.Table.from_arrays(data_float, names=['a', 'b'])
 
@@ -342,16 +343,16 @@ def test_byte_stream_split():
                      use_byte_stream_split=['a', 'b'])
 
     # Check with mixed column types.
-    mixed_table = pa.Table.from_arrays([arr_float, arr_int],
-                                       names=['a', 'b'])
+    mixed_table = pa.Table.from_arrays([arr_float, arr_float, arr_int, 
arr_int],
+                                       names=['a', 'b', 'c', 'd'])
     _check_roundtrip(mixed_table, expected=mixed_table,
-                     use_dictionary=['b'],
-                     use_byte_stream_split=['a'])
+                     use_dictionary=['b', 'd'],
+                     use_byte_stream_split=['a', 'c'])
 
     # Try to use the wrong data type with the byte_stream_split encoding.
     # This should throw an exception.
-    table = pa.Table.from_arrays([arr_int], names=['tmp'])
-    with pytest.raises(IOError):
+    table = pa.Table.from_arrays([arr_bool], names=['tmp'])
+    with pytest.raises(IOError, match='BYTE_STREAM_SPLIT only supports'):
         _check_roundtrip(table, expected=table, use_byte_stream_split=True,
                          use_dictionary=False)
 
@@ -367,12 +368,13 @@ def test_column_encoding():
         [arr_float, arr_int, arr_bin, arr_flba, arr_bool],
         names=['a', 'b', 'c', 'd', 'e'])
 
-    # Check "BYTE_STREAM_SPLIT" for column 'a' and "PLAIN" column_encoding for
-    # column 'b' and 'c'.
+    # Check "BYTE_STREAM_SPLIT" for columns 'a', 'b', 'd'
+    # and "PLAIN" column_encoding for column 'c'.
     _check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False,
                      column_encoding={'a': "BYTE_STREAM_SPLIT",
-                                      'b': "PLAIN",
-                                      'c': "PLAIN"})
+                                      'b': "BYTE_STREAM_SPLIT",
+                                      'c': "PLAIN",
+                                      'd': "BYTE_STREAM_SPLIT"})
 
     # Check "PLAIN" for all columns.
     _check_roundtrip(mixed_table, expected=mixed_table,
@@ -406,20 +408,20 @@ def test_column_encoding():
                      use_dictionary=False,
                      column_encoding={'e': "RLE"})
 
-    # Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'.
-    # This should throw an error as it is only supports FLOAT and DOUBLE.
+    # Try to pass "BYTE_STREAM_SPLIT" column encoding for boolean column 'e'.
+    # This should throw an error as it does not support BOOLEAN.
     with pytest.raises(IOError,
-                       match="BYTE_STREAM_SPLIT only supports FLOAT and"
-                             " DOUBLE"):
+                       match="BYTE_STREAM_SPLIT only supports"):
         _check_roundtrip(mixed_table, expected=mixed_table,
                          use_dictionary=False,
                          column_encoding={'a': "PLAIN",
-                                          'b': "BYTE_STREAM_SPLIT",
-                                          'c': "PLAIN"})
+                                          'c': "PLAIN",
+                                          'e': "BYTE_STREAM_SPLIT"})
 
     # Try to pass use "DELTA_BINARY_PACKED" encoding on float column.
     # This should throw an error as only integers are supported.
-    with pytest.raises(OSError):
+    with pytest.raises(OSError,
+                       match="DELTA_BINARY_PACKED encoder only supports"):
         _check_roundtrip(mixed_table, expected=mixed_table,
                          use_dictionary=False,
                          column_encoding={'a': "DELTA_BINARY_PACKED",
@@ -429,13 +431,15 @@ def test_column_encoding():
     # Try to pass "RLE_DICTIONARY".
     # This should throw an error as dictionary encoding is already used by
     # default and not supported to be specified as "fallback" encoding
-    with pytest.raises(ValueError):
+    with pytest.raises(ValueError,
+                       match="'RLE_DICTIONARY' is already used by default"):
         _check_roundtrip(mixed_table, expected=mixed_table,
                          use_dictionary=False,
                          column_encoding="RLE_DICTIONARY")
 
     # Try to pass unsupported encoding.
-    with pytest.raises(ValueError):
+    with pytest.raises(ValueError,
+                       match="Unsupported column encoding: 'MADE_UP_ENCODING'"):
         _check_roundtrip(mixed_table, expected=mixed_table,
                          use_dictionary=False,
                          column_encoding={'a': "MADE_UP_ENCODING"})


Reply via email to