This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e294de77 feat: View Metadata Builder (#908)
e294de77 is described below
commit e294de77cabef294fb38c9ab822eb154367d544f
Author: Christian <[email protected]>
AuthorDate: Tue Feb 25 14:34:34 2025 +0100
feat: View Metadata Builder (#908)
This PR is not completely ready yet as I believe the current mechanism
of view expiration is flawed.
I opened a PR in Java to demonstrate the problem and use for
discussions:
https://github.com/apache/iceberg/pull/12051
Feedback from anyone is welcome. I am not sure what the best solution
looks like.
---------
Co-authored-by: Fokko Driesprong <[email protected]>
---
crates/iceberg/src/io/mod.rs | 2 +-
crates/iceberg/src/spec/mod.rs | 1 +
crates/iceberg/src/spec/table_metadata_builder.rs | 3 +-
crates/iceberg/src/spec/view_metadata.rs | 145 +-
crates/iceberg/src/spec/view_metadata_builder.rs | 1581 +++++++++++++++++++++
crates/iceberg/src/spec/view_version.rs | 80 +-
6 files changed, 1720 insertions(+), 92 deletions(-)
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 215ea67c..948c57d7 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -89,6 +89,6 @@ mod storage_gcs;
#[cfg(feature = "storage-gcs")]
pub use storage_gcs::*;
-fn is_truthy(value: &str) -> bool {
+pub(crate) fn is_truthy(value: &str) -> bool {
["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())
}
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index b3f13b8d..c049483d 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -31,6 +31,7 @@ mod table_metadata_builder;
mod transform;
mod values;
mod view_metadata;
+mod view_metadata_builder;
mod view_version;
pub use datatypes::*;
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs
b/crates/iceberg/src/spec/table_metadata_builder.rs
index 28e2f4e8..26ee52b5 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -138,7 +138,6 @@ impl TableMetadataBuilder {
}
/// Creates a new table metadata builder from the given metadata to modify
it.
-
/// `current_file_location` is the location where the current version
/// of the metadata file is stored. This is used to update the metadata
log.
/// If `current_file_location` is `None`, the metadata log will not be
updated.
@@ -312,7 +311,7 @@ impl TableMetadataBuilder {
Ok(self)
}
- /// Set the location of the table metadata, stripping any trailing slashes.
+ /// Set the location of the table, stripping any trailing slashes.
pub fn set_location(mut self, location: String) -> Self {
let location = location.trim_end_matches('/').to_string();
if self.metadata.location != location {
diff --git a/crates/iceberg/src/spec/view_metadata.rs
b/crates/iceberg/src/spec/view_metadata.rs
index 7c247c17..162b6f4c 100644
--- a/crates/iceberg/src/spec/view_metadata.rs
+++ b/crates/iceberg/src/spec/view_metadata.rs
@@ -29,16 +29,27 @@ use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;
-use super::view_version::{ViewVersion, ViewVersionId, ViewVersionRef};
+pub use super::view_metadata_builder::ViewMetadataBuilder;
+use super::view_version::{ViewVersionId, ViewVersionRef};
use super::{SchemaId, SchemaRef};
-use crate::catalog::ViewCreation;
use crate::error::{timestamp_ms_to_utc, Result};
+use crate::{Error, ErrorKind};
/// Reference to [`ViewMetadata`].
pub type ViewMetadataRef = Arc<ViewMetadata>;
+// ID of the initial version of views
pub(crate) static INITIAL_VIEW_VERSION_ID: i32 = 1;
+/// Property key for allowing to drop dialects when replacing a view.
+pub const VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED: &str =
"replace.drop-dialect.allowed";
+/// Default value for the property key for allowing to drop dialects when
replacing a view.
+pub const VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT: bool = false;
+/// Property key for the number of history entries to keep.
+pub const VIEW_PROPERTY_VERSION_HISTORY_SIZE: &str =
"version.history.num-entries";
+/// Default value for the property key for the number of history entries to
keep.
+pub const VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT: usize = 10;
+
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "ViewMetadataEnum", into = "ViewMetadataEnum")]
/// Fields for the version 1 of the view metadata.
@@ -60,7 +71,7 @@ pub struct ViewMetadata {
/// change to current-version-id
pub(crate) version_log: Vec<ViewVersionLog>,
/// A list of schemas, stored as objects with schema-id.
- pub(crate) schemas: HashMap<i32, SchemaRef>,
+ pub(crate) schemas: HashMap<SchemaId, SchemaRef>,
/// A string to string map of view properties.
/// Properties are used for metadata such as comment and for settings that
/// affect view maintenance. This is not intended to be used for arbitrary
metadata.
@@ -68,6 +79,12 @@ pub struct ViewMetadata {
}
impl ViewMetadata {
+ /// Convert this View Metadata into a builder for modification.
+ #[must_use]
+ pub fn into_builder(self) -> ViewMetadataBuilder {
+ ViewMetadataBuilder::new_from_metadata(self)
+ }
+
/// Returns format version of this metadata.
#[inline]
pub fn format_version(&self) -> ViewFormatVersion {
@@ -143,65 +160,36 @@ impl ViewMetadata {
pub fn history(&self) -> &[ViewVersionLog] {
&self.version_log
}
-}
-
-/// Manipulating view metadata.
-pub struct ViewMetadataBuilder(ViewMetadata);
-
-impl ViewMetadataBuilder {
- /// Creates a new view metadata builder from the given view metadata.
- pub fn new(origin: ViewMetadata) -> Self {
- Self(origin)
- }
-
- /// Creates a new view metadata builder from the given view creation.
- pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
- let ViewCreation {
- location,
- schema,
- properties,
- name: _,
- representations,
- default_catalog,
- default_namespace,
- summary,
- } = view_creation;
- let initial_version_id = super::INITIAL_VIEW_VERSION_ID;
- let version = ViewVersion::builder()
- .with_default_catalog(default_catalog)
- .with_default_namespace(default_namespace)
- .with_representations(representations)
- .with_schema_id(schema.schema_id())
- .with_summary(summary)
- .with_timestamp_ms(Utc::now().timestamp_millis())
- .with_version_id(initial_version_id)
- .build();
-
- let versions = HashMap::from_iter(vec![(initial_version_id,
version.into())]);
-
- let view_metadata = ViewMetadata {
- format_version: ViewFormatVersion::V1,
- view_uuid: Uuid::now_v7(),
- location,
- current_version_id: initial_version_id,
- versions,
- version_log: Vec::new(),
- schemas: HashMap::from_iter(vec![(schema.schema_id(),
Arc::new(schema))]),
- properties,
- };
- Ok(Self(view_metadata))
+ /// Validate the view metadata.
+ pub(super) fn validate(&self) -> Result<()> {
+ self.validate_current_version_id()?;
+ self.validate_current_schema_id()?;
+ Ok(())
}
- /// Changes uuid of view metadata.
- pub fn assign_uuid(mut self, uuid: Uuid) -> Result<Self> {
- self.0.view_uuid = uuid;
- Ok(self)
+ fn validate_current_version_id(&self) -> Result<()> {
+ if !self.versions.contains_key(&self.current_version_id) {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "No version exists with the current version id {}.",
+ self.current_version_id
+ ),
+ ));
+ }
+ Ok(())
}
- /// Returns the new view metadata after changes.
- pub fn build(self) -> Result<ViewMetadata> {
- Ok(self.0)
+ fn validate_current_schema_id(&self) -> Result<()> {
+ let schema_id = self.current_version().schema_id();
+ if !self.schemas.contains_key(&schema_id) {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("No schema exists with the schema id {}.", schema_id),
+ ));
+ }
+ Ok(())
}
}
@@ -258,7 +246,7 @@ pub(super) mod _serde {
use crate::spec::table_metadata::_serde::VersionNumber;
use crate::spec::view_version::_serde::ViewVersionV1;
use crate::spec::{ViewMetadata, ViewVersion};
- use crate::{Error, ErrorKind};
+ use crate::Error;
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
@@ -326,28 +314,8 @@ pub(super) mod _serde {
.map(|x| Ok((x.version_id,
Arc::new(ViewVersion::from(x)))))
.collect::<Result<Vec<_>, Error>>()?,
);
- // Make sure at least the current schema exists
- let current_version =
- versions
- .get(&value.current_version_id)
- .ok_or(self::Error::new(
- ErrorKind::DataInvalid,
- format!(
- "No version exists with the current version id
{}.",
- value.current_version_id
- ),
- ))?;
- if !schemas.contains_key(&current_version.schema_id()) {
- return Err(self::Error::new(
- ErrorKind::DataInvalid,
- format!(
- "No schema exists with the schema id {}.",
- current_version.schema_id()
- ),
- ));
- }
- Ok(ViewMetadata {
+ let view_metadata = ViewMetadata {
format_version: ViewFormatVersion::V1,
view_uuid: value.view_uuid,
location: value.location,
@@ -356,7 +324,9 @@ pub(super) mod _serde {
current_version_id: value.current_version_id,
versions,
version_log: value.version_log,
- })
+ };
+ view_metadata.validate()?;
+ Ok(view_metadata)
}
}
@@ -423,7 +393,7 @@ impl Display for ViewFormatVersion {
}
#[cfg(test)]
-mod tests {
+pub(crate) mod tests {
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
@@ -435,7 +405,7 @@ mod tests {
use super::{ViewFormatVersion, ViewMetadataBuilder, ViewVersionLog};
use crate::spec::{
NestedField, PrimitiveType, Schema, SqlViewRepresentation, Type,
ViewMetadata,
- ViewRepresentations, ViewVersion,
+ ViewRepresentations, ViewVersion, INITIAL_VIEW_VERSION_ID,
};
use crate::{NamespaceIdent, ViewCreation};
@@ -449,7 +419,7 @@ mod tests {
assert_eq!(parsed_json_value, desered_type);
}
- fn get_test_view_metadata(file_name: &str) -> ViewMetadata {
+ pub(crate) fn get_test_view_metadata(file_name: &str) -> ViewMetadata {
let path = format!("testdata/view_metadata/{}", file_name);
let metadata: String = fs::read_to_string(path).unwrap();
@@ -578,13 +548,14 @@ mod tests {
let metadata = ViewMetadataBuilder::from_view_creation(creation)
.unwrap()
.build()
- .unwrap();
+ .unwrap()
+ .metadata;
assert_eq!(
metadata.location(),
"s3://bucket/warehouse/default.db/event_agg"
);
- assert_eq!(metadata.current_version_id(), 1);
+ assert_eq!(metadata.current_version_id(), INITIAL_VIEW_VERSION_ID);
assert_eq!(metadata.versions().count(), 1);
assert_eq!(metadata.schemas_iter().count(), 1);
assert_eq!(metadata.properties().len(), 0);
@@ -652,9 +623,9 @@ mod tests {
#[test]
fn test_view_builder_assign_uuid() {
let metadata = get_test_view_metadata("ViewMetadataV1Valid.json");
- let metadata_builder = ViewMetadataBuilder::new(metadata);
+ let metadata_builder = metadata.into_builder();
let uuid = Uuid::new_v4();
- let metadata =
metadata_builder.assign_uuid(uuid).unwrap().build().unwrap();
+ let metadata =
metadata_builder.assign_uuid(uuid).build().unwrap().metadata;
assert_eq!(metadata.uuid(), uuid);
}
diff --git a/crates/iceberg/src/spec/view_metadata_builder.rs
b/crates/iceberg/src/spec/view_metadata_builder.rs
new file mode 100644
index 00000000..43b78c92
--- /dev/null
+++ b/crates/iceberg/src/spec/view_metadata_builder.rs
@@ -0,0 +1,1581 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use chrono::Utc;
+use itertools::Itertools;
+use uuid::Uuid;
+
+use super::{
+ Schema, SchemaId, TableMetadataBuilder, ViewFormatVersion, ViewMetadata,
ViewRepresentation,
+ ViewVersion, ViewVersionLog, ViewVersionRef, DEFAULT_SCHEMA_ID,
INITIAL_VIEW_VERSION_ID,
+ ONE_MINUTE_MS, VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED,
+ VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT,
VIEW_PROPERTY_VERSION_HISTORY_SIZE,
+ VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT,
+};
+use crate::catalog::ViewUpdate;
+use crate::error::{Error, ErrorKind, Result};
+use crate::io::is_truthy;
+use crate::ViewCreation;
+
+/// Manipulating view metadata.
+///
+/// For this builder the order of called functions matters.
+/// All operations applied to the `ViewMetadata` are tracked in `changes` as a chronologically
+/// ordered vec of `ViewUpdate`.
+/// If an operation does not lead to a change of the `ViewMetadata`, the corresponding update
+/// is omitted from `changes`.
+#[derive(Debug, Clone)]
+pub struct ViewMetadataBuilder {
+    /// The metadata being assembled; handed out by `build`.
+    metadata: ViewMetadata,
+    /// Updates recorded so far, in the order they were applied.
+    changes: Vec<ViewUpdate>,
+    /// ID of the schema most recently added through this builder, if any.
+    last_added_schema_id: Option<SchemaId>,
+    /// ID of the view version most recently added through this builder, if any.
+    /// This is a view version ID (plain `i32`, as used by the methods of this builder),
+    /// not a schema ID.
+    last_added_version_id: Option<i32>,
+    /// Version log entry produced when the current version changes; appended to the
+    /// version log during `build`.
+    history_entry: Option<ViewVersionLog>,
+    // Previous view version is only used during build to check
+    // whether dialects are dropped or not.
+    previous_view_version: Option<ViewVersionRef>,
+}
+
+/// Outcome of building a `ViewMetadata`, either newly created or modified.
+#[derive(Debug, Clone, PartialEq)]
+pub struct ViewMetadataBuildResult {
+    /// The resulting `ViewMetadata`.
+    pub metadata: ViewMetadata,
+    /// The updates that were applied to obtain `metadata`.
+    pub changes: Vec<ViewUpdate>,
+}
+
+impl ViewMetadataBuilder {
+ const LAST_ADDED: i32 = TableMetadataBuilder::LAST_ADDED;
+
+    /// Creates a builder for a brand-new view.
+    ///
+    /// The builder starts from placeholder metadata carrying a freshly generated UUID and then
+    /// applies `set_location`, `set_current_version` and `set_properties`, in that order.
+    ///
+    /// # Errors
+    /// Propagates any error returned by `set_current_version` or `set_properties`.
+    pub fn new(
+        location: String,
+        schema: Schema,
+        view_version: ViewVersion,
+        format_version: ViewFormatVersion,
+        properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        let placeholder_metadata = ViewMetadata {
+            format_version,
+            view_uuid: Uuid::now_v7(),
+            // Populated by `set_location` below.
+            location: String::new(),
+            // Populated by `set_current_version` below.
+            current_version_id: -1,
+            // Populated by `set_current_version` below.
+            versions: HashMap::new(),
+            version_log: Vec::new(),
+            // Populated by `set_current_version` below.
+            schemas: HashMap::new(),
+            // Populated by `set_properties` below.
+            properties: HashMap::new(),
+        };
+
+        let empty_builder = Self {
+            metadata: placeholder_metadata,
+            changes: Vec::new(),
+            // Both IDs are populated by `set_current_version` below.
+            last_added_schema_id: None,
+            last_added_version_id: None,
+            history_entry: None,
+            // A new view has no previous version.
+            previous_view_version: None,
+        };
+
+        let with_location = empty_builder.set_location(location);
+        let with_version = with_location.set_current_version(view_version, schema)?;
+        with_version.set_properties(properties)
+    }
+
+    /// Creates a builder that modifies already existing view metadata.
+    ///
+    /// The current version of `previous` is remembered so that `build` can compare its
+    /// dialects with those of the final current version.
+    #[must_use]
+    pub fn new_from_metadata(previous: ViewMetadata) -> Self {
+        let version_before_changes = previous.current_version().clone();
+
+        Self {
+            previous_view_version: Some(version_before_changes),
+            metadata: previous,
+            changes: Vec::new(),
+            history_entry: None,
+            last_added_schema_id: None,
+            last_added_version_id: None,
+        }
+    }
+
+    /// Creates a builder for a new view described by a `ViewCreation` request.
+    ///
+    /// The initial version is stamped with the current time and `INITIAL_VIEW_VERSION_ID`;
+    /// the view name of the request is not used here.
+    ///
+    /// # Errors
+    /// Propagates any error returned by [`Self::new`].
+    pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
+        let ViewCreation {
+            name: _,
+            location,
+            schema,
+            properties,
+            representations,
+            default_catalog,
+            default_namespace,
+            summary,
+        } = view_creation;
+
+        let initial_version = ViewVersion::builder()
+            .with_default_catalog(default_catalog)
+            .with_default_namespace(default_namespace)
+            .with_representations(representations)
+            .with_schema_id(schema.schema_id())
+            .with_summary(summary)
+            .with_timestamp_ms(Utc::now().timestamp_millis())
+            .with_version_id(INITIAL_VIEW_VERSION_ID)
+            .build();
+
+        Self::new(
+            location,
+            schema,
+            initial_version,
+            ViewFormatVersion::V1,
+            properties,
+        )
+    }
+
+    /// Upgrade the view format version. Downgrades are rejected.
+    ///
+    /// # Errors
+    /// - The requested format version is older than the current one.
+    pub fn upgrade_format_version(self, format_version: ViewFormatVersion) -> Result<Self> {
+        let current_format_version = self.metadata.format_version;
+
+        if format_version < current_format_version {
+            let message = format!(
+                "Cannot downgrade ViewFormatVersion from {} to {}",
+                current_format_version, format_version
+            );
+            return Err(Error::new(ErrorKind::DataInvalid, message));
+        }
+
+        if format_version == current_format_version {
+            return Ok(self);
+        }
+
+        match format_version {
+            // V1 requires no adjustments.
+            ViewFormatVersion::V1 => {}
+        }
+
+        Ok(self)
+    }
+
+    /// Set the view location after removing all trailing `/` characters.
+    ///
+    /// A `SetLocation` update is recorded only when the trimmed location differs from
+    /// the one currently stored.
+    pub fn set_location(mut self, location: String) -> Self {
+        let trimmed = location.trim_end_matches('/');
+        if self.metadata.location == trimmed {
+            return self;
+        }
+
+        let new_location = trimmed.to_string();
+        self.changes.push(ViewUpdate::SetLocation {
+            location: new_location.clone(),
+        });
+        self.metadata.location = new_location;
+        self
+    }
+
+    /// Set an existing view version as the current version.
+    ///
+    /// Passing `Self::LAST_ADDED` (`-1`) selects the version most recently added through
+    /// this builder. If the resolved version is already the current one, nothing is changed
+    /// and no update is recorded. Otherwise a `SetCurrentViewVersion` update is recorded and
+    /// a version log entry is prepared, which `build` appends to the version log.
+    ///
+    /// # Errors
+    /// - The specified `version_id` does not exist.
+    /// - The specified `version_id` is `-1` but no version has been added.
+    pub fn set_current_version_id(mut self, version_id: i32) -> Result<Self> {
+        // Resolve the `LAST_ADDED` sentinel whenever it is passed. The lookup itself fails
+        // when nothing has been added yet, so no extra `is_none()` guard is needed; such a
+        // guard would leave the sentinel unresolved once a version HAS been added.
+        let version_id = if version_id == Self::LAST_ADDED {
+            self.last_added_version_id.ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Cannot set current version id to last added version: no version has been added.",
+                )
+            })?
+        } else {
+            version_id
+        };
+
+        if version_id == self.metadata.current_version_id {
+            return Ok(self);
+        }
+
+        let history_entry = self
+            .metadata
+            .versions
+            .get(&version_id)
+            .map(|version| version.log())
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot set current version to unknown version with id: {}",
+                        version_id
+                    ),
+                )
+            })?;
+
+        self.metadata.current_version_id = version_id;
+
+        // A version added by this builder is referenced through the sentinel in the
+        // recorded update instead of by its concrete ID.
+        let view_version_id = if self.last_added_version_id == Some(version_id) {
+            Self::LAST_ADDED
+        } else {
+            version_id
+        };
+        self.changes
+            .push(ViewUpdate::SetCurrentViewVersion { view_version_id });
+
+        self.history_entry = Some(history_entry);
+
+        Ok(self)
+    }
+
+    /// Add `schema` and `view_version` to the view and make the version current.
+    ///
+    /// The version is re-pointed at the ID under which the schema ends up being stored.
+    ///
+    /// # Errors
+    /// Propagates errors from adding the version or from `set_current_version_id`.
+    pub fn set_current_version(
+        mut self,
+        view_version: ViewVersion,
+        schema: Schema,
+    ) -> Result<Self> {
+        let assigned_schema_id = self.add_schema_internal(schema);
+        let version_with_schema = view_version.with_schema_id(assigned_schema_id);
+        let assigned_version_id = self.add_version_internal(version_with_schema)?;
+
+        self.set_current_version_id(assigned_version_id)
+    }
+
+    /// Add a new version to the view without making it current.
+    ///
+    /// # Errors
+    /// - The schema ID of the version is set to `-1`, but no schema has been added.
+    /// - The schema ID of the specified version is unknown.
+    /// - Multiple queries for the same dialect are added.
+    pub fn add_version(mut self, view_version: ViewVersion) -> Result<Self> {
+        let _assigned_version_id = self.add_version_internal(view_version)?;
+        Ok(self)
+    }
+
+    /// Registers `view_version` and returns the version ID it is stored under.
+    ///
+    /// The ID of an already stored, identically behaving version is reused; otherwise the
+    /// next free ID is assigned. A schema ID of `Self::LAST_ADDED` is replaced by the ID of
+    /// the schema last added through this builder.
+    fn add_version_internal(&mut self, view_version: ViewVersion) -> Result<i32> {
+        let version_id = self.reuse_or_create_new_view_version_id(&view_version);
+        let mut view_version = view_version.with_version_id(version_id);
+
+        if self.metadata.versions.contains_key(&version_id) {
+            // ToDo Discuss: Similar to TableMetadata sort-order, Java does not add changes
+            // in this case. I prefer to add changes as the state of the builder is
+            // potentially mutated (`last_added_version_id`), thus we should record the change.
+            if self.last_added_version_id != Some(version_id) {
+                self.changes.push(ViewUpdate::AddViewVersion {
+                    view_version: view_version.with_version_id(version_id),
+                });
+                self.last_added_version_id = Some(version_id);
+            }
+            return Ok(version_id);
+        }
+
+        // Replace the "last added" sentinel with a concrete schema ID.
+        if view_version.schema_id() == Self::LAST_ADDED {
+            let Some(last_added_schema_id) = self.last_added_schema_id else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Cannot set last added schema: no schema has been added",
+                ));
+            };
+            view_version = view_version.with_schema_id(last_added_schema_id);
+        }
+
+        let schema_id = view_version.schema_id();
+        if !self.metadata.schemas.contains_key(&schema_id) {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Cannot add version with unknown schema: {}", schema_id),
+            ));
+        }
+
+        require_unique_dialects(&view_version)?;
+
+        // ToDo Discuss: This check is not present in Java.
+        // The `TableMetadataBuilder` uses these checks in multiple places - also in Java.
+        // If we think delayed requests are a problem, I think we should also add it here.
+        if let Some(last) = self.metadata.version_log.last() {
+            // Commits can happen concurrently from different machines.
+            // A tolerance helps us avoid failure for small clock skew.
+            if view_version.timestamp_ms() - last.timestamp_ms() < -ONE_MINUTE_MS {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
+                        view_version.timestamp_ms(),
+                        last.timestamp_ms()
+                    ),
+                ));
+            }
+        }
+
+        self.metadata
+            .versions
+            .insert(version_id, Arc::new(view_version.clone()));
+
+        // In the recorded update, a schema added by this builder is referenced through the
+        // sentinel rather than by its concrete ID.
+        let recorded_version = match self.last_added_schema_id {
+            Some(last_added_schema_id) if view_version.schema_id() == last_added_schema_id => {
+                view_version.with_schema_id(Self::LAST_ADDED)
+            }
+            _ => view_version,
+        };
+        self.changes.push(ViewUpdate::AddViewVersion {
+            view_version: recorded_version,
+        });
+
+        self.last_added_version_id = Some(version_id);
+
+        Ok(version_id)
+    }
+
+    /// Picks the ID for `new_view_version`: the ID of a stored version it behaves identically
+    /// to, else the highest stored ID plus one, else `INITIAL_VIEW_VERSION_ID`.
+    fn reuse_or_create_new_view_version_id(&self, new_view_version: &ViewVersion) -> i32 {
+        for (id, other_version) in self.metadata.versions.iter() {
+            if new_view_version.behaves_identical_to(other_version) {
+                return *id;
+            }
+        }
+
+        match self.get_highest_view_version_id() {
+            Some(highest_id) => highest_id + 1,
+            None => INITIAL_VIEW_VERSION_ID,
+        }
+    }
+
+    /// Largest version ID currently stored, or `None` when there are no versions.
+    fn get_highest_view_version_id(&self) -> Option<i32> {
+        self.metadata.versions.keys().copied().max()
+    }
+
+    /// Add a schema to the view; the ID it is stored under is not returned.
+    pub fn add_schema(mut self, schema: Schema) -> Self {
+        let _assigned_schema_id = self.add_schema_internal(schema);
+        self
+    }
+
+ fn add_schema_internal(&mut self, schema: Schema) -> SchemaId {
+ let schema_id = self.reuse_or_create_new_schema_id(&schema);
+
+ if self.metadata.schemas.contains_key(&schema_id) {
+ // ToDo Discuss: Java does not add changes in this case. I prefer
to add changes
+ // as the state of the builder is potentially mutated
(`last_added_schema_id`),
+ // thus we should record the change.
+ if self.last_added_schema_id != Some(schema_id) {
+ self.changes.push(ViewUpdate::AddSchema {
+ schema: schema.clone().with_schema_id(schema_id),
+ last_column_id: None,
+ });
+ self.last_added_schema_id = Some(schema_id);
+ }
+ return schema_id;
+ }
+
+ let schema = schema.with_schema_id(schema_id);
+
+ self.metadata
+ .schemas
+ .insert(schema_id, Arc::new(schema.clone()));
+ let last_column_id = schema.highest_field_id();
+ self.changes.push(ViewUpdate::AddSchema {
+ schema,
+ last_column_id: Some(last_column_id),
+ });
+
+ self.last_added_schema_id = Some(schema_id);
+
+ schema_id
+ }
+
+    /// Picks the ID for `new_schema`: the ID of a stored schema it is the same as, else the
+    /// highest stored ID plus one, else `DEFAULT_SCHEMA_ID`.
+    fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> SchemaId {
+        for (id, schema) in self.metadata.schemas.iter() {
+            if new_schema.is_same_schema(schema) {
+                return *id;
+            }
+        }
+
+        match self.get_highest_schema_id() {
+            Some(highest_id) => highest_id + 1,
+            None => DEFAULT_SCHEMA_ID,
+        }
+    }
+
+    /// Largest schema ID currently stored, or `None` when there are no schemas.
+    fn get_highest_schema_id(&self) -> Option<SchemaId> {
+        self.metadata.schemas.keys().copied().max()
+    }
+
+    /// Merge `updates` into the view properties and record a `SetProperties` update.
+    ///
+    /// An empty map is a no-op and records nothing.
+    ///
+    /// # Errors
+    /// - `VIEW_PROPERTY_VERSION_HISTORY_SIZE` is present and parses to a negative number.
+    pub fn set_properties(mut self, updates: HashMap<String, String>) -> Result<Self> {
+        if updates.is_empty() {
+            return Ok(self);
+        }
+
+        // A missing or unparsable value falls back to 1 and therefore passes the check.
+        // NOTE(review): zero also passes although the message says "positive";
+        // `expire_versions` later clamps the value to at least 1 - confirm this is intended.
+        let num_versions_to_keep = match updates.get(VIEW_PROPERTY_VERSION_HISTORY_SIZE) {
+            Some(raw) => raw.parse::<i64>().unwrap_or(1),
+            None => 1,
+        };
+        if num_versions_to_keep < 0 {
+            let message = format!(
+                "{} must be positive but was {}",
+                VIEW_PROPERTY_VERSION_HISTORY_SIZE, num_versions_to_keep
+            );
+            return Err(Error::new(ErrorKind::DataInvalid, message));
+        }
+
+        self.metadata.properties.extend(updates.clone());
+        self.changes.push(ViewUpdate::SetProperties { updates });
+
+        Ok(self)
+    }
+
+    /// Remove the given property keys from the view.
+    ///
+    /// An empty slice is a no-op. Otherwise a `RemoveProperties` update listing all
+    /// requested keys is recorded, whether or not each key was present.
+    pub fn remove_properties(mut self, removals: &[String]) -> Self {
+        if removals.is_empty() {
+            return self;
+        }
+
+        removals.iter().for_each(|key| {
+            self.metadata.properties.remove(key);
+        });
+
+        let update = ViewUpdate::RemoveProperties {
+            removals: removals.to_vec(),
+        };
+        self.changes.push(update);
+
+        self
+    }
+
+    /// Assign a UUID to the view.
+    ///
+    /// Nothing is recorded when `uuid` equals the UUID already stored.
+    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
+        if self.metadata.view_uuid == uuid {
+            return self;
+        }
+
+        self.metadata.view_uuid = uuid;
+        self.changes.push(ViewUpdate::AssignUuid { uuid });
+        self
+    }
+
+    /// Finish the builder and return the resulting `ViewMetadata` with the recorded changes.
+    ///
+    /// Steps, in order: append the pending version log entry, validate the metadata, check
+    /// that no dialect was dropped (unless allowed by the view properties), expire old
+    /// versions, and prune the version log to the versions that remain.
+    ///
+    /// # Errors
+    /// - The metadata fails validation.
+    /// - A dialect of the previous current version is missing from the new current version
+    ///   and dropping dialects is not allowed.
+    pub fn build(mut self) -> Result<ViewMetadataBuildResult> {
+        // `Option` iterates over zero or one element, so this pushes the entry if present.
+        self.metadata.version_log.extend(self.history_entry.take());
+
+        // We should run validate before `self.metadata.current_version()` below,
+        // as it might panic if the metadata is invalid.
+        self.metadata.validate()?;
+
+        match self.previous_view_version.take() {
+            Some(previous) if !allow_replace_drop_dialects(&self.metadata.properties) => {
+                require_no_dialect_dropped(&previous, self.metadata.current_version())?;
+            }
+            _ => {}
+        }
+
+        let _expired_versions = self.expire_versions();
+
+        let full_version_log = std::mem::take(&mut self.metadata.version_log);
+        let remaining_version_ids = self.metadata.versions.keys().copied().collect();
+        self.metadata.version_log = update_version_log(full_version_log, remaining_version_ids);
+
+        let Self {
+            metadata, changes, ..
+        } = self;
+        Ok(ViewMetadataBuildResult { metadata, changes })
+    }
+
+    /// Drops versions beyond the retention limit from the metadata and returns them.
+    ///
+    /// The limit is read from `VIEW_PROPERTY_VERSION_HISTORY_SIZE` (default
+    /// `VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT`, at least 1) and raised to the number of
+    /// `AddViewVersion` changes recorded by this builder. The current version is always kept.
+    fn expire_versions(&mut self) -> Vec<ViewVersionRef> {
+        let configured_limit = self
+            .metadata
+            .properties
+            .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
+            .and_then(|raw| raw.parse::<usize>().ok())
+            .unwrap_or(VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT)
+            .max(1);
+
+        // Expire old versions, but keep at least the versions added in this builder.
+        let num_added_versions = self
+            .changes
+            .iter()
+            .filter(|update| matches!(update, ViewUpdate::AddViewVersion { .. }))
+            .count();
+        let num_versions_to_keep = num_added_versions.max(configured_limit);
+
+        if self.metadata.versions.len() <= num_versions_to_keep {
+            return Vec::new();
+        }
+
+        // Version ids are assigned sequentially. Keep the latest versions by ID.
+        let mut versions_to_keep: HashSet<_> = self
+            .metadata
+            .versions
+            .keys()
+            .copied()
+            .sorted()
+            .rev()
+            .take(num_versions_to_keep)
+            .collect();
+
+        // Always retain the current version.
+        let current_version_id = self.metadata.current_version_id;
+        if !versions_to_keep.contains(&current_version_id) {
+            // Make room by dropping the lowest retained ID, unless every retained slot is
+            // needed for versions added by this builder.
+            if num_versions_to_keep > num_added_versions {
+                if let Some(lowest_id) = versions_to_keep.iter().min().copied() {
+                    versions_to_keep.remove(&lowest_id);
+                }
+            }
+            versions_to_keep.insert(current_version_id);
+        }
+
+        // Remove everything not selected above from the metadata, collecting what is dropped.
+        let mut expired_versions = Vec::new();
+        self.metadata.versions.retain(|id, version| {
+            let keep = versions_to_keep.contains(id);
+            if !keep {
+                expired_versions.push(version.clone());
+            }
+            keep
+        });
+
+        expired_versions
+    }
+}
+
+/// Prunes the version log to the entries that are still relevant.
+///
+/// Entries are visited in order. An entry whose version ID is not in `ids_to_keep` discards
+/// itself and everything collected before it, so the result is the trailing run of entries
+/// whose versions are all still retained.
+fn update_version_log(
+    version_log: Vec<ViewVersionLog>,
+    ids_to_keep: HashSet<i32>,
+) -> Vec<ViewVersionLog> {
+    version_log
+        .into_iter()
+        .fold(Vec::new(), |mut retained_history, log_entry| {
+            if ids_to_keep.contains(&log_entry.version_id()) {
+                retained_history.push(log_entry);
+            } else {
+                retained_history.clear();
+            }
+            retained_history
+        })
+}
+
+/// Whether the view properties allow dropping dialects when the view is replaced.
+///
+/// Falls back to `VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT` when the property
+/// is not set.
+fn allow_replace_drop_dialects(properties: &HashMap<String, String>) -> bool {
+    match properties.get(VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED) {
+        Some(value) => is_truthy(value),
+        None => VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT,
+    }
+}
+
+/// Ensures every SQL dialect of `previous` is still present in `current`.
+///
+/// Dialects are compared in lowercase.
+///
+/// # Errors
+/// Returns `DataInvalid` listing both dialect sets when a dialect would be lost.
+fn require_no_dialect_dropped(previous: &ViewVersion, current: &ViewVersion) -> Result<()> {
+    let base_dialects = lowercase_sql_dialects_for(previous);
+    let updated_dialects = lowercase_sql_dialects_for(current);
+
+    if updated_dialects.is_superset(&base_dialects) {
+        return Ok(());
+    }
+
+    let message = format!(
+        "Cannot replace view due to loss of view dialects: \nPrevious dialects: {:?}\nNew dialects: {:?}\nSet {} to true to allow dropping dialects.",
+        Vec::from_iter(base_dialects),
+        Vec::from_iter(updated_dialects),
+        VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED
+    );
+    Err(Error::new(ErrorKind::DataInvalid, message))
+}
+
+/// Collects the lowercased dialect of every SQL representation of `view_version`.
+fn lowercase_sql_dialects_for(view_version: &ViewVersion) -> HashSet<String> {
+    let mut dialects = HashSet::new();
+    for repr in view_version.representations().iter() {
+        let dialect = match repr {
+            ViewRepresentation::Sql(sql_repr) => sql_repr.dialect.to_lowercase(),
+        };
+        dialects.insert(dialect);
+    }
+    dialects
+}
+
+/// Ensures `view_version` holds at most one SQL representation per dialect.
+///
+/// Dialects are compared in lowercase.
+///
+/// # Errors
+/// Returns `DataInvalid` naming the first dialect that occurs a second time.
+pub(super) fn require_unique_dialects(view_version: &ViewVersion) -> Result<()> {
+    let mut seen_dialects = HashSet::with_capacity(view_version.representations().len());
+
+    for repr in view_version.representations().iter() {
+        let dialect = match repr {
+            ViewRepresentation::Sql(sql_repr) => &sql_repr.dialect,
+        };
+
+        // `insert` returns false when the lowercased dialect was already seen.
+        if seen_dialects.insert(dialect.to_lowercase()) {
+            continue;
+        }
+
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Invalid view version: Cannot add multiple queries for dialect {}",
+                dialect
+            ),
+        ));
+    }
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod test {
+ use super::super::view_metadata::tests::get_test_view_metadata;
+ use super::*;
+ use crate::spec::{
+ NestedField, PrimitiveType, SqlViewRepresentation, Type,
ViewRepresentations,
+ };
+ use crate::NamespaceIdent;
+
+ fn new_view_version(id: usize, schema_id: SchemaId, sql: &str) ->
ViewVersion {
+ new_view_version_with_dialect(id, schema_id, sql, vec!["spark"])
+ }
+
+ fn new_view_version_with_dialect(
+ id: usize,
+ schema_id: SchemaId,
+ sql: &str,
+ dialects: Vec<&str>,
+ ) -> ViewVersion {
+ ViewVersion::builder()
+ .with_version_id(id as i32)
+ .with_schema_id(schema_id)
+ .with_timestamp_ms(1573518431300)
+ .with_default_catalog(Some("prod".to_string()))
+ .with_summary(HashMap::from_iter(vec![(
+ "user".to_string(),
+ "some-user".to_string(),
+ )]))
+ .with_representations(ViewRepresentations(
+ dialects
+ .iter()
+ .map(|dialect| {
+ ViewRepresentation::Sql(SqlViewRepresentation {
+ dialect: dialect.to_string(),
+ sql: sql.to_string(),
+ })
+ })
+ .collect(),
+ ))
+ .with_default_namespace(NamespaceIdent::new("default".to_string()))
+ .build()
+ }
+
+ fn builder_without_changes() -> ViewMetadataBuilder {
+
ViewMetadataBuilder::new_from_metadata(get_test_view_metadata("ViewMetadataV1Valid.json"))
+ }
+
+    /// A builder created via `ViewMetadataBuilder::new` re-assigns the version and schema
+    /// ids and records the five initial changes.
+    #[test]
+    fn test_minimal_builder() {
+        let location = "s3://bucket/table".to_string();
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+        // Version ID and schema should be re-assigned
+        let version = new_view_version(20, 21, "select 1 as count");
+        let format_version = ViewFormatVersion::V1;
+        let properties = HashMap::from_iter(vec![("key".to_string(), "value".to_string())]);
+
+        let build_result = ViewMetadataBuilder::new(
+            location.clone(),
+            schema.clone(),
+            version.clone(),
+            format_version,
+            properties.clone(),
+        )
+        .unwrap()
+        .build()
+        .unwrap();
+
+        let metadata = build_result.metadata;
+        assert_eq!(metadata.location, location);
+        assert_eq!(metadata.current_version_id, INITIAL_VIEW_VERSION_ID);
+        assert_eq!(metadata.format_version, format_version);
+        assert_eq!(metadata.properties, properties);
+        assert_eq!(metadata.versions.len(), 1);
+        assert_eq!(metadata.schemas.len(), 1);
+        assert_eq!(metadata.version_log.len(), 1);
+        assert_eq!(
+            Arc::unwrap_or_clone(metadata.versions[&INITIAL_VIEW_VERSION_ID].clone()),
+            version
+                .clone()
+                .with_version_id(INITIAL_VIEW_VERSION_ID)
+                .with_schema_id(0)
+        );
+
+        // The recorded changes use -1 where the stored metadata holds the resolved
+        // schema / version id.
+        let changes = build_result.changes;
+        assert_eq!(changes.len(), 5);
+        assert!(changes.contains(&ViewUpdate::SetLocation { location }));
+        assert!(changes.contains(&ViewUpdate::AddViewVersion {
+            view_version: version
+                .with_version_id(INITIAL_VIEW_VERSION_ID)
+                .with_schema_id(-1)
+        }));
+        assert!(changes.contains(&ViewUpdate::SetCurrentViewVersion {
+            view_version_id: -1
+        }));
+        assert!(changes.contains(&ViewUpdate::AddSchema {
+            schema: schema.clone().with_schema_id(0),
+            last_column_id: Some(0)
+        }));
+        assert!(changes.contains(&ViewUpdate::SetProperties {
+            updates: properties
+        }));
+    }
+
+    /// Versions are only expired according to `VIEW_PROPERTY_VERSION_HISTORY_SIZE` when
+    /// they were not added in the current build; the current version is always kept.
+    #[test]
+    fn test_version_expiration() {
+        let v1 = new_view_version(0, 1, "select 1 as count");
+        let v2 = new_view_version(0, 1, "select count(1) as count from t2");
+        let v3 = new_view_version(0, 1, "select count from t1");
+
+        let builder = builder_without_changes()
+            .add_version(v1)
+            .unwrap()
+            .add_version(v2)
+            .unwrap()
+            .add_version(v3)
+            .unwrap();
+        // Same metadata, but re-opened so that none of the versions count as newly added.
+        let builder_without_changes = builder.clone().build().unwrap().metadata.into_builder();
+
+        // No limit on versions
+        let metadata = builder.clone().build().unwrap().metadata;
+        assert_eq!(
+            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
+            HashSet::from_iter(vec![1, 2, 3, 4])
+        );
+
+        // Limit to 2 versions, we still want to keep 3 versions as 3 were added during this build
+        // Plus the current version
+        let metadata = builder
+            .clone()
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
+                "2".to_string(),
+            )]))
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+        assert_eq!(
+            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
+            HashSet::from_iter(vec![1, 2, 3, 4])
+        );
+        assert_eq!(metadata.version_log.len(), 1);
+
+        // Limit to 2 versions in new build, only keep 2.
+        // One of them should be the current
+        let metadata = builder_without_changes
+            .clone()
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
+                "2".to_string(),
+            )]))
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+        assert_eq!(
+            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
+            HashSet::from_iter(vec![1, 4])
+        );
+
+        // Keep at least 1 version irrespective of the limit.
+        // This is the current version
+        let metadata = builder_without_changes
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
+                "0".to_string(),
+            )]))
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+        assert_eq!(
+            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
+            HashSet::from_iter(vec![1])
+        );
+    }
+
+    /// `update_version_log` drops every log entry up to and including the last entry whose
+    /// version id is not in the set of retained ids.
+    #[test]
+    fn test_update_version_log() {
+        let v1 = new_view_version(1, 1, "select 1 as count");
+        let v2 = new_view_version(2, 1, "select count(1) as count from t2");
+        let v3 = new_view_version(3, 1, "select count from t1");
+
+        let one = ViewVersionLog::new(1, v1.timestamp_ms());
+        let two = ViewVersionLog::new(2, v2.timestamp_ms());
+        let three = ViewVersionLog::new(3, v3.timestamp_ms());
+
+        // All entries refer to retained versions, so the log is unchanged.
+        assert_eq!(
+            update_version_log(
+                vec![one.clone(), two.clone(), three.clone()],
+                HashSet::from_iter(vec![1, 2, 3])
+            ),
+            vec![one.clone(), two.clone(), three.clone()]
+        );
+
+        // one was an invalid entry in the history, so all previous elements are removed
+        assert_eq!(
+            update_version_log(
+                vec![
+                    three.clone(),
+                    two.clone(),
+                    one.clone(),
+                    two.clone(),
+                    three.clone()
+                ],
+                HashSet::from_iter(vec![2, 3])
+            ),
+            vec![two.clone(), three.clone()]
+        );
+
+        // two was an invalid entry in the history, so all previous elements are removed
+        assert_eq!(
+            update_version_log(
+                vec![
+                    one.clone(),
+                    two.clone(),
+                    three.clone(),
+                    one.clone(),
+                    three.clone()
+                ],
+                HashSet::from_iter(vec![1, 3])
+            ),
+            vec![three.clone(), one.clone(), three.clone()]
+        );
+    }
+
+    /// Assigning a UUID updates the metadata and records a single `AssignUuid` change.
+    #[test]
+    fn test_assign_uuid() {
+        let builder = builder_without_changes();
+        let uuid = Uuid::now_v7();
+        let build_result = builder.clone().assign_uuid(uuid).build().unwrap();
+        assert_eq!(build_result.metadata.view_uuid, uuid);
+        assert_eq!(build_result.changes, vec![ViewUpdate::AssignUuid { uuid }]);
+    }
+
+    /// Setting a location updates the metadata and records a single `SetLocation` change.
+    #[test]
+    fn test_set_location() {
+        let target = "s3://bucket/table".to_string();
+
+        let outcome = builder_without_changes()
+            .set_location(target.clone())
+            .build()
+            .unwrap();
+
+        assert_eq!(outcome.metadata.location, target);
+        let expected_changes = vec![ViewUpdate::SetLocation { location: target }];
+        assert_eq!(outcome.changes, expected_changes);
+    }
+
+    /// Setting and then removing properties applies both operations to the metadata and
+    /// records them in order, including removals of keys that were never set.
+    #[test]
+    fn test_set_and_remove_properties() {
+        let updates = HashMap::from([
+            ("key1".to_string(), "value1".to_string()),
+            ("key2".to_string(), "value2".to_string()),
+        ]);
+        let removals = ["key2".to_string(), "key3".to_string()];
+
+        let outcome = builder_without_changes()
+            .set_properties(updates.clone())
+            .unwrap()
+            .remove_properties(&removals)
+            .build()
+            .unwrap();
+
+        let stored = &outcome.metadata.properties;
+        assert_eq!(stored.get("key1"), Some(&"value1".to_string()));
+        assert_eq!(stored.get("key2"), None);
+
+        let expected_changes = vec![
+            ViewUpdate::SetProperties { updates },
+            ViewUpdate::RemoveProperties {
+                removals: removals.to_vec(),
+            },
+        ];
+        assert_eq!(outcome.changes, expected_changes);
+    }
+
+    /// Adding a schema assigns it a fresh schema id and records an `AddSchema` change.
+    #[test]
+    fn test_add_schema() {
+        let builder = builder_without_changes();
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+        let build_result = builder.clone().add_schema(schema.clone()).build().unwrap();
+        assert_eq!(build_result.metadata.schemas.len(), 2);
+        assert_eq!(build_result.changes, vec![ViewUpdate::AddSchema {
+            schema: schema.clone().with_schema_id(2),
+            last_column_id: Some(0)
+        }]);
+
+        // Add schema again - id is reused
+        // NOTE(review): this starts from the same unchanged builder as the first call, so it
+        // repeats the scenario above rather than adding the schema twice to one builder -
+        // confirm this is intended.
+        let build_result = builder.clone().add_schema(schema.clone()).build().unwrap();
+        assert_eq!(build_result.metadata.schemas.len(), 2);
+        assert_eq!(build_result.changes, vec![ViewUpdate::AddSchema {
+            schema: schema.clone().with_schema_id(2),
+            last_column_id: Some(0)
+        }]);
+    }
+
+    /// Adds two versions (the second one together with a new schema), makes the second one
+    /// current by id and checks the resulting metadata, changes and version log.
+    #[test]
+    fn test_add_and_set_current_version() {
+        let first = new_view_version(2, 1, "select 1 as count");
+        let second = new_view_version(3, 2, "select count(1) as count from t2");
+        let second_schema = Schema::builder()
+            .with_schema_id(2)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let outcome = builder_without_changes()
+            .add_version(first.clone())
+            .unwrap()
+            .add_schema(second_schema.clone())
+            .add_version(second.clone())
+            .unwrap()
+            .set_current_version_id(3)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Resulting metadata
+        let metadata = &outcome.metadata;
+        assert_eq!(metadata.current_version_id, 3);
+        assert_eq!(metadata.versions.len(), 3);
+        assert_eq!(metadata.schemas.len(), 2);
+        assert_eq!(metadata.version_log.len(), 2);
+
+        let stored_first = first.clone().with_version_id(2).with_schema_id(1);
+        let stored_second = second.clone().with_version_id(3).with_schema_id(2);
+        assert_eq!(
+            Arc::unwrap_or_clone(metadata.versions[&2].clone()),
+            stored_first
+        );
+        assert_eq!(
+            Arc::unwrap_or_clone(metadata.versions[&3].clone()),
+            stored_second
+        );
+
+        // Recorded changes
+        let expected_changes = vec![
+            ViewUpdate::AddViewVersion {
+                view_version: stored_first,
+            },
+            ViewUpdate::AddSchema {
+                schema: second_schema.with_schema_id(2),
+                last_column_id: Some(0),
+            },
+            ViewUpdate::AddViewVersion {
+                view_version: second.with_version_id(3).with_schema_id(-1),
+            },
+            ViewUpdate::SetCurrentViewVersion {
+                view_version_id: -1,
+            },
+        ];
+        assert_eq!(outcome.changes.len(), 4);
+        assert_eq!(outcome.changes, expected_changes);
+
+        // Version log
+        let logged_ids: Vec<_> = metadata
+            .version_log
+            .iter()
+            .map(|entry| entry.version_id())
+            .collect();
+        assert_eq!(logged_ids, vec![1, 3]);
+    }
+
+    /// Versions and schemas handed in with id 0 get new ids assigned when they are added via
+    /// `add_version` / `set_current_version`.
+    #[test]
+    fn test_schema_and_version_id_reassignment() {
+        let added = new_view_version(0, 1, "select 1 as count");
+        let current = new_view_version(0, 2, "select count(1) as count from t2");
+        let current_schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let result = builder_without_changes()
+            .add_version(added.clone())
+            .unwrap()
+            .set_current_version(current.clone(), current_schema.clone())
+            .unwrap()
+            .build()
+            .unwrap();
+        let (metadata, changes) = (&result.metadata, &result.changes);
+
+        assert_eq!(metadata.current_version_id, 3);
+        assert_eq!(metadata.versions.len(), 3);
+        assert_eq!(metadata.schemas.len(), 2);
+        assert_eq!(metadata.version_log.len(), 2);
+
+        let expected_added = added.with_version_id(2).with_schema_id(1);
+        assert_eq!(
+            Arc::unwrap_or_clone(metadata.versions[&2].clone()),
+            expected_added
+        );
+        assert_eq!(
+            Arc::unwrap_or_clone(metadata.versions[&3].clone()),
+            current.clone().with_version_id(3).with_schema_id(2)
+        );
+
+        assert_eq!(changes.len(), 4);
+        assert_eq!(changes, &vec![
+            ViewUpdate::AddViewVersion {
+                view_version: expected_added
+            },
+            ViewUpdate::AddSchema {
+                schema: current_schema.with_schema_id(2),
+                last_column_id: Some(0)
+            },
+            ViewUpdate::AddViewVersion {
+                view_version: current.with_version_id(3).with_schema_id(-1)
+            },
+            ViewUpdate::SetCurrentViewVersion {
+                view_version_id: -1
+            }
+        ]);
+
+        let log_ids = metadata
+            .version_log
+            .iter()
+            .map(|entry| entry.version_id())
+            .collect::<Vec<_>>();
+        assert_eq!(log_ids, vec![1, 3]);
+    }
+
+    /// Adding the same view version twice results in only one additional version.
+    #[test]
+    fn test_view_version_deduplication() {
+        let base = builder_without_changes();
+        assert_eq!(base.metadata.versions.len(), 1);
+
+        let duplicate = new_view_version(0, 1, "select * from ns.tbl");
+        let metadata = base
+            .add_version(duplicate.clone())
+            .unwrap()
+            .add_version(duplicate)
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+
+        assert_eq!(metadata.versions.len(), 2);
+        assert_eq!(metadata.schemas.len(), 1);
+    }
+
+    /// Setting the same version/schema pairs as current repeatedly re-uses the versions and
+    /// schemas that were already added instead of creating new ones.
+    #[test]
+    fn test_view_version_and_schema_deduplication() {
+        let schema_one = Schema::builder()
+            .with_schema_id(5)
+            .with_fields(vec![NestedField::required(
+                1,
+                "x",
+                Type::Primitive(PrimitiveType::Long),
+            )
+            .into()])
+            .build()
+            .unwrap();
+        let schema_two = Schema::builder()
+            .with_schema_id(7)
+            .with_fields(vec![NestedField::required(
+                1,
+                "y",
+                Type::Primitive(PrimitiveType::Long),
+            )
+            .into()])
+            .build()
+            .unwrap();
+        let schema_three = Schema::builder()
+            .with_schema_id(9)
+            .with_fields(vec![NestedField::required(
+                1,
+                "z",
+                Type::Primitive(PrimitiveType::Long),
+            )
+            .into()])
+            .build()
+            .unwrap();
+
+        let v1 = new_view_version(1, 5, "select * from ns.tbl");
+        let v2 = new_view_version(1, 7, "select count(*) from ns.tbl");
+        let v3 = new_view_version(1, 9, "select count(*) as count from ns.tbl");
+
+        let build_result = builder_without_changes()
+            .add_schema(schema_one.clone())
+            .add_schema(schema_two.clone())
+            .add_schema(schema_three.clone())
+            .set_current_version(v1.clone(), schema_one.clone())
+            .unwrap()
+            .set_current_version(v2.clone(), schema_two.clone())
+            .unwrap()
+            .set_current_version(v3.clone(), schema_three.clone())
+            .unwrap()
+            .set_current_version(v3.clone(), schema_three.clone())
+            .unwrap()
+            .set_current_version(v2.clone(), schema_two.clone())
+            .unwrap()
+            .set_current_version(v1.clone(), schema_one.clone())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // The last call made v1 current again; it keeps the ids from its first addition.
+        assert_eq!(
+            Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
+            v1.clone().with_version_id(2).with_schema_id(2)
+        );
+        assert_eq!(build_result.metadata.versions.len(), 4);
+        assert_eq!(
+            build_result.metadata.versions[&2],
+            Arc::new(v1.clone().with_version_id(2).with_schema_id(2))
+        );
+        assert_eq!(
+            build_result.metadata.versions[&3],
+            Arc::new(v2.clone().with_version_id(3).with_schema_id(3))
+        );
+        assert_eq!(
+            build_result.metadata.versions[&4],
+            Arc::new(v3.clone().with_version_id(4).with_schema_id(4))
+        );
+        assert_eq!(
+            // Remove schema_id 1 and get struct only
+            build_result
+                .metadata
+                .schemas_iter()
+                .filter(|s| s.schema_id() != 1)
+                .sorted_by_key(|s| s.schema_id())
+                .map(|s| s.as_struct())
+                .collect::<Vec<_>>(),
+            vec![
+                schema_one.as_struct(),
+                schema_two.as_struct(),
+                schema_three.as_struct()
+            ]
+        )
+    }
+
+    /// Adding a version fails when its schema id is unknown, or when it refers to the last
+    /// added schema (-1) although no schema has been added.
+    #[test]
+    fn test_error_on_missing_schema() {
+        let builder = builder_without_changes();
+        // Missing schema
+        assert!(builder
+            .clone()
+            .add_version(new_view_version(0, 10, "SELECT * FROM foo"))
+            .unwrap_err()
+            .to_string()
+            .contains("Cannot add version with unknown schema: 10"));
+
+        // Missing last added schema
+        assert!(builder
+            .clone()
+            .add_version(new_view_version(0, -1, "SELECT * FROM foo"))
+            .unwrap_err()
+            .to_string()
+            .contains("Cannot set last added schema: no schema has been added"));
+    }
+
+    /// Setting the current version id fails for -1 (last added) when no version has been
+    /// added, and for an id that does not exist.
+    #[test]
+    fn test_error_on_missing_current_version() {
+        let builder = builder_without_changes();
+        assert!(builder
+            .clone()
+            .set_current_version_id(-1)
+            .unwrap_err()
+            .to_string()
+            .contains(
+                "Cannot set current version id to last added version: no version has been added."
+            ));
+        assert!(builder
+            .clone()
+            .set_current_version_id(10)
+            .unwrap_err()
+            .to_string()
+            .contains("Cannot set current version to unknown version with id: 10"));
+    }
+
+    /// A negative value for `VIEW_PROPERTY_VERSION_HISTORY_SIZE` is rejected by
+    /// `set_properties`.
+    #[test]
+    fn test_error_when_setting_negative_version_history_size() {
+        let builder = builder_without_changes();
+        assert!(builder
+            .clone()
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
+                "-1".to_string(),
+            )]))
+            .unwrap_err()
+            .to_string()
+            .contains("version.history.num-entries must be positive but was -1"));
+    }
+
+    /// Each added view version is recorded as its own `AddViewVersion` change, in order.
+    #[test]
+    fn test_view_version_changes() {
+        let first = new_view_version(2, 1, "select 1 as count");
+        let second = new_view_version(3, 1, "select count(1) as count from t2");
+
+        let expected = vec![
+            ViewUpdate::AddViewVersion {
+                view_version: first.clone(),
+            },
+            ViewUpdate::AddViewVersion {
+                view_version: second.clone(),
+            },
+        ];
+
+        let recorded = builder_without_changes()
+            .add_version(first)
+            .unwrap()
+            .add_version(second)
+            .unwrap()
+            .build()
+            .unwrap()
+            .changes;
+
+        assert_eq!(recorded.len(), 2);
+        assert_eq!(recorded, expected);
+    }
+
+    /// Replacing a `spark` + `trino` version with a `spark`-only version fails at build time
+    /// when dropping dialects has not been allowed.
+    #[test]
+    fn test_dropping_dialect_fails_by_default() {
+        let builder = builder_without_changes();
+
+        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
+        let spark_trino =
+            new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let err = builder
+            .set_current_version(spark_trino, schema.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata
+            .into_builder()
+            .set_current_version(spark, schema)
+            .unwrap()
+            .build()
+            .unwrap_err();
+
+        assert!(err
+            .to_string()
+            .contains("Cannot replace view due to loss of view dialects"));
+    }
+
+    /// With `VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED` set to `true`, a `spark` + `trino`
+    /// version can be replaced by a `spark`-only version.
+    #[test]
+    fn test_dropping_dialects_does_not_fail_when_allowed() {
+        let builder = builder_without_changes();
+
+        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
+        let spark_trino =
+            new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let build_result = builder
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
+                "true".to_string(),
+            )]))
+            .unwrap()
+            .set_current_version(spark_trino, schema.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata
+            .into_builder()
+            .set_current_version(spark.clone(), schema)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(
+            Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
+            spark.with_version_id(3).with_schema_id(2)
+        );
+    }
+
+    /// Replacing a `spark`-only version with a `spark` + `trino` version succeeds without
+    /// any extra properties.
+    #[test]
+    fn test_can_add_dialects_by_default() {
+        let builder = builder_without_changes();
+
+        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
+        let spark_trino =
+            new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let build_result = builder
+            .set_current_version(spark.clone(), schema.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata
+            .into_builder()
+            .set_current_version(spark_trino.clone(), schema.clone())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(
+            Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
+            spark_trino.with_version_id(3).with_schema_id(2)
+        );
+    }
+
+    /// Replacing the SQL of an existing dialect (`spark`) succeeds without any extra
+    /// properties.
+    #[test]
+    fn test_can_update_dialect_by_default() {
+        let builder = builder_without_changes();
+
+        let spark_v1 = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
+        let spark_v2 = new_view_version_with_dialect(0, 0, "SELECT * FROM bar", vec!["spark"]);
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let build_result = builder
+            .set_current_version(spark_v1.clone(), schema.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata
+            .into_builder()
+            .set_current_version(spark_v2.clone(), schema.clone())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(
+            Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
+            spark_v2.with_version_id(3).with_schema_id(2)
+        );
+    }
+
+    /// Dropping a dialect succeeds when the same build sets
+    /// `VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED` to `true`, and fails again once a later
+    /// build sets it to `false`.
+    #[test]
+    fn test_dropping_dialects_allowed_and_then_disallowed() {
+        let builder = builder_without_changes();
+
+        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
+        let trino = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["trino"]);
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        // spark -> trino drops the spark dialect; allowed via the property set in this build.
+        let updated = builder
+            .set_current_version(spark.clone(), schema.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata
+            .into_builder()
+            .set_current_version(trino.clone(), schema.clone())
+            .unwrap()
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
+                "true".to_string(),
+            )]))
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(
+            Arc::unwrap_or_clone(updated.metadata.current_version().clone()),
+            trino.with_version_id(3).with_schema_id(2)
+        );
+
+        // trino -> spark drops the trino dialect; the property is switched off in this build.
+        let err = updated
+            .metadata
+            .into_builder()
+            .set_current_version(spark.clone(), schema.clone())
+            .unwrap()
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
+                "false".to_string(),
+            )]))
+            .unwrap()
+            .build()
+            .unwrap_err();
+
+        assert!(err
+            .to_string()
+            .contains("Cannot replace view due to loss of view dialects"));
+    }
+
+    /// `require_no_dialect_dropped` errors when the new version lacks a dialect of the
+    /// previous one and accepts the same dialects in a different order.
+    #[test]
+    fn test_require_no_dialect_dropped() {
+        let previous = ViewVersion::builder()
+            .with_version_id(0)
+            .with_schema_id(0)
+            .with_timestamp_ms(0)
+            .with_representations(ViewRepresentations(vec![
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "trino".to_string(),
+                    sql: "SELECT * FROM foo".to_string(),
+                }),
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "spark".to_string(),
+                    sql: "SELECT * FROM bar".to_string(),
+                }),
+            ]))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        // Only trino is left: the spark dialect was dropped.
+        let current = ViewVersion::builder()
+            .with_version_id(0)
+            .with_schema_id(0)
+            .with_timestamp_ms(0)
+            .with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(
+                SqlViewRepresentation {
+                    dialect: "trino".to_string(),
+                    sql: "SELECT * FROM foo".to_string(),
+                },
+            )]))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        assert!(require_no_dialect_dropped(&previous, &current).is_err());
+
+        // Both dialects are present, listed in a different order.
+        let current = ViewVersion::builder()
+            .with_version_id(0)
+            .with_schema_id(0)
+            .with_timestamp_ms(0)
+            .with_representations(ViewRepresentations(vec![
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "spark".to_string(),
+                    sql: "SELECT * FROM bar".to_string(),
+                }),
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "trino".to_string(),
+                    sql: "SELECT * FROM foo".to_string(),
+                }),
+            ]))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        assert!(require_no_dialect_dropped(&previous, &current).is_ok());
+    }
+
+    /// `allow_replace_drop_dialects` is false when the property is absent and otherwise
+    /// follows the property value, ignoring case.
+    #[test]
+    fn test_allow_replace_drop_dialects() {
+        use std::collections::HashMap;
+
+        use super::allow_replace_drop_dialects;
+
+        let mut properties = HashMap::new();
+        assert!(!allow_replace_drop_dialects(&properties));
+
+        // Each entry overwrites the previous value of the same property.
+        let cases = [
+            ("true", true),
+            ("false", false),
+            ("TRUE", true),
+            ("FALSE", false),
+        ];
+        for (value, expected) in cases {
+            properties.insert(
+                "replace.drop-dialect.allowed".to_string(),
+                value.to_string(),
+            );
+            assert_eq!(allow_replace_drop_dialects(&properties), expected);
+        }
+    }
+
+    /// `lowercase_sql_dialects_for` returns the dialects of all SQL representations in
+    /// lower case.
+    #[test]
+    fn test_lowercase_sql_dialects_for() {
+        let representations = [
+            ("STARROCKS", "SELECT * FROM foo"),
+            ("trino", "SELECT * FROM bar"),
+            ("Spark", "SELECT * FROM bar"),
+        ]
+        .into_iter()
+        .map(|(dialect, sql)| {
+            ViewRepresentation::Sql(SqlViewRepresentation {
+                dialect: dialect.to_string(),
+                sql: sql.to_string(),
+            })
+        })
+        .collect();
+
+        let view_version = ViewVersion::builder()
+            .with_version_id(0)
+            .with_schema_id(0)
+            .with_timestamp_ms(0)
+            .with_representations(ViewRepresentations(representations))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        let dialects = lowercase_sql_dialects_for(&view_version);
+        assert_eq!(dialects.len(), 3);
+        for expected in ["trino", "spark", "starrocks"] {
+            assert!(dialects.contains(expected));
+        }
+    }
+
+    /// `require_unique_dialects` rejects a version that lists the same dialect twice and
+    /// accepts a version with distinct dialects.
+    #[test]
+    fn test_require_unique_dialects() {
+        // Builds a version with two SQL representations using the given dialects.
+        let version_with_dialects = |first: &str, second: &str| {
+            ViewVersion::builder()
+                .with_version_id(0)
+                .with_schema_id(0)
+                .with_timestamp_ms(0)
+                .with_representations(ViewRepresentations(vec![
+                    ViewRepresentation::Sql(SqlViewRepresentation {
+                        dialect: first.to_string(),
+                        sql: "SELECT * FROM foo".to_string(),
+                    }),
+                    ViewRepresentation::Sql(SqlViewRepresentation {
+                        dialect: second.to_string(),
+                        sql: "SELECT * FROM bar".to_string(),
+                    }),
+                ]))
+                .with_default_namespace(NamespaceIdent::new("default".to_string()))
+                .build()
+        };
+
+        let duplicated = version_with_dialects("trino", "trino");
+        assert!(require_unique_dialects(&duplicated).is_err());
+
+        let distinct = version_with_dialects("trino", "spark");
+        assert!(require_unique_dialects(&distinct).is_ok());
+    }
+}
diff --git a/crates/iceberg/src/spec/view_version.rs b/crates/iceberg/src/spec/view_version.rs
index 30686b5a..b13d87a9 100644
--- a/crates/iceberg/src/spec/view_version.rs
+++ b/crates/iceberg/src/spec/view_version.rs
@@ -27,6 +27,7 @@ use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use super::view_metadata::ViewVersionLog;
+use super::INITIAL_VIEW_VERSION_ID;
use crate::catalog::NamespaceIdent;
use crate::error::{timestamp_ms_to_utc, Result};
use crate::spec::{SchemaId, SchemaRef, ViewMetadata};
@@ -44,12 +45,14 @@ pub type ViewVersionId = i32;
/// A view versions represents the definition of a view at a specific point in time.
pub struct ViewVersion {
/// A unique long ID
+ #[builder(default = INITIAL_VIEW_VERSION_ID)]
version_id: ViewVersionId,
/// ID of the schema for the view version
schema_id: SchemaId,
/// Timestamp when the version was created (ms from epoch)
timestamp_ms: i64,
/// A string to string map of summary metadata about the version
+ #[builder(default = HashMap::new())]
summary: HashMap<String, String>,
/// A list of representations for the view definition.
representations: ViewRepresentations,
@@ -124,10 +127,36 @@ impl ViewVersion {
}
/// Retrieve the history log entry for this view version.
- #[allow(dead_code)]
pub(crate) fn log(&self) -> ViewVersionLog {
ViewVersionLog::new(self.version_id, self.timestamp_ms)
}
+
+    /// Check if this view version behaves the same as another view version.
+    ///
+    /// Returns true if the view version is equal to the other view version
+    /// with `timestamp_ms` and `version_id` ignored. The following must be identical:
+    /// * Summary (all entries)
+    /// * Representations
+    /// * Default Catalog
+    /// * Default Namespace
+    /// * The Schema ID
+    pub(crate) fn behaves_identical_to(&self, other: &Self) -> bool {
+        self.summary == other.summary
+            && self.representations == other.representations
+            && self.default_catalog == other.default_catalog
+            && self.default_namespace == other.default_namespace
+            && self.schema_id == other.schema_id
+    }
+
+    /// Change the version id of this view version.
+    ///
+    /// Consumes `self` and returns it with `version_id` replaced; all other fields are
+    /// kept as they are.
+    pub fn with_version_id(self, version_id: ViewVersionId) -> Self {
+        Self { version_id, ..self }
+    }
+
+    /// Return this view version with its schema id replaced by `schema_id`.
+    pub fn with_schema_id(mut self, schema_id: SchemaId) -> Self {
+        self.schema_id = schema_id;
+        self
+    }
}
/// A list of view representations.
@@ -148,7 +177,7 @@ impl ViewRepresentations {
}
/// Get an iterator over the representations
- pub fn iter(&self) -> impl Iterator<Item = &'_ ViewRepresentation> {
+    pub fn iter(&self) -> impl ExactSizeIterator<Item = &'_ ViewRepresentation> {
self.0.iter()
}
}
@@ -253,6 +282,7 @@ mod tests {
use crate::spec::view_version::ViewVersion;
use crate::spec::view_version::_serde::ViewVersionV1;
use crate::spec::ViewRepresentations;
+ use crate::NamespaceIdent;
#[test]
fn view_version() {
@@ -310,4 +340,50 @@ mod tests {
vec!["default".to_string()]
);
}
+
+    /// `behaves_identical_to` ignores `version_id` and `timestamp_ms` but detects a
+    /// different SQL representation.
+    #[test]
+    fn test_behaves_identical_to() {
+        let view_version = ViewVersion::builder()
+            .with_version_id(1)
+            .with_schema_id(1)
+            .with_timestamp_ms(1573518431292)
+            .with_summary({
+                let mut map = std::collections::HashMap::new();
+                map.insert("engine-name".to_string(), "Spark".to_string());
+                map.insert("engineVersion".to_string(), "3.3.2".to_string());
+                map
+            })
+            .with_representations(ViewRepresentations(vec![super::ViewRepresentation::Sql(
+                super::SqlViewRepresentation {
+                    sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2"
+                        .to_string(),
+                    dialect: "spark".to_string(),
+                },
+            )]))
+            .with_default_catalog(Some("prod".to_string()))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        // Differs only in the two fields that are ignored by the comparison.
+        let mut identical_view_version = view_version.clone();
+        identical_view_version.version_id = 2;
+        identical_view_version.timestamp_ms = 1573518431293;
+
+        // Same ids, timestamp, summary, catalog and namespace, but different SQL.
+        let different_view_version = ViewVersion::builder()
+            .with_version_id(view_version.version_id())
+            .with_schema_id(view_version.schema_id())
+            .with_timestamp_ms(view_version.timestamp_ms())
+            .with_summary(view_version.summary().clone())
+            .with_representations(ViewRepresentations(vec![super::ViewRepresentation::Sql(
+                super::SqlViewRepresentation {
+                    sql: "SELECT * from events".to_string(),
+                    dialect: "spark".to_string(),
+                },
+            )]))
+            .with_default_catalog(view_version.default_catalog().cloned())
+            .with_default_namespace(view_version.default_namespace().clone())
+            .build();
+
+        assert!(view_version.behaves_identical_to(&identical_view_version));
+        assert!(!view_version.behaves_identical_to(&different_view_version));
+    }
}