rshkv commented on code in PR #863:
URL: https://github.com/apache/iceberg-rust/pull/863#discussion_r1901273157
##########
crates/iceberg/src/arrow/schema.rs:
##########
@@ -814,6 +814,193 @@ get_parquet_stat_as_datum!(min);
get_parquet_stat_as_datum!(max);
+/// Utilities to deal with [arrow_array::builder] types in the Iceberg context.
+pub(crate) mod builder {
+ use arrow_array::builder::*;
+ use arrow_array::cast::AsArray;
+ use arrow_array::types::*;
+ use arrow_array::{ArrayRef, Datum as ArrowDatum};
+ use arrow_schema::{DataType, TimeUnit};
+ use ordered_float::OrderedFloat;
+
+ use crate::spec::{Literal, PrimitiveLiteral};
+ use crate::{Error, ErrorKind};
+
+ /// A helper wrapping [ArrayBuilder] for building arrays without declaring the inner type at
+ /// compile-time when types are determined dynamically (e.g. based on some column type).
+ /// A [DataType] is given at construction time which is used to later downcast the inner array
+ /// and provided values.
+ pub(crate) struct AnyArrayBuilder {
+ data_type: DataType,
+ inner: Box<dyn ArrayBuilder>,
+ }
+
+ impl AnyArrayBuilder {
+ pub(crate) fn new(data_type: &DataType) -> Self {
+ Self {
+ data_type: data_type.clone(),
+ inner: make_builder(data_type, 0),
+ }
+ }
+
+ pub(crate) fn finish(&mut self) -> ArrayRef {
+ self.inner.finish()
+ }
+
+ /// Append an [arrow_array::Datum] value.
+ pub(crate) fn append_datum(&mut self, value: &dyn ArrowDatum) -> crate::Result<()> {
+ let (array, is_scalar) = value.get();
+ assert!(is_scalar, "Can only append scalar datum");
+
+ match array.data_type() {
Review Comment:
This list is exhaustive based on the `ArrowSchemaVisitor::primitive` function
above, i.e., every type produced there is covered here.
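To make the intended usage concrete, here's a minimal sketch (the calling
function and its name are hypothetical; only `AnyArrayBuilder` and
`append_datum` come from this diff):

```rust
use arrow_array::{ArrayRef, Int64Array, Scalar};
use arrow_schema::DataType;

// Hypothetical caller: in real call sites the DataType is only known at
// runtime; it's fixed to Int64 here to keep the sketch small.
fn collect_longs(values: &[i64]) -> crate::Result<ArrayRef> {
    let mut builder = AnyArrayBuilder::new(&DataType::Int64);
    for v in values {
        // `Scalar` wraps a one-element array so `get()` reports
        // is_scalar=true, satisfying the assert in `append_datum`.
        builder.append_datum(&Scalar::new(Int64Array::from(vec![*v])))?;
    }
    Ok(builder.finish())
}
```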
##########
crates/iceberg/src/scan.rs:
##########
@@ -1084,6 +1087,50 @@ pub mod tests {
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.key_metadata(None)
+ // Note:
+ // The bounds below need to agree with the test data written below
+ // into the Parquet file. If not, tests that rely on filter scans
+ // fail because of wrong bounds.
+ .lower_bounds(HashMap::from([
+ (1, Datum::long(1)),
+ (2, Datum::long(2)),
+ (3, Datum::long(3)),
+ (4, Datum::string("Apache")),
+ (5, Datum::double(100)),
+ (6, Datum::int(100)),
+ (7, Datum::long(100)),
+ (8, Datum::bool(false)),
+ (9, Datum::float(100.0)),
+ // decimal values are not supported by schema::get_arrow_datum
+ // (10, Datum::decimal(Decimal(123, 2))),
+ (11, Datum::date(0)),
+ (12, Datum::timestamp_micros(0)),
+ (13, Datum::timestamptz_micros(0)),
+ // ns timestamps, uuid, fixed, binary are currently not
+ // supported in schema::get_arrow_datum
+ ]))
+ .upper_bounds(HashMap::from([
+ (1, Datum::long(1)),
+ (2, Datum::long(5)),
+ (3, Datum::long(4)),
+ (4, Datum::string("Iceberg")),
+ (5, Datum::double(200)),
+ (6, Datum::int(200)),
+ (7, Datum::long(200)),
+ (8, Datum::bool(true)),
+ (9, Datum::float(200.0)),
+ // decimal values are not supported by schema::get_arrow_datum
+ // (10, Datum::decimal(Decimal(123, 2))),
+ (11, Datum::date(0)),
+ (12, Datum::timestamp_micros(0)),
+ (13, Datum::timestamptz_micros(0)),
+ // ns timestamps, uuid, fixed, binary are currently not
+ // supported in schema::get_arrow_datum
Review Comment:
We could add support, but I thought that might be for another PR.
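For background on why the bounds must agree with the written data, here's a
simplified sketch of the min/max pruning idea a filter scan relies on
(illustrative only, not this crate's actual evaluator):

```rust
/// A file with value range [lower, upper] can only be skipped when the
/// predicate provably matches nothing in that range.
fn may_contain_gt(lower: i64, upper: i64, literal: i64) -> bool {
    let _ = lower; // only the upper bound matters for `col > literal`
    upper > literal
}

fn main() {
    // Bounds claim values in [1, 5]; a scan for `col > 10` prunes the file.
    assert!(!may_contain_gt(1, 5, 10));
    // If the file actually held values up to 20 but the bounds still said
    // [1, 5], the file would be wrongly pruned -- hence bounds and data
    // must agree, or filter-scan tests fail.
    assert!(may_contain_gt(1, 20, 10));
}
```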
##########
crates/iceberg/src/scan.rs:
##########
@@ -1084,6 +1087,50 @@ pub mod tests {
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.key_metadata(None)
+ // Note:
+ // The bounds below need to agree with the test data written below
+ // into the Parquet file. If not, tests that rely on filter scans
+ // fail because of wrong bounds.
+ .lower_bounds(HashMap::from([
+ (1, Datum::long(1)),
+ (2, Datum::long(2)),
+ (3, Datum::long(3)),
+ (4, Datum::string("Apache")),
+ (5, Datum::double(100)),
+ (6, Datum::int(100)),
+ (7, Datum::long(100)),
+ (8, Datum::bool(false)),
+ (9, Datum::float(100.0)),
+ // decimal values are not supported by schema::get_arrow_datum
+ // (10, Datum::decimal(Decimal(123, 2))),
+ (11, Datum::date(0)),
+ (12, Datum::timestamp_micros(0)),
+ (13, Datum::timestamptz_micros(0)),
+ // ns timestamps, uuid, fixed, binary are currently not
+ // supported in schema::get_arrow_datum
+ ]))
+ .upper_bounds(HashMap::from([
+ (1, Datum::long(1)),
+ (2, Datum::long(5)),
+ (3, Datum::long(4)),
+ (4, Datum::string("Iceberg")),
+ (5, Datum::double(200)),
+ (6, Datum::int(200)),
+ (7, Datum::long(200)),
+ (8, Datum::bool(true)),
+ (9, Datum::float(200.0)),
+ // decimal values are not supported by schema::get_arrow_datum
+ // (10, Datum::decimal(Decimal(123, 2))),
+ (11, Datum::date(0)),
+ (12, Datum::timestamp_micros(0)),
+ (13, Datum::timestamptz_micros(0)),
+ // ns timestamps, uuid, fixed, binary are currently not
+ // supported in schema::get_arrow_datum
+ ]))
+ .column_sizes(HashMap::from([(1, 1u64), (2, 1u64)]))
+ .value_counts(HashMap::from([(1, 2u64), (2, 2u64)]))
+ .null_value_counts(HashMap::from([(1, 3u64), (2, 3u64)]))
+ .nan_value_counts(HashMap::from([(1, 4u64), (2, 4u64)]))
Review Comment:
These values aren't based on the test data, but I wanted them reflected in the tests.
##########
crates/iceberg/src/metadata_scan.rs:
##########
@@ -128,6 +141,84 @@ impl<'a> SnapshotsTable<'a> {
}
}
+/// Entries table containing the manifest file's entries.
+///
+/// The table has one row for each manifest file entry in the current snapshot's manifest list file.
+/// For reference, see the Java implementation of [`ManifestEntry`][1].
+///
+/// [1]: https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+pub struct EntriesTable<'a> {
+ table: &'a Table,
+}
+
+impl<'a> EntriesTable<'a> {
+ /// Get the schema for the manifest entries table.
+ pub fn schema(&self) -> Schema {
+ Schema::new(vec
Review Comment:
enum values. In Java, the `status` column is `i32`
[(here)](https://github.com/apache/iceberg/blob/4a432839233f2343a9eae8255532f911f06358ef/core/src/main/java/org/apache/iceberg/ManifestEntry.java#L45),
but in Python it is
[u8](https://github.com/apache/iceberg-python/blob/a051584a3684392d2db6556449eb299145d47d15/pyiceberg/table/inspect.py#L121).
My preference would be `u8`, but I'm treating the Java implementation as authoritative.
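For reference, the status values themselves come from the Iceberg spec
(0 = existing, 1 = added, 2 = deleted), so they fit in either width; a quick
sketch of the mapping (the enum below is illustrative, not part of this PR):

```rust
/// Manifest entry status per the Iceberg spec. The discriminants fit in a
/// u8, which is why PyIceberg can get away with u8 while Java uses int.
#[repr(i32)]
enum ManifestStatus {
    Existing = 0,
    Added = 1,
    Deleted = 2,
}
```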
##########
crates/iceberg/src/arrow/schema.rs:
##########
@@ -814,6 +814,193 @@ get_parquet_stat_as_datum!(min);
get_parquet_stat_as_datum!(max);
+/// Utilities to deal with [arrow_array::builder] types in the Iceberg context.
+pub(crate) mod builder {
+ use arrow_array::builder::*;
+ use arrow_array::cast::AsArray;
+ use arrow_array::types::*;
+ use arrow_array::{ArrayRef, Datum as ArrowDatum};
+ use arrow_schema::{DataType, TimeUnit};
+ use ordered_float::OrderedFloat;
+
+ use crate::spec::{Literal, PrimitiveLiteral};
+ use crate::{Error, ErrorKind};
+
+ /// A helper wrapping [ArrayBuilder] for building arrays without declaring the inner type at
+ /// compile-time when types are determined dynamically (e.g. based on some column type).
+ /// A [DataType] is given at construction time which is used to later downcast the inner array
+ /// and provided values.
+ pub(crate) struct AnyArrayBuilder {
+ data_type: DataType,
+ inner: Box<dyn ArrayBuilder>,
+ }
+
+ impl AnyArrayBuilder {
+ pub(crate) fn new(data_type: &DataType) -> Self {
+ Self {
+ data_type: data_type.clone(),
+ inner: make_builder(data_type, 0),
+ }
+ }
+
+ pub(crate) fn finish(&mut self) -> ArrayRef {
+ self.inner.finish()
+ }
+
+ /// Append an [arrow_array::Datum] value.
+ pub(crate) fn append_datum(&mut self, value: &dyn ArrowDatum) -> crate::Result<()> {
+ let (array, is_scalar) = value.get();
+ assert!(is_scalar, "Can only append scalar datum");
+
+ match array.data_type() {
+ DataType::Boolean => self
+ .builder::<BooleanBuilder>()?
+ .append_value(array.as_boolean().value(0)),
+ DataType::Int32 => self
+ .builder::<Int32Builder>()?
+ .append_value(array.as_primitive::<Int32Type>().value(0)),
+ DataType::Int64 => self
+ .builder::<Int64Builder>()?
+ .append_value(array.as_primitive::<Int64Type>().value(0)),
+ DataType::Float32 => self
+ .builder::<Float32Builder>()?
+ .append_value(array.as_primitive::<Float32Type>().value(0)),
+ DataType::Float64 => self
+ .builder::<Float64Builder>()?
+ .append_value(array.as_primitive::<Float64Type>().value(0)),
+ DataType::Decimal128(_, _) => self
+ .builder::<Decimal128Builder>()?
+ .append_value(array.as_primitive::<Decimal128Type>().value(0)),
+ DataType::Date32 => self
+ .builder::<Date32Builder>()?
+ .append_value(array.as_primitive::<Date32Type>().value(0)),
+ DataType::Time64(TimeUnit::Microsecond) => self
+ .builder::<Time64MicrosecondBuilder>()?
+ .append_value(array.as_primitive::<Time64MicrosecondType>().value(0)),
+ DataType::Timestamp(TimeUnit::Microsecond, _) => self
+ .builder::<TimestampMicrosecondBuilder>()?
+ .append_value(array.as_primitive::<TimestampMicrosecondType>().value(0)),
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => self
+ .builder::<TimestampNanosecondBuilder>()?
+ .append_value(array.as_primitive::<TimestampNanosecondType>().value(0)),
Review Comment:
I understand it's correct to ignore the timezone here because that's not
captured in the builder.
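A small sketch of why that's safe, assuming arrow-rs's `with_timezone` on
timestamp arrays: the timezone lives only in the DataType metadata, while the
physical values are the same i64 micros either way:

```rust
use arrow_array::{Array, TimestampMicrosecondArray};

fn main() {
    let naive = TimestampMicrosecondArray::from(vec![1_000_000i64]);
    let utc = TimestampMicrosecondArray::from(vec![1_000_000i64]).with_timezone("UTC");
    // Same stored micros; only the DataType metadata differs.
    assert_eq!(naive.value(0), utc.value(0));
    assert_ne!(naive.data_type(), utc.data_type());
}
```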
##########
crates/iceberg/src/table.rs:
##########
@@ -203,7 +203,7 @@ impl Table {
/// Creates a metadata table which provides table-like APIs for inspecting metadata.
/// See [`MetadataTable`] for more details.
- pub fn metadata_table(self) -> MetadataTable {
+ pub fn metadata_table(&self) -> MetadataTable<'_> {
Review Comment:
Addressing this comment
https://github.com/apache/iceberg-rust/pull/822#discussion_r1899617821. I
prefer that but don't _need_ to do here.
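To illustrate what the borrow buys (stand-in types, not the real ones): with
`self` the table is consumed by the call, while with `&self` it stays usable
afterwards:

```rust
struct MyTable;
struct MyMetadataTable<'a> {
    table: &'a MyTable,
}

impl MyTable {
    // Borrowing instead of taking `self` by value keeps the table usable.
    fn metadata_table(&self) -> MyMetadataTable<'_> {
        MyMetadataTable { table: self }
    }
}

fn main() {
    let table = MyTable;
    let _first = table.metadata_table();
    let _second = table.metadata_table(); // fine: `table` was only borrowed
}
```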
##########
crates/iceberg/src/metadata_scan.rs:
##########
@@ -255,8 +345,515 @@ impl<'a> ManifestsTable<'a> {
}
}
+/// Builds the struct describing data files listed in a table manifest.
+///
+/// For reference, see the Java implementation of [`DataFile`][1].
+///
+/// [1]: https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/api/src/main/java/org/apache/iceberg/DataFile.java
+struct DataFileStructBuilder<'a> {
+ // Reference to table metadata to retrieve partition specs based on partition spec ids
+ table_metadata: &'a TableMetadata,
+ // Below are the field builders of the "data_file" struct
+ content: Int8Builder,
+ file_path: StringBuilder,
+ file_format: StringBuilder,
+ partition: PartitionValuesStructBuilder,
+ // The count types in the Java and PyIceberg implementations are i64; however, the values
+ // coming from the deserialized data files are u64. We agree with the former, casting where needed.
+ record_count: Int64Builder,
+ file_size_in_bytes: Int64Builder,
+ column_sizes: MapBuilder<Int32Builder, Int64Builder>,
+ value_counts: MapBuilder<Int32Builder, Int64Builder>,
+ null_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+ nan_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+ lower_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+ upper_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+ key_metadata: BinaryBuilder,
+ split_offsets: ListBuilder<Int64Builder>,
+ equality_ids: ListBuilder<Int32Builder>,
+ sort_order_ids: Int32Builder,
+}
+
+impl<'a> DataFileStructBuilder<'a> {
+ fn new(table_metadata: &'a TableMetadata) -> Self {
+ Self {
+ table_metadata,
+ content: Int8Builder::new(),
+ file_path: StringBuilder::new(),
+ file_format: StringBuilder::new(),
+ partition: PartitionValuesStructBuilder::new(table_metadata),
+ record_count: Int64Builder::new(),
+ file_size_in_bytes: Int64Builder::new(),
+ column_sizes: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+ value_counts: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+ null_value_counts: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+ nan_value_counts: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+ lower_bounds: MapBuilder::new(None, Int32Builder::new(), BinaryBuilder::new()),
+ upper_bounds: MapBuilder::new(None, Int32Builder::new(), BinaryBuilder::new()),
+ key_metadata: BinaryBuilder::new(),
+ split_offsets: ListBuilder::new(Int64Builder::new()),
+ equality_ids: ListBuilder::new(Int32Builder::new()),
+ sort_order_ids: Int32Builder::new(),
+ }
+ }
+
+ fn fields(table_metadata: &TableMetadata) -> Fields {
+ vec![
+ Field::new("content", DataType::Int8, false),
+ Field::new("file_path", DataType::Utf8, false),
+ Field::new("file_format", DataType::Utf8, false),
+ Field::new(
+ "partition",
+ DataType::Struct(PartitionValuesStructBuilder::combined_partition_fields(
+ table_metadata,
+ )),
+ false,
+ ),
+ Field::new("record_count", DataType::Int64, false),
+ Field::new("file_size_in_bytes", DataType::Int64, false),
+ Field::new(
+ "column_sizes",
+ Self::column_id_to_value_type(DataType::Int64),
+ true,
+ ),
+ Field::new(
+ "value_counts",
+ Self::column_id_to_value_type(DataType::Int64),
+ true,
+ ),
+ Field::new(
+ "null_value_counts",
+ Self::column_id_to_value_type(DataType::Int64),
+ true,
+ ),
+ Field::new(
+ "nan_value_counts",
+ Self::column_id_to_value_type(DataType::Int64),
+ true,
+ ),
+ Field::new(
+ "lower_bounds",
+ Self::column_id_to_value_type(DataType::Binary),
+ true,
+ ),
+ Field::new(
+ "upper_bounds",
+ Self::column_id_to_value_type(DataType::Binary),
+ true,
+ ),
+ Field::new("key_metadata", DataType::Binary, true),
+ Field::new(
+ "split_offsets",
+ DataType::new_list(DataType::Int64, true),
+ true,
+ ),
+ Field::new(
+ "equality_ids",
+ DataType::new_list(DataType::Int32, true),
+ true,
+ ),
+ Field::new("sort_order_id", DataType::Int32, true),
+ ]
+ .into()
+ }
+
+ /// Construct a new struct type that maps from column ids (i32) to the provided value type.
+ /// Keys, values, and the whole struct are non-nullable.
+ fn column_id_to_value_type(value_type: DataType) -> DataType {
+ DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(
+ vec![
+ Field::new("keys", DataType::Int32, false),
+ Field::new("values", value_type, true),
+ ]
+ .into(),
+ ),
+ false,
+ )),
+ false,
+ )
+ }
+
+ fn append(&mut self, manifest_file: &ManifestFile, data_file: &DataFile) -> Result<()> {
+ self.content.append_value(data_file.content as i8);
+ self.file_path.append_value(data_file.file_path());
+ self.file_format
+ .append_value(data_file.file_format().to_string().to_uppercase());
+ self.partition.append(
+ self.partition_spec(manifest_file)?.clone().fields(),
+ data_file.partition(),
+ )?;
+ self.record_count
+ .append_value(data_file.record_count() as i64);
+ self.file_size_in_bytes
+ .append_value(data_file.file_size_in_bytes() as i64);
Review Comment:
The casting is slightly annoying given we're dealing with non-negative
types. But the Python and Java implementation use i64.
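If the lossy `as i64` ever becomes a concern, a checked variant is cheap; a
sketch (not what this PR does):

```rust
use std::num::TryFromIntError;

// Hypothetical checked alternative to `record_count() as i64`: it fails
// loudly for counts above i64::MAX instead of silently wrapping negative.
fn count_to_i64(count: u64) -> Result<i64, TryFromIntError> {
    i64::try_from(count)
}

fn main() {
    assert_eq!(count_to_i64(42).unwrap(), 42);
    assert!(count_to_i64(u64::MAX).is_err()); // `as i64` would give -1 here
}
```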
##########
crates/iceberg/src/scan.rs:
##########
@@ -1084,6 +1087,50 @@ pub mod tests {
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.key_metadata(None)
+ // Note:
+ // The bounds below need to agree with the test data written below
+ // into the Parquet file. If not, tests that rely on filter scans
+ // fail because of wrong bounds.
+ .lower_bounds(HashMap::from([
+ (1, Datum::long(1)),
+ (2, Datum::long(2)),
+ (3, Datum::long(3)),
+ (4, Datum::string("Apache")),
+ (5, Datum::double(100)),
+ (6, Datum::int(100)),
+ (7, Datum::long(100)),
+ (8, Datum::bool(false)),
+ (9, Datum::float(100.0)),
+ // decimal values are not supported by schema::get_arrow_datum
+ // (10, Datum::decimal(Decimal(123, 2))),
+ (11, Datum::date(0)),
+ (12, Datum::timestamp_micros(0)),
+ (13, Datum::timestamptz_micros(0)),
+ // ns timestamps, uuid, fixed, binary are currently not
+ // supported in schema::get_arrow_datum
+ ]))
+ .upper_bounds(HashMap::from([
+ (1, Datum::long(1)),
+ (2, Datum::long(5)),
+ (3, Datum::long(4)),
+ (4, Datum::string("Iceberg")),
+ (5, Datum::double(200)),
+ (6, Datum::int(200)),
+ (7, Datum::long(200)),
+ (8, Datum::bool(true)),
+ (9, Datum::float(200.0)),
+ // decimal values are not supported by schema::get_arrow_datum
+ // (10, Datum::decimal(Decimal(123, 2))),
+ (11, Datum::date(0)),
+ (12, Datum::timestamp_micros(0)),
+ (13, Datum::timestamptz_micros(0)),
+ // ns timestamps, uuid, fixed, binary are currently not
+ // supported in schema::get_arrow_datum
+ ]))
Review Comment:
Adding these so we cover these types in the lower and upper bounds. I'm
trying to limit the changes I'm making because this PR is already large; my
preference would be to cover all types as partition columns as well.
##########
crates/iceberg/src/metadata_scan.rs:
##########
@@ -255,8 +345,515 @@ impl<'a> ManifestsTable<'a> {
}
}
+/// Builds the struct describing data files listed in a table manifest.
+///
+/// For reference, see the Java implementation of [`DataFile`][1].
+///
+/// [1]: https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/api/src/main/java/org/apache/iceberg/DataFile.java
+struct DataFileStructBuilder<'a> {
+ // Reference to table metadata to retrieve partition specs based on partition spec ids
+ table_metadata: &'a TableMetadata,
+ // Below are the field builders of the "data_file" struct
+ content: Int8Builder,
+ file_path: StringBuilder,
+ file_format: StringBuilder,
+ partition: PartitionValuesStructBuilder,
+ // The count types in the Java and PyIceberg implementations are i64; however, the values
+ // coming from the deserialized data files are u64. We agree with the former, casting where needed.
+ record_count: Int64Builder,
+ file_size_in_bytes: Int64Builder,
+ column_sizes: MapBuilder<Int32Builder, Int64Builder>,
+ value_counts: MapBuilder<Int32Builder, Int64Builder>,
+ null_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+ nan_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+ lower_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+ upper_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+ key_metadata: BinaryBuilder,
+ split_offsets: ListBuilder<Int64Builder>,
+ equality_ids: ListBuilder<Int32Builder>,
+ sort_order_ids: Int32Builder,
+}
+
+impl<'a> DataFileStructBuilder<'a> {
+ fn new(table_metadata: &'a TableMetadata) -> Self {
+ Self {
+ table_metadata,
+ content: Int8Builder::new(),
+ file_path: StringBuilder::new(),
+ file_format: StringBuilder::new(),
+ partition: PartitionValuesStructBuilder::new(table_metadata),
+ record_count: Int64Builder::new(),
Review Comment:
The `manifests` table merged in #861 prefers `PrimitiveBuilder::new()`, which
works because `Int64Builder` is just `PrimitiveBuilder<Int64Type>`. I prefer
saying `Int64Builder` here to be explicit about the type, but I'm happy to
change it.
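For the record, the two spellings denote the same type, so either constructor
compiles; a quick sketch:

```rust
use arrow_array::builder::{Int64Builder, PrimitiveBuilder};
use arrow_array::types::Int64Type;

fn main() {
    // `Int64Builder` is a type alias for `PrimitiveBuilder<Int64Type>`,
    // so both annotations below name the same concrete type.
    let _via_alias: Int64Builder = Int64Builder::new();
    let _via_generic: Int64Builder = PrimitiveBuilder::<Int64Type>::new();
}
```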