This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ace14018e Add benchmarks for `BYTE_STREAM_SPLIT` encoded Parquet `FIXED_LEN_BYTE_ARRAY` data (#6204)
ace14018e is described below
commit ace14018ed3f6571b313e5e34761128242853fda
Author: Ed Seidl <[email protected]>
AuthorDate: Thu Aug 8 04:45:12 2024 -0700
Add benchmarks for `BYTE_STREAM_SPLIT` encoded Parquet `FIXED_LEN_BYTE_ARRAY` data (#6204)
* save type_width for fixed_len_byte_array
* add decimal128 and float16 byte_stream_split benches
* add f16
* add decimal128 flba(16) bench
---
parquet/benches/arrow_reader.rs | 235 +++++++++++++++++++++++++++++-
parquet/benches/encoding.rs | 27 +++-
parquet/src/util/test_common/page_util.rs | 6 +-
3 files changed, 261 insertions(+), 7 deletions(-)
diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 814e75c24..18e16f0a4 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -20,6 +20,7 @@ use arrow::datatypes::DataType;
use arrow_schema::Field;
use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
+use half::f16;
use num::FromPrimitive;
use num_bigint::BigInt;
use parquet::arrow::array_reader::{
@@ -65,6 +66,8 @@ fn build_test_schema() -> SchemaDescPtr {
}
REQUIRED BYTE_ARRAY mandatory_binary_leaf;
OPTIONAL BYTE_ARRAY optional_binary_leaf;
+ REQUIRED FIXED_LEN_BYTE_ARRAY (2) mandatory_f16_leaf (Float16);
+ OPTIONAL FIXED_LEN_BYTE_ARRAY (2) optional_f16_leaf (Float16);
}
";
parse_message_type(message_type)
@@ -84,6 +87,64 @@ pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
}
+// support byte array for float16
+fn build_encoded_f16_bytes_page_iterator<T>(
+ column_desc: ColumnDescPtr,
+ null_density: f32,
+ encoding: Encoding,
+ min: f32,
+ max: f32,
+) -> impl PageIterator + Clone
+where
+ T: parquet::data_type::DataType,
+ T::T: From<Vec<u8>>,
+{
+ let max_def_level = column_desc.max_def_level();
+ let max_rep_level = column_desc.max_rep_level();
+ let rep_levels = vec![0; VALUES_PER_PAGE];
+ let mut rng = seedable_rng();
+ let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+ for _i in 0..NUM_ROW_GROUPS {
+ let mut column_chunk_pages = Vec::new();
+ for _j in 0..PAGES_PER_GROUP {
+ // generate page
+ let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+ let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+ for _k in 0..VALUES_PER_PAGE {
+ let def_level = if rng.gen::<f32>() < null_density {
+ max_def_level - 1
+ } else {
+ max_def_level
+ };
+ if def_level == max_def_level {
+ // create the Float16 value
+ let value = f16::from_f32(rng.gen_range(min..max));
+ // Float16 in parquet is stored little-endian
+ let bytes = match column_desc.physical_type() {
+ Type::FIXED_LEN_BYTE_ARRAY => {
+ // Float16 annotates FIXED_LEN_BYTE_ARRAY(2)
+ assert_eq!(column_desc.type_length(), 2);
+ value.to_le_bytes().to_vec()
+ }
+ _ => unimplemented!(),
+ };
+ let value = T::T::from(bytes);
+ values.push(value);
+ }
+ def_levels.push(def_level);
+ }
+ let mut page_builder =
+ DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+ page_builder.add_rep_levels(max_rep_level, &rep_levels);
+ page_builder.add_def_levels(max_def_level, &def_levels);
+ page_builder.add_values::<T>(encoding, &values);
+ column_chunk_pages.push(page_builder.consume());
+ }
+ pages.push(column_chunk_pages);
+ }
+ InMemoryPageIterator::new(pages)
+}
+
// support byte array for decimal
fn build_encoded_decimal_bytes_page_iterator<T>(
column_desc: ColumnDescPtr,
@@ -494,6 +555,19 @@ fn create_primitive_array_reader(
}
}
+fn create_f16_by_bytes_reader(
+ page_iterator: impl PageIterator + 'static,
+ column_desc: ColumnDescPtr,
+) -> Box<dyn ArrayReader> {
+ let physical_type = column_desc.physical_type();
+ match physical_type {
+ Type::FIXED_LEN_BYTE_ARRAY => {
+ make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
+ }
+ _ => unimplemented!(),
+ }
+}
+
fn create_decimal_by_bytes_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
@@ -616,6 +690,131 @@ fn bench_byte_decimal<T>(
});
}
+fn bench_byte_stream_split_f16<T>(
+ group: &mut BenchmarkGroup<WallTime>,
+ mandatory_column_desc: &ColumnDescPtr,
+ optional_column_desc: &ColumnDescPtr,
+ min: f32,
+ max: f32,
+) where
+ T: parquet::data_type::DataType,
+ T::T: From<Vec<u8>>,
+{
+ let mut count: usize = 0;
+
+ // byte_stream_split encoded, no NULLs
+ let data = build_encoded_f16_bytes_page_iterator::<T>(
+ mandatory_column_desc.clone(),
+ 0.0,
+ Encoding::BYTE_STREAM_SPLIT,
+ min,
+ max,
+ );
+ group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| {
+ b.iter(|| {
+ let array_reader =
+ create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone());
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ });
+
+ let data = build_encoded_f16_bytes_page_iterator::<T>(
+ optional_column_desc.clone(),
+ 0.0,
+ Encoding::BYTE_STREAM_SPLIT,
+ min,
+ max,
+ );
+ group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| {
+ b.iter(|| {
+ let array_reader =
+ create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone());
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ });
+
+ let data = build_encoded_f16_bytes_page_iterator::<T>(
+ optional_column_desc.clone(),
+ 0.5,
+ Encoding::BYTE_STREAM_SPLIT,
+ min,
+ max,
+ );
+ group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| {
+ b.iter(|| {
+ let array_reader =
+ create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone());
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ });
+}
+
+fn bench_byte_stream_split_decimal<T>(
+ group: &mut BenchmarkGroup<WallTime>,
+ mandatory_column_desc: &ColumnDescPtr,
+ optional_column_desc: &ColumnDescPtr,
+ min: i128,
+ max: i128,
+) where
+ T: parquet::data_type::DataType,
+ T::T: From<Vec<u8>>,
+{
+ let mut count: usize = 0;
+
+ // byte_stream_split encoded, no NULLs
+ let data = build_encoded_decimal_bytes_page_iterator::<T>(
+ mandatory_column_desc.clone(),
+ 0.0,
+ Encoding::BYTE_STREAM_SPLIT,
+ min,
+ max,
+ );
+ group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| {
+ b.iter(|| {
+ let array_reader =
+ create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone());
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ });
+
+ let data = build_encoded_decimal_bytes_page_iterator::<T>(
+ optional_column_desc.clone(),
+ 0.0,
+ Encoding::BYTE_STREAM_SPLIT,
+ min,
+ max,
+ );
+ group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| {
+ b.iter(|| {
+ let array_reader =
+ create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ });
+
+ // half null
+ let data = build_encoded_decimal_bytes_page_iterator::<T>(
+ optional_column_desc.clone(),
+ 0.5,
+ Encoding::BYTE_STREAM_SPLIT,
+ min,
+ max,
+ );
+ group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| {
+ b.iter(|| {
+ let array_reader =
+ create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone());
+ count = bench_array_reader(array_reader);
+ });
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+ });
+}
+
fn bench_primitive<T>(
group: &mut BenchmarkGroup<WallTime>,
mandatory_column_desc: &ColumnDescPtr,
@@ -797,6 +996,35 @@ fn bench_primitive<T>(
});
}
+fn byte_stream_split_benches(c: &mut Criterion) {
+ let schema = build_test_schema();
+
+ let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Decimal128Array");
+ let mandatory_decimal4_leaf_desc = schema.column(12);
+ let optional_decimal4_leaf_desc = schema.column(13);
+ bench_byte_stream_split_decimal::<FixedLenByteArrayType>(
+ &mut group,
+ &mandatory_decimal4_leaf_desc,
+ &optional_decimal4_leaf_desc,
+ // precision is 16: the max is 9999999999999999
+ 9999999999999000,
+ 9999999999999999,
+ );
+ group.finish();
+
+ let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Float16Array");
+ let mandatory_f16_leaf_desc = schema.column(17);
+ let optional_f16_leaf_desc = schema.column(18);
+ bench_byte_stream_split_f16::<FixedLenByteArrayType>(
+ &mut group,
+ &mandatory_f16_leaf_desc,
+ &optional_f16_leaf_desc,
+ -1.0,
+ 1.0,
+ );
+ group.finish();
+}
+
fn decimal_benches(c: &mut Criterion) {
let schema = build_test_schema();
// parquet int32, logical type decimal(8,2)
@@ -1334,5 +1562,10 @@ fn add_benches(c: &mut Criterion) {
});
}
-criterion_group!(benches, add_benches, decimal_benches,);
+criterion_group!(
+ benches,
+ add_benches,
+ decimal_benches,
+ byte_stream_split_benches,
+);
criterion_main!(benches);
diff --git a/parquet/benches/encoding.rs b/parquet/benches/encoding.rs
index 80befe8da..bc18a49da 100644
--- a/parquet/benches/encoding.rs
+++ b/parquet/benches/encoding.rs
@@ -16,15 +16,23 @@
// under the License.
use criterion::*;
+use half::f16;
use parquet::basic::Encoding;
-use parquet::data_type::{DataType, DoubleType, FloatType};
+use parquet::data_type::{
+ DataType, DoubleType, FixedLenByteArray, FixedLenByteArrayType, FloatType,
+};
use parquet::decoding::{get_decoder, Decoder};
use parquet::encoding::get_encoder;
use parquet::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};
use rand::prelude::*;
use std::sync::Arc;
-fn bench_typed<T: DataType>(c: &mut Criterion, values: &[T::T], encoding: Encoding) {
+fn bench_typed<T: DataType>(
+ c: &mut Criterion,
+ values: &[T::T],
+ encoding: Encoding,
+ type_length: i32,
+) {
let name = format!(
"dtype={}, encoding={:?}",
std::any::type_name::<T::T>(),
@@ -33,6 +41,7 @@ fn bench_typed<T: DataType>(c: &mut Criterion, values: &[T::T], encoding: Encodi
let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
Arc::new(
Type::primitive_type_builder("", T::get_physical_type())
+ .with_length(type_length)
.build()
.unwrap(),
),
@@ -68,15 +77,25 @@ fn criterion_benchmark(c: &mut Criterion) {
let mut rng = StdRng::seed_from_u64(0);
let n = 16 * 1024;
+ let mut f16s = Vec::new();
let mut f32s = Vec::new();
let mut f64s = Vec::new();
+ let mut d128s = Vec::new();
for _ in 0..n {
+ f16s.push(FixedLenByteArray::from(
+ f16::from_f32(rng.gen::<f32>()).to_le_bytes().to_vec(),
+ ));
f32s.push(rng.gen::<f32>());
f64s.push(rng.gen::<f64>());
+ d128s.push(FixedLenByteArray::from(
+ rng.gen::<i128>().to_be_bytes().to_vec(),
+ ));
}
- bench_typed::<FloatType>(c, &f32s, Encoding::BYTE_STREAM_SPLIT);
- bench_typed::<DoubleType>(c, &f64s, Encoding::BYTE_STREAM_SPLIT);
+ bench_typed::<FloatType>(c, &f32s, Encoding::BYTE_STREAM_SPLIT, 0);
+ bench_typed::<DoubleType>(c, &f64s, Encoding::BYTE_STREAM_SPLIT, 0);
+ bench_typed::<FixedLenByteArrayType>(c, &f16s, Encoding::BYTE_STREAM_SPLIT, 2);
+ bench_typed::<FixedLenByteArrayType>(c, &d128s, Encoding::BYTE_STREAM_SPLIT, 16);
}
criterion_group!(benches, criterion_benchmark);
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 3db43aef0..a1709efa9 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -51,13 +51,14 @@ pub struct DataPageBuilderImpl {
rep_levels_byte_len: u32,
def_levels_byte_len: u32,
datapage_v2: bool,
+ type_width: i32,
}
impl DataPageBuilderImpl {
// `num_values` is the number of non-null values to put in the data page.
// `datapage_v2` flag is used to indicate if the generated data page should use V2
// format or not.
- pub fn new(_desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self {
+ pub fn new(desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self {
DataPageBuilderImpl {
encoding: None,
num_values,
@@ -65,6 +66,7 @@ impl DataPageBuilderImpl {
rep_levels_byte_len: 0,
def_levels_byte_len: 0,
datapage_v2,
+ type_width: desc.type_length(),
}
}
@@ -111,7 +113,7 @@ impl DataPageBuilder for DataPageBuilderImpl {
// Create test column descriptor.
let desc = {
let ty = SchemaType::primitive_type_builder("t", T::get_physical_type())
- .with_length(0)
+ .with_length(self.type_width)
.build()
.unwrap();
Arc::new(ColumnDescriptor::new(