liurenjie1024 commented on code in PR #738:
URL: https://github.com/apache/iceberg-rust/pull/738#discussion_r1920149166
##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -210,38 +284,207 @@ impl ManifestWriter {
deleted_rows: 0,
min_seq_num: None,
key_metadata,
- partitions: vec![],
+ manifset_entries: Vec::new(),
+ metadata,
}
}
fn construct_partition_summaries(
&mut self,
partition_type: &StructType,
) -> Result<Vec<FieldSummary>> {
- let partitions = std::mem::take(&mut self.partitions);
let mut field_stats: Vec<_> = partition_type
.fields()
.iter()
.map(|f|
PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
.collect();
- for partition in partitions {
- for (literal, stat) in
partition.into_iter().zip_eq(field_stats.iter_mut()) {
+ for partition in self.manifset_entries.iter().map(|e|
&e.data_file.partition) {
+ for (literal, stat) in
partition.iter().zip_eq(field_stats.iter_mut()) {
let primitive_literal = literal.map(|v|
v.as_primitive_literal().unwrap());
stat.update(primitive_literal)?;
}
}
Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
}
- /// Write a manifest.
- pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> {
+ fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
+ match self.metadata.content {
+ ManifestContentType::Data => {
+ if data_file.content != DataContentType::Data {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Content type of entry {:?} should have
DataContentType::Data",
+ data_file.content
+ ),
+ ));
+ }
+ }
+ ManifestContentType::Deletes => {
+ if data_file.content != DataContentType::EqualityDeletes
+ && data_file.content != DataContentType::PositionDeletes
+ {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Content type of entry {:?} should have
DataContentType::EqualityDeletes or DataContentType::PositionDeletes",
data_file.content),
+ ));
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Add a new manifest entry. This method will update following status of
the entry:
+ /// - Update the entry status to `Added`
+ /// - Set the snapshot id to the current snapshot id
+ /// - Set the sequence number to `None` if it is invalid(smaller than 0)
+ /// - Set the file sequence number to `None`
+ pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> {
+ self.check_data_file(&entry.data_file)?;
+ if entry.sequence_number().is_some_and(|n| n >= 0) {
+ entry.status = ManifestStatus::Added;
+ entry.snapshot_id = self.snapshot_id;
+ entry.file_sequence_number = None;
+ } else {
+ entry.status = ManifestStatus::Added;
+ entry.snapshot_id = self.snapshot_id;
+ entry.sequence_number = None;
+ entry.file_sequence_number = None;
+ };
+ self.add_entry(entry)?;
+ Ok(())
+ }
+
+ /// Add an added entry for a file with a specific sequence number. The
entry's snapshot ID will be this manifest's snapshot ID. The entry's data
sequence
+ /// number will be the provided data sequence number. The entry's file
sequence number will be
+ /// assigned at commit.
+ pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) ->
Result<()> {
+ self.check_data_file(&data_file)?;
+ let entry = ManifestEntry {
+ status: ManifestStatus::Added,
+ snapshot_id: self.snapshot_id,
+ sequence_number: (sequence_number >= 0).then_some(sequence_number),
+ file_sequence_number: None,
+ data_file,
+ };
+ self.add_entry(entry)?;
+ Ok(())
+ }
+
+ /// Add a delete manifest entry. This method will update following status
of the entry:
+ /// - Update the entry status to `Deleted`
+ /// - Set the snapshot id to the current snapshot id
+ ///
+ /// # TODO
+ /// Remove this allow later
+ #[allow(dead_code)]
+ pub(crate) fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> {
+ self.check_data_file(&entry.data_file)?;
+ entry.status = ManifestStatus::Deleted;
+ entry.snapshot_id = self.snapshot_id;
+ self.add_entry(entry)?;
+ Ok(())
+ }
+
+ /// Add a delete manifest entry. The entry's snapshot ID will be this
manifest's snapshot ID.
+ /// However, the original data and file sequence numbers of the file must
be preserved when
+ /// the file is marked as deleted.
+ pub fn delete_file(
+ &mut self,
+ data_file: DataFile,
+ sequence_number: i64,
+ file_sequence_number: i64,
+ ) -> Result<()> {
+ self.check_data_file(&data_file)?;
+ let entry = ManifestEntry {
+ status: ManifestStatus::Deleted,
+ snapshot_id: self.snapshot_id,
+ sequence_number: Some(sequence_number),
+ file_sequence_number: Some(file_sequence_number),
+ data_file,
+ };
+ self.add_entry(entry)?;
+ Ok(())
+ }
+
+ /// Add an existing manifest entry. This method will update following
status of the entry:
+ /// - Update the entry status to `Existing`
+ ///
+ /// # TODO
+ /// Remove this allow later
+ #[allow(dead_code)]
+ pub(crate) fn existing(&mut self, mut entry: ManifestEntry) -> Result<()> {
+ self.check_data_file(&entry.data_file)?;
+ entry.status = ManifestStatus::Existing;
+ self.add_entry(entry)?;
+ Ok(())
+ }
+
+ /// Add an existing manifest entry. The original data and file sequence
numbers, snapshot ID,
+ /// which were assigned at commit, must be preserved when adding an
existing entry.
+ pub fn existing_file(
+ &mut self,
+ data_file: DataFile,
+ snapshot_id: i64,
+ sequence_number: i64,
+ file_sequence_number: i64,
Review Comment:
This should be optional (`Option<i64>`), because `file_sequence_number` can be inherited
from the snapshot.
##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -210,38 +284,207 @@ impl ManifestWriter {
deleted_rows: 0,
min_seq_num: None,
key_metadata,
- partitions: vec![],
+ manifset_entries: Vec::new(),
+ metadata,
}
}
fn construct_partition_summaries(
&mut self,
partition_type: &StructType,
) -> Result<Vec<FieldSummary>> {
- let partitions = std::mem::take(&mut self.partitions);
let mut field_stats: Vec<_> = partition_type
.fields()
.iter()
.map(|f|
PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
.collect();
- for partition in partitions {
- for (literal, stat) in
partition.into_iter().zip_eq(field_stats.iter_mut()) {
+ for partition in self.manifset_entries.iter().map(|e|
&e.data_file.partition) {
+ for (literal, stat) in
partition.iter().zip_eq(field_stats.iter_mut()) {
let primitive_literal = literal.map(|v|
v.as_primitive_literal().unwrap());
stat.update(primitive_literal)?;
}
}
Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
}
- /// Write a manifest.
- pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> {
+ fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
+ match self.metadata.content {
+ ManifestContentType::Data => {
+ if data_file.content != DataContentType::Data {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Content type of entry {:?} should have
DataContentType::Data",
+ data_file.content
+ ),
+ ));
+ }
+ }
+ ManifestContentType::Deletes => {
+ if data_file.content != DataContentType::EqualityDeletes
+ && data_file.content != DataContentType::PositionDeletes
+ {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Content type of entry {:?} should have
DataContentType::EqualityDeletes or DataContentType::PositionDeletes",
data_file.content),
+ ));
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Add a new manifest entry. This method will update following status of
the entry:
+ /// - Update the entry status to `Added`
+ /// - Set the snapshot id to the current snapshot id
+ /// - Set the sequence number to `None` if it is invalid(smaller than 0)
+ /// - Set the file sequence number to `None`
+ pub(crate) fn add(&mut self, mut entry: ManifestEntry) -> Result<()> {
+ self.check_data_file(&entry.data_file)?;
+ if entry.sequence_number().is_some_and(|n| n >= 0) {
+ entry.status = ManifestStatus::Added;
+ entry.snapshot_id = self.snapshot_id;
+ entry.file_sequence_number = None;
+ } else {
+ entry.status = ManifestStatus::Added;
+ entry.snapshot_id = self.snapshot_id;
+ entry.sequence_number = None;
+ entry.file_sequence_number = None;
+ };
+ self.add_entry(entry)?;
+ Ok(())
+ }
+
+ /// Add an added entry for a file with a specific sequence number. The
entry's snapshot ID will be this manifest's snapshot ID. The entry's data
sequence
+ /// number will be the provided data sequence number. The entry's file
sequence number will be
+ /// assigned at commit.
+ pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) ->
Result<()> {
+ self.check_data_file(&data_file)?;
+ let entry = ManifestEntry {
+ status: ManifestStatus::Added,
+ snapshot_id: self.snapshot_id,
+ sequence_number: (sequence_number >= 0).then_some(sequence_number),
+ file_sequence_number: None,
+ data_file,
+ };
+ self.add_entry(entry)?;
+ Ok(())
+ }
+
+ /// Add a delete manifest entry. This method will update following status
of the entry:
+ /// - Update the entry status to `Deleted`
+ /// - Set the snapshot id to the current snapshot id
+ ///
+ /// # TODO
+ /// Remove this allow later
+ #[allow(dead_code)]
+ pub(crate) fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> {
+ self.check_data_file(&entry.data_file)?;
+ entry.status = ManifestStatus::Deleted;
+ entry.snapshot_id = self.snapshot_id;
+ self.add_entry(entry)?;
+ Ok(())
+ }
+
+ /// Add a delete manifest entry. The entry's snapshot ID will be this
manifest's snapshot ID.
+ /// However, the original data and file sequence numbers of the file must
be preserved when
+ /// the file is marked as deleted.
+ pub fn delete_file(
+ &mut self,
+ data_file: DataFile,
+ sequence_number: i64,
+ file_sequence_number: i64,
Review Comment:
This should be optional (`Option<i64>`), because `file_sequence_number` can be inherited
from the snapshot.
##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -41,6 +41,9 @@ use crate::io::OutputFile;
use crate::spec::PartitionField;
use crate::{Error, ErrorKind};
+/// Placeholder for snapshot ID. The field with this value must be replaced
with the actual snapshot ID before it is committed.
+pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
Review Comment:
We should move this to the `snapshot` module.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]