This is an automated email from the ASF dual-hosted git repository.
etseidl pushed a commit to branch gh5854_thrift_remodel
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/gh5854_thrift_remodel by this
push:
new c327d7f44a [thrift-remodel] Rework thrift reader API (#8341)
c327d7f44a is described below
commit c327d7f44ab53ca879ee30783abcdd1da5d072bd
Author: Ed Seidl <[email protected]>
AuthorDate: Wed Sep 17 12:12:31 2025 -0700
[thrift-remodel] Rework thrift reader API (#8341)
# Which issue does this PR close?
**Note: this targets a feature branch, not main**
- Part of #5854.
# Rationale for this change
When I started work on decoding thrift page headers, I found that the approach
I had been taking was no longer going to work. This PR begins the process of
abstracting the thrift reader to allow for other implementations.
# What changes are included in this PR?
In addition to reworking the reader itself, this PR moves away from the
previous `TryFrom` approach and instead adds a `ReadThrift` trait.
# Are these changes tested?
These changes should be covered by existing tests.
# Are there any user-facing changes?
Yes
---
parquet/src/basic.rs | 52 ++--
parquet/src/file/column_crypto_metadata.rs | 4 +-
parquet/src/file/metadata/mod.rs | 4 +-
parquet/src/file/metadata/reader.rs | 6 +-
parquet/src/file/metadata/thrift_gen.rs | 24 +-
parquet/src/file/page_encoding_stats.rs | 6 +-
parquet/src/file/page_index/index_reader.rs | 14 +-
parquet/src/file/page_index/offset_index.rs | 12 +-
parquet/src/parquet_macros.rs | 91 +++---
parquet/src/parquet_thrift.rs | 414 ++++++++++++++--------------
10 files changed, 311 insertions(+), 316 deletions(-)
diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index 5fffb56cdf..44fe66aff7 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -26,8 +26,8 @@ use std::{fmt, str};
pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
use crate::parquet_thrift::{
- ElementType, FieldType, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, WriteThrift,
- WriteThriftField,
+ ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol,
+ WriteThrift, WriteThriftField,
};
use crate::{thrift_enum, thrift_struct, thrift_union_all_empty};
@@ -165,9 +165,8 @@ pub enum ConvertedType {
INTERVAL,
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ConvertedType {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for
ConvertedType {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
let val = prot.read_i32()?;
Ok(match val {
0 => Self::UTF8,
@@ -361,12 +360,9 @@ pub enum LogicalType {
},
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for LogicalType {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
- prot.read_struct_begin()?;
-
- let field_ident = prot.read_field_begin()?;
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for LogicalType {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
+ let field_ident = prot.read_field_begin(0)?;
if field_ident.field_type == FieldType::Stop {
return Err(general_err!("received empty union from remote
LogicalType"));
}
@@ -388,7 +384,7 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for
LogicalType {
Self::Enum
}
5 => {
- let val = DecimalType::try_from(&mut *prot)?;
+ let val = DecimalType::read_thrift(&mut *prot)?;
Self::Decimal {
scale: val.scale,
precision: val.precision,
@@ -399,21 +395,21 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for
LogicalType {
Self::Date
}
7 => {
- let val = TimeType::try_from(&mut *prot)?;
+ let val = TimeType::read_thrift(&mut *prot)?;
Self::Time {
is_adjusted_to_u_t_c: val.is_adjusted_to_u_t_c,
unit: val.unit,
}
}
8 => {
- let val = TimestampType::try_from(&mut *prot)?;
+ let val = TimestampType::read_thrift(&mut *prot)?;
Self::Timestamp {
is_adjusted_to_u_t_c: val.is_adjusted_to_u_t_c,
unit: val.unit,
}
}
10 => {
- let val = IntType::try_from(&mut *prot)?;
+ let val = IntType::read_thrift(&mut *prot)?;
Self::Integer {
is_signed: val.is_signed,
bit_width: val.bit_width,
@@ -440,19 +436,19 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for
LogicalType {
Self::Float16
}
16 => {
- let val = VariantType::try_from(&mut *prot)?;
+ let val = VariantType::read_thrift(&mut *prot)?;
Self::Variant {
specification_version: val.specification_version,
}
}
17 => {
- let val = GeometryType::try_from(&mut *prot)?;
+ let val = GeometryType::read_thrift(&mut *prot)?;
Self::Geometry {
crs: val.crs.map(|s| s.to_owned()),
}
}
18 => {
- let val = GeographyType::try_from(&mut *prot)?;
+ let val = GeographyType::read_thrift(&mut *prot)?;
Self::Geography {
crs: val.crs.map(|s| s.to_owned()),
algorithm: val.algorithm,
@@ -465,13 +461,12 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for
LogicalType {
}
}
};
- let field_ident = prot.read_field_begin()?;
+ let field_ident = prot.read_field_begin(field_ident.id)?;
if field_ident.field_type != FieldType::Stop {
return Err(general_err!(
"Received multiple fields for union from remote LogicalType"
));
}
- prot.read_struct_end()?;
Ok(ret)
}
}
@@ -756,9 +751,8 @@ pub enum Compression {
LZ4_RAW,
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for Compression {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for Compression {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
let val = prot.read_i32()?;
Ok(match val {
0 => Self::UNCOMPRESSED,
@@ -1124,12 +1118,9 @@ impl ColumnOrder {
}
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ColumnOrder {
- type Error = ParquetError;
-
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
- prot.read_struct_begin()?;
- let field_ident = prot.read_field_begin()?;
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ColumnOrder {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
+ let field_ident = prot.read_field_begin(0)?;
if field_ident.field_type == FieldType::Stop {
return Err(general_err!("Received empty union from remote
ColumnOrder"));
}
@@ -1144,13 +1135,12 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>>
for ColumnOrder {
Self::UNKNOWN
}
};
- let field_ident = prot.read_field_begin()?;
+ let field_ident = prot.read_field_begin(field_ident.id)?;
if field_ident.field_type != FieldType::Stop {
return Err(general_err!(
"Received multiple fields for union from remote ColumnOrder"
));
}
- prot.read_struct_end()?;
Ok(ret)
}
}
diff --git a/parquet/src/file/column_crypto_metadata.rs
b/parquet/src/file/column_crypto_metadata.rs
index 5bba073579..6a538bd42b 100644
--- a/parquet/src/file/column_crypto_metadata.rs
+++ b/parquet/src/file/column_crypto_metadata.rs
@@ -26,8 +26,8 @@ use crate::format::{
EncryptionWithFooterKey as TEncryptionWithFooterKey,
};
use crate::parquet_thrift::{
- ElementType, FieldType, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, WriteThrift,
- WriteThriftField,
+ read_thrift_vec, ElementType, FieldType, ReadThrift,
ThriftCompactInputProtocol,
+ ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
};
use crate::{thrift_struct, thrift_union};
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index d23d46a33b..95e9a48b46 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -121,8 +121,8 @@ use crate::{
use crate::{
basic::{ColumnOrder, Compression, Encoding, Type},
parquet_thrift::{
- ElementType, FieldType, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol,
- WriteThrift, WriteThriftField,
+ ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
+ ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
},
};
use crate::{
diff --git a/parquet/src/file/metadata/reader.rs
b/parquet/src/file/metadata/reader.rs
index 46022b459d..73c6a8ee40 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -19,7 +19,7 @@ use std::{io::Read, ops::Range};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties};
-use crate::parquet_thrift::ThriftCompactInputProtocol;
+use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
use bytes::Bytes;
use crate::errors::{ParquetError, Result};
@@ -962,8 +962,8 @@ impl ParquetMetaDataReader {
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
- let mut prot = ThriftCompactInputProtocol::new(buf);
- ParquetMetaData::try_from(&mut prot)
+ let mut prot = ThriftSliceInputProtocol::new(buf);
+ ParquetMetaData::read_thrift(&mut prot)
}
}
diff --git a/parquet/src/file/metadata/thrift_gen.rs
b/parquet/src/file/metadata/thrift_gen.rs
index 06229fb181..b656bacc8c 100644
--- a/parquet/src/file/metadata/thrift_gen.rs
+++ b/parquet/src/file/metadata/thrift_gen.rs
@@ -35,8 +35,8 @@ use crate::{
statistics::ValueStatistics,
},
parquet_thrift::{
- ElementType, FieldType, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol,
- WriteThrift, WriteThriftField,
+ read_thrift_vec, ElementType, FieldType, ReadThrift,
ThriftCompactInputProtocol,
+ ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
},
schema::types::{parquet_schema_from_array, ColumnDescriptor,
SchemaDescriptor},
thrift_struct, thrift_union,
@@ -46,6 +46,7 @@ use crate::{
use crate::{
encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
file::column_crypto_metadata::ColumnCryptoMetaData,
+ parquet_thrift::ThriftSliceInputProtocol,
schema::types::SchemaDescPtr,
};
@@ -669,8 +670,8 @@ fn row_group_from_encrypted_thrift(
)
})?;
- let mut prot =
ThriftCompactInputProtocol::new(decrypted_cc_buf.as_slice());
- let col_meta = ColumnMetaData::try_from(&mut prot)?;
+ let mut prot =
ThriftSliceInputProtocol::new(decrypted_cc_buf.as_slice());
+ let col_meta = ColumnMetaData::read_thrift(&mut prot)?;
c.meta_data = Some(col_meta);
columns.push(convert_column(c, d.clone())?);
} else {
@@ -699,14 +700,14 @@ pub(crate) fn parquet_metadata_with_encryption(
encrypted_footer: bool,
buf: &[u8],
) -> Result<ParquetMetaData> {
- let mut prot = ThriftCompactInputProtocol::new(buf);
+ let mut prot = ThriftSliceInputProtocol::new(buf);
let mut file_decryptor = None;
let decrypted_fmd_buf;
if encrypted_footer {
if let Some(file_decryption_properties) = file_decryption_properties {
let t_file_crypto_metadata: FileCryptoMetaData =
- FileCryptoMetaData::try_from(&mut prot)
+ FileCryptoMetaData::read_thrift(&mut prot)
.map_err(|e| general_err!("Could not parse crypto
metadata: {}", e))?;
let supply_aad_prefix = match
&t_file_crypto_metadata.encryption_algorithm {
EncryptionAlgorithm::AES_GCM_V1(algo) =>
algo.supply_aad_prefix,
@@ -734,7 +735,7 @@ pub(crate) fn parquet_metadata_with_encryption(
"Provided footer key and AAD were unable to decrypt
parquet footer"
)
})?;
- prot = ThriftCompactInputProtocol::new(decrypted_fmd_buf.as_ref());
+ prot = ThriftSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
file_decryptor = Some(decryptor);
} else {
@@ -744,7 +745,7 @@ pub(crate) fn parquet_metadata_with_encryption(
}
}
- let file_meta = super::thrift_gen::FileMetaData::try_from(&mut prot)
+ let file_meta = super::thrift_gen::FileMetaData::read_thrift(&mut prot)
.map_err(|e| general_err!("Could not parse metadata: {}", e))?;
let version = file_meta.version;
@@ -852,10 +853,9 @@ pub(super) fn get_file_decryptor(
/// Create ParquetMetaData from thrift input. Note that this only decodes the
file metadata in
/// the Parquet footer. Page indexes will need to be added later.
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ParquetMetaData {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
- let file_meta = super::thrift_gen::FileMetaData::try_from(prot)?;
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for
ParquetMetaData {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
+ let file_meta = super::thrift_gen::FileMetaData::read_thrift(prot)?;
let version = file_meta.version;
let num_rows = file_meta.num_rows;
diff --git a/parquet/src/file/page_encoding_stats.rs
b/parquet/src/file/page_encoding_stats.rs
index 2d433dc9b3..934e177de0 100644
--- a/parquet/src/file/page_encoding_stats.rs
+++ b/parquet/src/file/page_encoding_stats.rs
@@ -20,10 +20,10 @@
use std::io::Write;
use crate::basic::{Encoding, PageType};
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
use crate::parquet_thrift::{
- ElementType, FieldType, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, WriteThrift,
- WriteThriftField,
+ ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol,
+ WriteThrift, WriteThriftField,
};
use crate::thrift_struct;
diff --git a/parquet/src/file/page_index/index_reader.rs
b/parquet/src/file/page_index/index_reader.rs
index e9cf119224..3db597954e 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -27,8 +27,8 @@ use crate::file::page_index::column_index::{
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::ChunkReader;
use crate::parquet_thrift::{
- ElementType, FieldType, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, WriteThrift,
- WriteThriftField,
+ read_thrift_vec, ElementType, FieldType, ReadThrift,
ThriftCompactInputProtocol,
+ ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift,
WriteThriftField,
};
use crate::thrift_struct;
use std::io::Write;
@@ -136,15 +136,15 @@ pub fn read_offset_indexes<R: ChunkReader>(
}
pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData,
ParquetError> {
- let mut prot = ThriftCompactInputProtocol::new(data);
+ let mut prot = ThriftSliceInputProtocol::new(data);
// Try to read fast-path first. If that fails, fall back to slower but
more robust
// decoder.
match OffsetIndexMetaData::try_from_fast(&mut prot) {
Ok(offset_index) => Ok(offset_index),
Err(_) => {
- prot = ThriftCompactInputProtocol::new(data);
- OffsetIndexMetaData::try_from(&mut prot)
+ prot = ThriftSliceInputProtocol::new(data);
+ OffsetIndexMetaData::read_thrift(&mut prot)
}
}
}
@@ -166,8 +166,8 @@ pub(crate) fn decode_column_index(
data: &[u8],
column_type: Type,
) -> Result<ColumnIndexMetaData, ParquetError> {
- let mut prot = ThriftCompactInputProtocol::new(data);
- let index = ThriftColumnIndex::try_from(&mut prot)?;
+ let mut prot = ThriftSliceInputProtocol::new(data);
+ let index = ThriftColumnIndex::read_thrift(&mut prot)?;
let index = match column_type {
Type::BOOLEAN => {
diff --git a/parquet/src/file/page_index/offset_index.rs
b/parquet/src/file/page_index/offset_index.rs
index ac2620af09..2153b8ed30 100644
--- a/parquet/src/file/page_index/offset_index.rs
+++ b/parquet/src/file/page_index/offset_index.rs
@@ -22,8 +22,8 @@
use std::io::Write;
use crate::parquet_thrift::{
- ElementType, FieldType, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, WriteThrift,
- WriteThriftField,
+ read_thrift_vec, ElementType, FieldType, ReadThrift,
ThriftCompactInputProtocol,
+ ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
};
use crate::{
errors::{ParquetError, Result},
@@ -113,7 +113,9 @@ impl OffsetIndexMetaData {
// Fast-path read of offset index. This works because we expect all field
deltas to be 1,
// and there's no nesting beyond PageLocation, so no need to save the last
field id. Like
// read_page_locations(), this will fail if absolute field id's are used.
- pub(super) fn try_from_fast<'a>(prot: &mut ThriftCompactInputProtocol<'a>)
-> Result<Self> {
+ pub(super) fn try_from_fast<'a, R: ThriftCompactInputProtocol<'a>>(
+ prot: &mut R,
+ ) -> Result<Self> {
// Offset index is a struct with 2 fields. First field is an array of
PageLocations,
// the second an optional array of i64.
@@ -140,7 +142,7 @@ impl OffsetIndexMetaData {
"encountered unknown field while reading OffsetIndex"
));
}
- let vec = Vec::<i64>::try_from(&mut *prot)?;
+ let vec = read_thrift_vec::<i64, R>(&mut *prot)?;
unencoded_byte_array_data_bytes = Some(vec);
// this one should be Stop
@@ -164,7 +166,7 @@ impl OffsetIndexMetaData {
// Note: this will fail if the fields are either out of order, or if a
suboptimal
// encoder doesn't use field deltas.
-fn read_page_location<'a>(prot: &mut ThriftCompactInputProtocol<'a>) ->
Result<PageLocation> {
+fn read_page_location<'a, R: ThriftCompactInputProtocol<'a>>(prot: &mut R) ->
Result<PageLocation> {
// there are 3 fields, all mandatory, so all field deltas should be 1
let (field_type, delta) = prot.read_field_header()?;
if delta != 1 || field_type != FieldType::I64 as u8 {
diff --git a/parquet/src/parquet_macros.rs b/parquet/src/parquet_macros.rs
index 939f3cb339..889e5fafef 100644
--- a/parquet/src/parquet_macros.rs
+++ b/parquet/src/parquet_macros.rs
@@ -37,10 +37,9 @@ macro_rules! thrift_enum {
$($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $field_name =
$field_value,)*
}
- impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier {
- type Error = ParquetError;
+ impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for
$identifier {
#[allow(deprecated)]
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) ->
Result<Self> {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
let val = prot.read_i32()?;
match val {
$($field_value => Ok(Self::$field_name),)*
@@ -109,12 +108,9 @@ macro_rules! thrift_union_all_empty {
$($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $field_name),*
}
- impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier {
- type Error = ParquetError;
-
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) ->
Result<Self> {
- prot.read_struct_begin()?;
- let field_ident = prot.read_field_begin()?;
+ impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for
$identifier {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
+ let field_ident = prot.read_field_begin(0)?;
if field_ident.field_type == FieldType::Stop {
return Err(general_err!("Received empty union from remote
{}", stringify!($identifier)));
}
@@ -128,13 +124,12 @@ macro_rules! thrift_union_all_empty {
return Err(general_err!("Unexpected {} {}",
stringify!($identifier), field_ident.id));
}
};
- let field_ident = prot.read_field_begin()?;
+ let field_ident = prot.read_field_begin(field_ident.id)?;
if field_ident.field_type != FieldType::Stop {
return Err(general_err!(
"Received multiple fields for union from remote {}",
stringify!($identifier)
));
}
- prot.read_struct_end()?;
Ok(ret)
}
}
@@ -195,12 +190,9 @@ macro_rules! thrift_union {
$($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $field_name $( (
$crate::__thrift_union_type!{$field_type $($field_lt)? $($element_type)?} )
)?),*
}
- impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier
$(<$lt>)? {
- type Error = ParquetError;
-
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) ->
Result<Self> {
- prot.read_struct_begin()?;
- let field_ident = prot.read_field_begin()?;
+ impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for
$identifier $(<$lt>)? {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
+ let field_ident = prot.read_field_begin(0)?;
if field_ident.field_type == FieldType::Stop {
return Err(general_err!("Received empty union from remote
{}", stringify!($identifier)));
}
@@ -213,13 +205,12 @@ macro_rules! thrift_union {
return Err(general_err!("Unexpected {} {}",
stringify!($identifier), field_ident.id));
}
};
- let field_ident = prot.read_field_begin()?;
+ let field_ident = prot.read_field_begin(field_ident.id)?;
if field_ident.field_type != FieldType::Stop {
return Err(general_err!(
concat!("Received multiple fields for union from
remote {}", stringify!($identifier))
));
}
- prot.read_struct_end()?;
Ok(ret)
}
}
@@ -283,27 +274,26 @@ macro_rules! thrift_struct {
$($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $vis
$field_name: $crate::__thrift_required_or_optional!($required_or_optional
$crate::__thrift_field_type!($field_type $($field_lt)? $($element_type)?))),*
}
- impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier
$(<$lt>)? {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) ->
Result<Self> {
+ impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for
$identifier $(<$lt>)? {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
$(let mut $field_name:
Option<$crate::__thrift_field_type!($field_type $($field_lt)?
$($element_type)?)> = None;)*
- prot.read_struct_begin()?;
+ let mut last_field_id = 0i16;
loop {
- let field_ident = prot.read_field_begin()?;
+ let field_ident = prot.read_field_begin(last_field_id)?;
if field_ident.field_type == FieldType::Stop {
break;
}
match field_ident.id {
$($field_id => {
- let val = $crate::__thrift_read_field!(prot,
$field_type $($field_lt)? $($element_type)?);
+ let val = $crate::__thrift_read_field!(prot,
field_ident, $field_type $($field_lt)? $($element_type)?);
$field_name = Some(val);
})*
_ => {
prot.skip(field_ident.field_type)?;
}
};
+ last_field_id = field_ident.id;
}
- prot.read_struct_end()?;
$($crate::__thrift_result_required_or_optional!($required_or_optional
$field_name);)*
Ok(Self {
$($field_name),*
@@ -417,39 +407,42 @@ macro_rules! __thrift_result_required_or_optional {
#[doc(hidden)]
#[macro_export]
macro_rules! __thrift_read_field {
- ($prot:tt, list $lt:lifetime binary) => {
- Vec::<&'a [u8]>::try_from(&mut *$prot)?
+ ($prot:tt, $field_ident:tt, list $lt:lifetime binary) => {
+ read_thrift_vec::<&'a [u8], R>(&mut *$prot)?
};
- ($prot:tt, list $lt:lifetime $element_type:ident) => {
- Vec::<$element_type>::try_from(&mut *$prot)?
+ ($prot:tt, $field_ident:tt, list $lt:lifetime $element_type:ident) => {
+ read_thrift_vec::<$element_type, R>(&mut *$prot)?
};
- ($prot:tt, list string) => {
- Vec::<String>::try_from(&mut *$prot)?
+ ($prot:tt, $field_ident:tt, list string) => {
+ read_thrift_vec::<String, R>(&mut *$prot)?
};
- ($prot:tt, list $element_type:ident) => {
- Vec::<$element_type>::try_from(&mut *$prot)?
+ ($prot:tt, $field_ident:tt, list $element_type:ident) => {
+ read_thrift_vec::<$element_type, R>(&mut *$prot)?
};
- ($prot:tt, string $lt:lifetime) => {
- <&$lt str>::try_from(&mut *$prot)?
+ ($prot:tt, $field_ident:tt, string $lt:lifetime) => {
+ <&$lt str>::read_thrift(&mut *$prot)?
};
- ($prot:tt, binary $lt:lifetime) => {
- <&$lt [u8]>::try_from(&mut *$prot)?
+ ($prot:tt, $field_ident:tt, binary $lt:lifetime) => {
+ <&$lt [u8]>::read_thrift(&mut *$prot)?
};
- ($prot:tt, $field_type:ident $lt:lifetime) => {
- $field_type::try_from(&mut *$prot)?
+ ($prot:tt, $field_ident:tt, $field_type:ident $lt:lifetime) => {
+ $field_type::read_thrift(&mut *$prot)?
};
- ($prot:tt, string) => {
- String::try_from(&mut *$prot)?
+ ($prot:tt, $field_ident:tt, string) => {
+ String::read_thrift(&mut *$prot)?
};
- ($prot:tt, binary) => {
+ ($prot:tt, $field_ident:tt, binary) => {
// this one needs to not conflict with `list<i8>`
$prot.read_bytes()?.to_vec()
};
- ($prot:tt, double) => {
- $crate::parquet_thrift::OrderedF64::try_from(&mut *$prot)?
+ ($prot:tt, $field_ident:tt, double) => {
+ $crate::parquet_thrift::OrderedF64::read_thrift(&mut *$prot)?
+ };
+ ($prot:tt, $field_ident:tt, bool) => {
+ $field_ident.bool_val.unwrap()
};
- ($prot:tt, $field_type:ident) => {
- $field_type::try_from(&mut *$prot)?
+ ($prot:tt, $field_ident:tt, $field_type:ident) => {
+ $field_type::read_thrift(&mut *$prot)?
};
}
@@ -482,10 +475,10 @@ macro_rules! __thrift_union_type {
#[macro_export]
macro_rules! __thrift_read_variant {
($prot:tt, $field_name:ident $field_type:ident) => {
- Self::$field_name($field_type::try_from(&mut *$prot)?)
+ Self::$field_name($field_type::read_thrift(&mut *$prot)?)
};
($prot:tt, $field_name:ident list $field_type:ident) => {
- Self::$field_name(Vec::<$field_type>::try_from(&mut *$prot)?)
+ Self::$field_name(Vec::<$field_type>::read_thrift(&mut *$prot)?)
};
($prot:tt, $field_name:ident) => {{
$prot.skip_empty_struct()?;
diff --git a/parquet/src/parquet_thrift.rs b/parquet/src/parquet_thrift.rs
index 9b83c0a01b..17847d0b71 100644
--- a/parquet/src/parquet_thrift.rs
+++ b/parquet/src/parquet_thrift.rs
@@ -24,10 +24,9 @@ use std::{cmp::Ordering, io::Write};
use crate::errors::{ParquetError, Result};
-// Couldn't implement thrift structs with f64 do to lack of Eq
-// for f64. This is a hacky workaround for now...there are other
-// wrappers out there that should probably be used instead.
-// thrift seems to re-export an impl from ordered-float
+/// Wrapper for thrift `double` fields. This is used to provide
+/// an implementation of `Eq` for floats. This implementation
+/// uses IEEE 754 total order.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct OrderedF64(f64);
@@ -156,53 +155,52 @@ impl TryFrom<u8> for ElementType {
}
}
+/// Struct used to describe a [thrift struct] field during decoding.
+///
+/// [thrift struct]:
https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#struct-encoding
pub(crate) struct FieldIdentifier {
+ /// The type for the field.
pub(crate) field_type: FieldType,
+ /// The field's `id`. May be computed from delta or directly decoded.
pub(crate) id: i16,
+ /// Stores the value for booleans.
+ ///
+ /// Boolean fields store no data, instead the field type is either boolean
true, or
+ /// boolean false.
+ pub(crate) bool_val: Option<bool>,
}
+/// Struct used to describe a [thrift list].
+///
+/// [thrift list]:
https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#list-and-set
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct ListIdentifier {
+ /// The type for each element in the list.
pub(crate) element_type: ElementType,
+ /// Number of elements contained in the list.
pub(crate) size: i32,
}
-/// A more performant implementation of [`TCompactInputProtocol`] that reads a
slice
+/// Low-level object used to deserialize structs encoded with the Thrift
[compact] protocol.
///
-/// [`TCompactInputProtocol`]: thrift::protocol::TCompactInputProtocol
-pub(crate) struct ThriftCompactInputProtocol<'a> {
- buf: &'a [u8],
- // Identifier of the last field deserialized for a struct.
- last_read_field_id: i16,
- // Stack of the last read field ids (a new entry is added each time a
nested struct is read).
- read_field_id_stack: Vec<i16>,
- // Boolean value for a field.
- // Saved because boolean fields and their value are encoded in a single
byte,
- // and reading the field only occurs after the field id is read.
- pending_read_bool_value: Option<bool>,
-}
-
-impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> {
- pub fn new(buf: &'a [u8]) -> Self {
- Self {
- buf,
- last_read_field_id: 0,
- read_field_id_stack: Vec::with_capacity(16),
- pending_read_bool_value: None,
- }
- }
+/// Implementation of this trait must provide the low-level functions
`read_byte`, `read_bytes`,
+/// `skip_bytes`, and `read_double`. These primitives are used by the default
functions provided
+/// here to perform deserialization.
+///
+/// [compact]:
https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
+pub(crate) trait ThriftCompactInputProtocol<'a> {
+ /// Read a single byte from the input.
+ fn read_byte(&mut self) -> Result<u8>;
- pub fn reset_buffer(&mut self, buf: &'a [u8]) {
- self.buf = buf;
- self.last_read_field_id = 0;
- self.read_field_id_stack.clear();
- self.pending_read_bool_value = None;
- }
+ /// Read a Thrift encoded [binary] from the input.
+ ///
+ /// [binary]:
https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#binary-encoding
+ fn read_bytes(&mut self) -> Result<&'a [u8]>;
- pub fn as_slice(&self) -> &'a [u8] {
- self.buf
- }
+ /// Skip the next `n` bytes of input.
+ fn skip_bytes(&mut self, n: usize) -> Result<()>;
+ /// Read a ULEB128 encoded unsigned varint from the input.
fn read_vlq(&mut self) -> Result<u64> {
let mut in_progress = 0;
let mut shift = 0;
@@ -216,12 +214,14 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> {
}
}
+ /// Read a zig-zag encoded signed varint from the input.
fn read_zig_zag(&mut self) -> Result<i64> {
let val = self.read_vlq()?;
Ok((val >> 1) as i64 ^ -((val & 1) as i64))
}
- fn read_list_set_begin(&mut self) -> Result<(ElementType, i32)> {
+ /// Read the [`ListIdentifier`] for a Thrift encoded list.
+ fn read_list_begin(&mut self) -> Result<ListIdentifier> {
let header = self.read_byte()?;
let element_type = ElementType::try_from(header & 0x0f)?;
@@ -233,162 +233,118 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> {
self.read_vlq()? as _
};
- Ok((element_type, element_count))
- }
-
- pub(crate) fn read_struct_begin(&mut self) -> Result<()> {
- self.read_field_id_stack.push(self.last_read_field_id);
- self.last_read_field_id = 0;
- Ok(())
- }
-
- pub(crate) fn read_struct_end(&mut self) -> Result<()> {
- self.last_read_field_id = self
- .read_field_id_stack
- .pop()
- .expect("should have previous field ids");
- Ok(())
- }
-
- // This is a specialized version of read_field_begin, solely for use in
parsing
- // PageLocation structs in the offset index. This function assumes that
the delta
- // field will always be less than 0xf, fields will be in order, and no
boolean fields
- // will be read. This also skips validation of the field type.
- //
- // Returns a tuple of (field_type, field_delta)
- pub(crate) fn read_field_header(&mut self) -> Result<(u8, u8)> {
- let field_type = self.read_byte()?;
- let field_delta = (field_type & 0xf0) >> 4;
- let field_type = field_type & 0xf;
- Ok((field_type, field_delta))
+ Ok(ListIdentifier {
+ element_type,
+ size: element_count,
+ })
}
- pub(crate) fn read_field_begin(&mut self) -> Result<FieldIdentifier> {
+ /// Read the [`FieldIdentifier`] for a field in a Thrift encoded struct.
+ fn read_field_begin(&mut self, last_field_id: i16) ->
Result<FieldIdentifier> {
// we can read at least one byte, which is:
// - the type
// - the field delta and the type
let field_type = self.read_byte()?;
let field_delta = (field_type & 0xf0) >> 4;
let field_type = FieldType::try_from(field_type & 0xf)?;
+ let mut bool_val: Option<bool> = None;
match field_type {
FieldType::Stop => Ok(FieldIdentifier {
field_type: FieldType::Stop,
id: 0,
+ bool_val,
}),
_ => {
// special handling for bools
if field_type == FieldType::BooleanFalse {
- self.pending_read_bool_value = Some(false);
+ bool_val = Some(false);
} else if field_type == FieldType::BooleanTrue {
- self.pending_read_bool_value = Some(true);
+ bool_val = Some(true);
}
- if field_delta != 0 {
- self.last_read_field_id = self
- .last_read_field_id
- .checked_add(field_delta as i16)
- .map_or_else(
- || {
- Err(general_err!(format!(
- "cannot add {} to {}",
- field_delta, self.last_read_field_id
- )))
- },
- Ok,
- )?;
+ let field_id = if field_delta != 0 {
+ last_field_id.checked_add(field_delta as i16).map_or_else(
+ || {
+ Err(general_err!(format!(
+ "cannot add {} to {}",
+ field_delta, last_field_id
+ )))
+ },
+ Ok,
+ )?
} else {
- self.last_read_field_id = self.read_i16()?;
+ self.read_i16()?
};
Ok(FieldIdentifier {
field_type,
- id: self.last_read_field_id,
+ id: field_id,
+ bool_val,
})
}
}
}
- pub(crate) fn read_bool(&mut self) -> Result<bool> {
- match self.pending_read_bool_value.take() {
- Some(b) => Ok(b),
- None => {
- let b = self.read_byte()?;
- // Previous versions of the thrift specification said to use 0
and 1 inside collections,
- // but that differed from existing implementations.
- // The specification was updated in
https://github.com/apache/thrift/commit/2c29c5665bc442e703480bb0ee60fe925ffe02e8.
- // At least the go implementation seems to have followed the
previously documented values.
- match b {
- 0x01 => Ok(true),
- 0x00 | 0x02 => Ok(false),
- unkn => Err(general_err!(format!("cannot convert {unkn}
into bool"))),
- }
- }
- }
+ /// This is a specialized version of [`Self::read_field_begin`], solely
for use in parsing
+ /// simple structs. This function assumes that the delta field will always
be less than 0xf,
+ /// fields will be in order, and no boolean fields will be read.
+ /// This also skips validation of the field type.
+ ///
+ /// Returns a tuple of `(field_type, field_delta)`.
+ fn read_field_header(&mut self) -> Result<(u8, u8)> {
+ let field_type = self.read_byte()?;
+ let field_delta = (field_type & 0xf0) >> 4;
+ let field_type = field_type & 0xf;
+ Ok((field_type, field_delta))
}
- pub(crate) fn read_bytes(&mut self) -> Result<&'b [u8]> {
- let len = self.read_vlq()? as usize;
- let ret = self.buf.get(..len).ok_or_else(eof_error)?;
- self.buf = &self.buf[len..];
- Ok(ret)
+ /// Read a boolean list element. This should not be used for struct
fields. For the latter,
+ /// use the [`FieldIdentifier::bool_val`] field.
+ fn read_bool(&mut self) -> Result<bool> {
+ let b = self.read_byte()?;
+ // Previous versions of the thrift specification said to use 0 and 1
inside collections,
+ // but that differed from existing implementations.
+ // The specification was updated in
https://github.com/apache/thrift/commit/2c29c5665bc442e703480bb0ee60fe925ffe02e8.
+ // At least the go implementation seems to have followed the
previously documented values.
+ match b {
+ 0x01 => Ok(true),
+ 0x00 | 0x02 => Ok(false),
+ unkn => Err(general_err!(format!("cannot convert {unkn} into
bool"))),
+ }
}
- pub(crate) fn read_string(&mut self) -> Result<&'b str> {
+ /// Read a Thrift [binary] as a UTF-8 encoded string.
+ ///
+ /// [binary]:
https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#binary-encoding
+ fn read_string(&mut self) -> Result<&'a str> {
let slice = self.read_bytes()?;
Ok(std::str::from_utf8(slice)?)
}
- pub(crate) fn read_i8(&mut self) -> Result<i8> {
+ /// Read an `i8`.
+ fn read_i8(&mut self) -> Result<i8> {
Ok(self.read_byte()? as _)
}
- pub(crate) fn read_i16(&mut self) -> Result<i16> {
+ /// Read an `i16`.
+ fn read_i16(&mut self) -> Result<i16> {
Ok(self.read_zig_zag()? as _)
}
- pub(crate) fn read_i32(&mut self) -> Result<i32> {
+ /// Read an `i32`.
+ fn read_i32(&mut self) -> Result<i32> {
Ok(self.read_zig_zag()? as _)
}
- pub(crate) fn read_i64(&mut self) -> Result<i64> {
+ /// Read an `i64`.
+ fn read_i64(&mut self) -> Result<i64> {
self.read_zig_zag()
}
- pub(crate) fn read_double(&mut self) -> Result<f64> {
- let slice = self.buf.get(..8).ok_or_else(eof_error)?;
- self.buf = &self.buf[8..];
- match slice.try_into() {
- Ok(slice) => Ok(f64::from_le_bytes(slice)),
- Err(_) => Err(general_err!("Unexpected error converting slice")),
- }
- }
-
- pub(crate) fn read_list_begin(&mut self) -> Result<ListIdentifier> {
- let (element_type, element_count) = self.read_list_set_begin()?;
- Ok(ListIdentifier {
- element_type,
- size: element_count,
- })
- }
-
- pub(crate) fn read_list_end(&mut self) -> Result<()> {
- Ok(())
- }
-
- #[inline]
- fn read_byte(&mut self) -> Result<u8> {
- let ret = *self.buf.first().ok_or_else(eof_error)?;
- self.buf = &self.buf[1..];
- Ok(ret)
- }
-
- #[inline]
- fn skip_bytes(&mut self, n: usize) -> Result<()> {
- self.buf.get(..n).ok_or_else(eof_error)?;
- self.buf = &self.buf[n..];
- Ok(())
- }
+ /// Read a Thrift `double` as `f64`.
+ fn read_double(&mut self) -> Result<f64>;
+ /// Skip a ULEB128 encoded varint.
fn skip_vlq(&mut self) -> Result<()> {
loop {
let byte = self.read_byte()?;
@@ -398,21 +354,25 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> {
}
}
+ /// Skip a Thrift [binary].
+ ///
+ /// [binary]:
https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#binary-encoding
fn skip_binary(&mut self) -> Result<()> {
let len = self.read_vlq()? as usize;
self.skip_bytes(len)
}
/// Skip a field with type `field_type` recursively until the default
- /// maximum skip depth is reached.
- pub(crate) fn skip(&mut self, field_type: FieldType) -> Result<()> {
- // TODO: magic number
- self.skip_till_depth(field_type, 64)
+ /// maximum skip depth (currently 64) is reached.
+ fn skip(&mut self, field_type: FieldType) -> Result<()> {
+ const DEFAULT_SKIP_DEPTH: i8 = 64;
+ self.skip_till_depth(field_type, DEFAULT_SKIP_DEPTH)
}
/// Empty structs in unions consist of a single byte of 0 for the field
stop record.
- /// This skips that byte without pushing to the field id stack.
- pub(crate) fn skip_empty_struct(&mut self) -> Result<()> {
+ /// This skips that byte without incurring the cost of processing the
[`FieldIdentifier`].
+ /// Will return an error if the struct is not actually empty.
+ fn skip_empty_struct(&mut self) -> Result<()> {
let b = self.read_byte()?;
if b != 0 {
Err(general_err!("Empty struct has fields"))
@@ -428,7 +388,8 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> {
}
match field_type {
- FieldType::BooleanFalse | FieldType::BooleanTrue =>
self.read_bool().map(|_| ()),
+ // boolean field has no data
+ FieldType::BooleanFalse | FieldType::BooleanTrue => Ok(()),
FieldType::Byte => self.read_i8().map(|_| ()),
FieldType::I16 => self.skip_vlq().map(|_| ()),
FieldType::I32 => self.skip_vlq().map(|_| ()),
@@ -436,15 +397,16 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> {
FieldType::Double => self.skip_bytes(8).map(|_| ()),
FieldType::Binary => self.skip_binary().map(|_| ()),
FieldType::Struct => {
- self.read_struct_begin()?;
+ let mut last_field_id = 0i16;
loop {
- let field_ident = self.read_field_begin()?;
+ let field_ident = self.read_field_begin(last_field_id)?;
if field_ident.field_type == FieldType::Stop {
break;
}
self.skip_till_depth(field_ident.field_type, depth - 1)?;
+ last_field_id = field_ident.id;
}
- self.read_struct_end()
+ Ok(())
}
FieldType::List => {
let list_ident = self.read_list_begin()?;
@@ -452,7 +414,7 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> {
let element_type =
FieldType::try_from(list_ident.element_type)?;
self.skip_till_depth(element_type, depth - 1)?;
}
- self.read_list_end()
+ Ok(())
}
// no list or map types in parquet format
u => Err(general_err!(format!("cannot skip field type {:?}", &u))),
@@ -460,90 +422,142 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> {
}
}
+/// A high performance Thrift reader that reads from a slice of bytes.
+pub(crate) struct ThriftSliceInputProtocol<'a> {
+ buf: &'a [u8],
+}
+
+impl<'a> ThriftSliceInputProtocol<'a> {
+ /// Create a new `ThriftSliceInputProtocol` using the bytes in `buf`.
+ pub fn new(buf: &'a [u8]) -> Self {
+ Self { buf }
+ }
+
+ /// Re-initialize this reader with a new slice.
+ pub fn reset_buffer(&mut self, buf: &'a [u8]) {
+ self.buf = buf;
+ }
+
+ /// Return the current buffer as a slice.
+ pub fn as_slice(&self) -> &'a [u8] {
+ self.buf
+ }
+}
+
+impl<'b, 'a: 'b> ThriftCompactInputProtocol<'b> for
ThriftSliceInputProtocol<'a> {
+ #[inline]
+ fn read_byte(&mut self) -> Result<u8> {
+ let ret = *self.buf.first().ok_or_else(eof_error)?;
+ self.buf = &self.buf[1..];
+ Ok(ret)
+ }
+
+ fn read_bytes(&mut self) -> Result<&'b [u8]> {
+ let len = self.read_vlq()? as usize;
+ let ret = self.buf.get(..len).ok_or_else(eof_error)?;
+ self.buf = &self.buf[len..];
+ Ok(ret)
+ }
+
+ #[inline]
+ fn skip_bytes(&mut self, n: usize) -> Result<()> {
+ self.buf.get(..n).ok_or_else(eof_error)?;
+ self.buf = &self.buf[n..];
+ Ok(())
+ }
+
+ fn read_double(&mut self) -> Result<f64> {
+ let slice = self.buf.get(..8).ok_or_else(eof_error)?;
+ self.buf = &self.buf[8..];
+ match slice.try_into() {
+ Ok(slice) => Ok(f64::from_le_bytes(slice)),
+ Err(_) => Err(general_err!("Unexpected error converting slice")),
+ }
+ }
+}
+
fn eof_error() -> ParquetError {
eof_err!("Unexpected EOF")
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for bool {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
+/// Trait implemented for objects that can be deserialized from a Thrift input
stream.
+/// Implementations are provided for Thrift primitive types.
+pub(crate) trait ReadThrift<'a, R: ThriftCompactInputProtocol<'a>> {
+ /// Read an object of type `Self` from the input protocol object.
+ fn read_thrift(prot: &mut R) -> Result<Self>
+ where
+ Self: Sized;
+}
+
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for bool {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
prot.read_bool()
}
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i8 {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for i8 {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
prot.read_i8()
}
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i16 {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for i16 {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
prot.read_i16()
}
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i32 {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for i32 {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
prot.read_i32()
}
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i64 {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for i64 {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
prot.read_i64()
}
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for OrderedF64 {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for OrderedF64 {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
Ok(OrderedF64(prot.read_double()?))
}
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for &'a str {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for &'a str {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
prot.read_string()
}
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for String {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for String {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
Ok(prot.read_string()?.to_owned())
}
}
-impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for &'a [u8] {
- type Error = ParquetError;
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for &'a [u8] {
+ fn read_thrift(prot: &mut R) -> Result<Self> {
prot.read_bytes()
}
}
-impl<'a, T> TryFrom<&mut ThriftCompactInputProtocol<'a>> for Vec<T>
+/// Read a Thrift encoded [list] from the input protocol object.
+///
+/// [list]:
https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#list-and-set
+pub(crate) fn read_thrift_vec<'a, T, R>(prot: &mut R) -> Result<Vec<T>>
where
- T: for<'b> TryFrom<&'b mut ThriftCompactInputProtocol<'a>>,
- ParquetError: for<'b> From<<T as TryFrom<&'b mut
ThriftCompactInputProtocol<'a>>>::Error>,
+ R: ThriftCompactInputProtocol<'a>,
+ T: ReadThrift<'a, R>,
{
- type Error = ParquetError;
-
- fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self,
Self::Error> {
- let list_ident = prot.read_list_begin()?;
- let mut res = Vec::with_capacity(list_ident.size as usize);
- for _ in 0..list_ident.size {
- let val = T::try_from(prot)?;
- res.push(val);
- }
-
- Ok(res)
+ let list_ident = prot.read_list_begin()?;
+ let mut res = Vec::with_capacity(list_ident.size as usize);
+ for _ in 0..list_ident.size {
+ let val = T::read_thrift(prot)?;
+ res.push(val);
}
+ Ok(res)
}
/////////////////////////
@@ -983,11 +997,7 @@ pub(crate) mod tests {
pub(crate) fn test_roundtrip<T>(val: T)
where
- T: for<'a> TryFrom<&'a mut ThriftCompactInputProtocol<'a>>
- + WriteThrift
- + PartialEq
- + Debug,
- for<'a> <T as TryFrom<&'a mut ThriftCompactInputProtocol<'a>>>::Error:
Debug,
+ T: for<'a> ReadThrift<'a, ThriftSliceInputProtocol<'a>> + WriteThrift
+ PartialEq + Debug,
{
let buf = Vec::<u8>::new();
let mut writer = ThriftCompactOutputProtocol::new(buf);
@@ -995,8 +1005,8 @@ pub(crate) mod tests {
//println!("serialized: {:x?}", writer.inner());
- let mut prot = ThriftCompactInputProtocol::new(writer.inner());
- let read_val = T::try_from(&mut prot).unwrap();
+ let mut prot = ThriftSliceInputProtocol::new(writer.inner());
+ let read_val = T::read_thrift(&mut prot).unwrap();
assert_eq!(val, read_val);
}