This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e294de77 feat: View Metadata Builder (#908)
e294de77 is described below

commit e294de77cabef294fb38c9ab822eb154367d544f
Author: Christian <[email protected]>
AuthorDate: Tue Feb 25 14:34:34 2025 +0100

    feat: View Metadata Builder (#908)
    
    This PR is not completely ready yet as I believe the current mechanism
    of view expiration is flawed.
    I opened a PR in Java to demonstrate the problem and use for
    discussions:
    https://github.com/apache/iceberg/pull/12051
    
    Feedback from anyone is welcome. I am not sure what the best solutions
    looks like.
    
    ---------
    
    Co-authored-by: Fokko Driesprong <[email protected]>
---
 crates/iceberg/src/io/mod.rs                      |    2 +-
 crates/iceberg/src/spec/mod.rs                    |    1 +
 crates/iceberg/src/spec/table_metadata_builder.rs |    3 +-
 crates/iceberg/src/spec/view_metadata.rs          |  145 +-
 crates/iceberg/src/spec/view_metadata_builder.rs  | 1581 +++++++++++++++++++++
 crates/iceberg/src/spec/view_version.rs           |   80 +-
 6 files changed, 1720 insertions(+), 92 deletions(-)

diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 215ea67c..948c57d7 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -89,6 +89,6 @@ mod storage_gcs;
 #[cfg(feature = "storage-gcs")]
 pub use storage_gcs::*;
 
-fn is_truthy(value: &str) -> bool {
+pub(crate) fn is_truthy(value: &str) -> bool {
     ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())
 }
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index b3f13b8d..c049483d 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -31,6 +31,7 @@ mod table_metadata_builder;
 mod transform;
 mod values;
 mod view_metadata;
+mod view_metadata_builder;
 mod view_version;
 
 pub use datatypes::*;
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs 
b/crates/iceberg/src/spec/table_metadata_builder.rs
index 28e2f4e8..26ee52b5 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -138,7 +138,6 @@ impl TableMetadataBuilder {
     }
 
     /// Creates a new table metadata builder from the given metadata to modify 
it.
-
     /// `current_file_location` is the location where the current version
     /// of the metadata file is stored. This is used to update the metadata 
log.
     /// If `current_file_location` is `None`, the metadata log will not be 
updated.
@@ -312,7 +311,7 @@ impl TableMetadataBuilder {
         Ok(self)
     }
 
-    /// Set the location of the table metadata, stripping any trailing slashes.
+    /// Set the location of the table, stripping any trailing slashes.
     pub fn set_location(mut self, location: String) -> Self {
         let location = location.trim_end_matches('/').to_string();
         if self.metadata.location != location {
diff --git a/crates/iceberg/src/spec/view_metadata.rs 
b/crates/iceberg/src/spec/view_metadata.rs
index 7c247c17..162b6f4c 100644
--- a/crates/iceberg/src/spec/view_metadata.rs
+++ b/crates/iceberg/src/spec/view_metadata.rs
@@ -29,16 +29,27 @@ use serde::{Deserialize, Serialize};
 use serde_repr::{Deserialize_repr, Serialize_repr};
 use uuid::Uuid;
 
-use super::view_version::{ViewVersion, ViewVersionId, ViewVersionRef};
+pub use super::view_metadata_builder::ViewMetadataBuilder;
+use super::view_version::{ViewVersionId, ViewVersionRef};
 use super::{SchemaId, SchemaRef};
-use crate::catalog::ViewCreation;
 use crate::error::{timestamp_ms_to_utc, Result};
+use crate::{Error, ErrorKind};
 
 /// Reference to [`ViewMetadata`].
 pub type ViewMetadataRef = Arc<ViewMetadata>;
 
+// ID of the initial version of views
 pub(crate) static INITIAL_VIEW_VERSION_ID: i32 = 1;
 
+/// Property key for allowing to drop dialects when replacing a view.
+pub const VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED: &str = 
"replace.drop-dialect.allowed";
+/// Default value for the property key for allowing to drop dialects when 
replacing a view.
+pub const VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT: bool = false;
+/// Property key for the number of history entries to keep.
+pub const VIEW_PROPERTY_VERSION_HISTORY_SIZE: &str = 
"version.history.num-entries";
+/// Default value for the property key for the number of history entries to 
keep.
+pub const VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT: usize = 10;
+
 #[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
 #[serde(try_from = "ViewMetadataEnum", into = "ViewMetadataEnum")]
 /// Fields for the version 1 of the view metadata.
@@ -60,7 +71,7 @@ pub struct ViewMetadata {
     /// change to current-version-id
     pub(crate) version_log: Vec<ViewVersionLog>,
     /// A list of schemas, stored as objects with schema-id.
-    pub(crate) schemas: HashMap<i32, SchemaRef>,
+    pub(crate) schemas: HashMap<SchemaId, SchemaRef>,
     /// A string to string map of view properties.
     /// Properties are used for metadata such as comment and for settings that
     /// affect view maintenance. This is not intended to be used for arbitrary 
metadata.
@@ -68,6 +79,12 @@ pub struct ViewMetadata {
 }
 
 impl ViewMetadata {
+    /// Convert this View Metadata into a builder for modification.
+    #[must_use]
+    pub fn into_builder(self) -> ViewMetadataBuilder {
+        ViewMetadataBuilder::new_from_metadata(self)
+    }
+
     /// Returns format version of this metadata.
     #[inline]
     pub fn format_version(&self) -> ViewFormatVersion {
@@ -143,65 +160,36 @@ impl ViewMetadata {
     pub fn history(&self) -> &[ViewVersionLog] {
         &self.version_log
     }
-}
-
-/// Manipulating view metadata.
-pub struct ViewMetadataBuilder(ViewMetadata);
-
-impl ViewMetadataBuilder {
-    /// Creates a new view metadata builder from the given view metadata.
-    pub fn new(origin: ViewMetadata) -> Self {
-        Self(origin)
-    }
-
-    /// Creates a new view metadata builder from the given view creation.
-    pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
-        let ViewCreation {
-            location,
-            schema,
-            properties,
-            name: _,
-            representations,
-            default_catalog,
-            default_namespace,
-            summary,
-        } = view_creation;
-        let initial_version_id = super::INITIAL_VIEW_VERSION_ID;
-        let version = ViewVersion::builder()
-            .with_default_catalog(default_catalog)
-            .with_default_namespace(default_namespace)
-            .with_representations(representations)
-            .with_schema_id(schema.schema_id())
-            .with_summary(summary)
-            .with_timestamp_ms(Utc::now().timestamp_millis())
-            .with_version_id(initial_version_id)
-            .build();
-
-        let versions = HashMap::from_iter(vec![(initial_version_id, 
version.into())]);
-
-        let view_metadata = ViewMetadata {
-            format_version: ViewFormatVersion::V1,
-            view_uuid: Uuid::now_v7(),
-            location,
-            current_version_id: initial_version_id,
-            versions,
-            version_log: Vec::new(),
-            schemas: HashMap::from_iter(vec![(schema.schema_id(), 
Arc::new(schema))]),
-            properties,
-        };
 
-        Ok(Self(view_metadata))
+    /// Validate the view metadata.
+    pub(super) fn validate(&self) -> Result<()> {
+        self.validate_current_version_id()?;
+        self.validate_current_schema_id()?;
+        Ok(())
     }
 
-    /// Changes uuid of view metadata.
-    pub fn assign_uuid(mut self, uuid: Uuid) -> Result<Self> {
-        self.0.view_uuid = uuid;
-        Ok(self)
+    fn validate_current_version_id(&self) -> Result<()> {
+        if !self.versions.contains_key(&self.current_version_id) {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "No version exists with the current version id {}.",
+                    self.current_version_id
+                ),
+            ));
+        }
+        Ok(())
     }
 
-    /// Returns the new view metadata after changes.
-    pub fn build(self) -> Result<ViewMetadata> {
-        Ok(self.0)
+    fn validate_current_schema_id(&self) -> Result<()> {
+        let schema_id = self.current_version().schema_id();
+        if !self.schemas.contains_key(&schema_id) {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("No schema exists with the schema id {}.", schema_id),
+            ));
+        }
+        Ok(())
     }
 }
 
@@ -258,7 +246,7 @@ pub(super) mod _serde {
     use crate::spec::table_metadata::_serde::VersionNumber;
     use crate::spec::view_version::_serde::ViewVersionV1;
     use crate::spec::{ViewMetadata, ViewVersion};
-    use crate::{Error, ErrorKind};
+    use crate::Error;
 
     #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
     #[serde(untagged)]
@@ -326,28 +314,8 @@ pub(super) mod _serde {
                     .map(|x| Ok((x.version_id, 
Arc::new(ViewVersion::from(x)))))
                     .collect::<Result<Vec<_>, Error>>()?,
             );
-            // Make sure at least the current schema exists
-            let current_version =
-                versions
-                    .get(&value.current_version_id)
-                    .ok_or(self::Error::new(
-                        ErrorKind::DataInvalid,
-                        format!(
-                            "No version exists with the current version id 
{}.",
-                            value.current_version_id
-                        ),
-                    ))?;
-            if !schemas.contains_key(&current_version.schema_id()) {
-                return Err(self::Error::new(
-                    ErrorKind::DataInvalid,
-                    format!(
-                        "No schema exists with the schema id {}.",
-                        current_version.schema_id()
-                    ),
-                ));
-            }
 
-            Ok(ViewMetadata {
+            let view_metadata = ViewMetadata {
                 format_version: ViewFormatVersion::V1,
                 view_uuid: value.view_uuid,
                 location: value.location,
@@ -356,7 +324,9 @@ pub(super) mod _serde {
                 current_version_id: value.current_version_id,
                 versions,
                 version_log: value.version_log,
-            })
+            };
+            view_metadata.validate()?;
+            Ok(view_metadata)
         }
     }
 
@@ -423,7 +393,7 @@ impl Display for ViewFormatVersion {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use std::collections::HashMap;
     use std::fs;
     use std::sync::Arc;
@@ -435,7 +405,7 @@ mod tests {
     use super::{ViewFormatVersion, ViewMetadataBuilder, ViewVersionLog};
     use crate::spec::{
         NestedField, PrimitiveType, Schema, SqlViewRepresentation, Type, 
ViewMetadata,
-        ViewRepresentations, ViewVersion,
+        ViewRepresentations, ViewVersion, INITIAL_VIEW_VERSION_ID,
     };
     use crate::{NamespaceIdent, ViewCreation};
 
@@ -449,7 +419,7 @@ mod tests {
         assert_eq!(parsed_json_value, desered_type);
     }
 
-    fn get_test_view_metadata(file_name: &str) -> ViewMetadata {
+    pub(crate) fn get_test_view_metadata(file_name: &str) -> ViewMetadata {
         let path = format!("testdata/view_metadata/{}", file_name);
         let metadata: String = fs::read_to_string(path).unwrap();
 
@@ -578,13 +548,14 @@ mod tests {
         let metadata = ViewMetadataBuilder::from_view_creation(creation)
             .unwrap()
             .build()
-            .unwrap();
+            .unwrap()
+            .metadata;
 
         assert_eq!(
             metadata.location(),
             "s3://bucket/warehouse/default.db/event_agg"
         );
-        assert_eq!(metadata.current_version_id(), 1);
+        assert_eq!(metadata.current_version_id(), INITIAL_VIEW_VERSION_ID);
         assert_eq!(metadata.versions().count(), 1);
         assert_eq!(metadata.schemas_iter().count(), 1);
         assert_eq!(metadata.properties().len(), 0);
@@ -652,9 +623,9 @@ mod tests {
     #[test]
     fn test_view_builder_assign_uuid() {
         let metadata = get_test_view_metadata("ViewMetadataV1Valid.json");
-        let metadata_builder = ViewMetadataBuilder::new(metadata);
+        let metadata_builder = metadata.into_builder();
         let uuid = Uuid::new_v4();
-        let metadata = 
metadata_builder.assign_uuid(uuid).unwrap().build().unwrap();
+        let metadata = 
metadata_builder.assign_uuid(uuid).build().unwrap().metadata;
         assert_eq!(metadata.uuid(), uuid);
     }
 
diff --git a/crates/iceberg/src/spec/view_metadata_builder.rs 
b/crates/iceberg/src/spec/view_metadata_builder.rs
new file mode 100644
index 00000000..43b78c92
--- /dev/null
+++ b/crates/iceberg/src/spec/view_metadata_builder.rs
@@ -0,0 +1,1581 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use chrono::Utc;
+use itertools::Itertools;
+use uuid::Uuid;
+
+use super::{
+    Schema, SchemaId, TableMetadataBuilder, ViewFormatVersion, ViewMetadata, 
ViewRepresentation,
+    ViewVersion, ViewVersionLog, ViewVersionRef, DEFAULT_SCHEMA_ID, 
INITIAL_VIEW_VERSION_ID,
+    ONE_MINUTE_MS, VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED,
+    VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, 
VIEW_PROPERTY_VERSION_HISTORY_SIZE,
+    VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT,
+};
+use crate::catalog::ViewUpdate;
+use crate::error::{Error, ErrorKind, Result};
+use crate::io::is_truthy;
+use crate::ViewCreation;
+
+/// Manipulating view metadata.
+///
+/// For this builder the order of called functions matters.
+/// All operations applied to the `ViewMetadata` are tracked in `changes` as  
a chronologically
+/// ordered vec of `ViewUpdate`.
+/// If an operation does not lead to a change of the `ViewMetadata`, the 
corresponding update
+/// is omitted from `changes`.
+#[derive(Debug, Clone)]
+pub struct ViewMetadataBuilder {
+    metadata: ViewMetadata,
+    changes: Vec<ViewUpdate>,
+    last_added_schema_id: Option<SchemaId>,
+    last_added_version_id: Option<SchemaId>,
+    history_entry: Option<ViewVersionLog>,
+    // Previous view version is only used during build to check
+    // weather dialects are dropped or not.
+    previous_view_version: Option<ViewVersionRef>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Result of modifying or creating a `ViewMetadata`.
+pub struct ViewMetadataBuildResult {
+    /// The new `ViewMetadata`.
+    pub metadata: ViewMetadata,
+    /// The changes that were applied to the metadata.
+    pub changes: Vec<ViewUpdate>,
+}
+
+impl ViewMetadataBuilder {
+    const LAST_ADDED: i32 = TableMetadataBuilder::LAST_ADDED;
+
+    /// Creates a new view metadata builder.
+    pub fn new(
+        location: String,
+        schema: Schema,
+        view_version: ViewVersion,
+        format_version: ViewFormatVersion,
+        properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        let builder = Self {
+            metadata: ViewMetadata {
+                format_version,
+                view_uuid: Uuid::now_v7(),
+                location: "".to_string(), // Overwritten immediately by 
set_location
+                current_version_id: -1,   // Overwritten immediately by 
set_current_version,
+                versions: HashMap::new(), // Overwritten immediately by 
set_current_version
+                version_log: Vec::new(),
+                schemas: HashMap::new(), // Overwritten immediately by 
set_current_version
+                properties: HashMap::new(), // Overwritten immediately by 
set_properties
+            },
+            changes: vec![],
+            last_added_schema_id: None, // Overwritten immediately by 
set_current_version
+            last_added_version_id: None, // Overwritten immediately by 
set_current_version
+            history_entry: None,
+            previous_view_version: None, // This is a new view
+        };
+
+        builder
+            .set_location(location)
+            .set_current_version(view_version, schema)?
+            .set_properties(properties)
+    }
+
+    /// Creates a new view metadata builder from the given metadata to modify 
it.
+    #[must_use]
+    pub fn new_from_metadata(previous: ViewMetadata) -> Self {
+        let previous_view_version = previous.current_version().clone();
+        Self {
+            metadata: previous,
+            changes: Vec::default(),
+            last_added_schema_id: None,
+            last_added_version_id: None,
+            history_entry: None,
+            previous_view_version: Some(previous_view_version),
+        }
+    }
+
+    /// Creates a new view metadata builder from the given view creation.
+    pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
+        let ViewCreation {
+            location,
+            schema,
+            properties,
+            name: _,
+            representations,
+            default_catalog,
+            default_namespace,
+            summary,
+        } = view_creation;
+        let version = ViewVersion::builder()
+            .with_default_catalog(default_catalog)
+            .with_default_namespace(default_namespace)
+            .with_representations(representations)
+            .with_schema_id(schema.schema_id())
+            .with_summary(summary)
+            .with_timestamp_ms(Utc::now().timestamp_millis())
+            .with_version_id(INITIAL_VIEW_VERSION_ID)
+            .build();
+
+        Self::new(location, schema, version, ViewFormatVersion::V1, properties)
+    }
+
+    /// Upgrade `FormatVersion`. Downgrades are not allowed.
+    ///
+    /// # Errors
+    /// - Cannot downgrade to older format versions.
+    pub fn upgrade_format_version(self, format_version: ViewFormatVersion) -> 
Result<Self> {
+        if format_version < self.metadata.format_version {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot downgrade ViewFormatVersion from {} to {}",
+                    self.metadata.format_version, format_version
+                ),
+            ));
+        }
+
+        if format_version != self.metadata.format_version {
+            match format_version {
+                ViewFormatVersion::V1 => {
+                    // No changes needed for V1
+                }
+            }
+        }
+
+        Ok(self)
+    }
+
+    /// Set the location of the view, stripping any trailing slashes.
+    pub fn set_location(mut self, location: String) -> Self {
+        let location = location.trim_end_matches('/').to_string();
+        if self.metadata.location != location {
+            self.changes.push(ViewUpdate::SetLocation {
+                location: location.clone(),
+            });
+            self.metadata.location = location;
+        }
+
+        self
+    }
+
+    /// Set an existing view version as the current version.
+    ///
+    /// # Errors
+    /// - The specified `version_id` does not exist.
+    /// - The specified `version_id` is `-1` but no version has been added.
+    pub fn set_current_version_id(mut self, mut version_id: i32) -> 
Result<Self> {
+        if version_id == Self::LAST_ADDED && 
self.last_added_version_id.is_none() {
+            version_id = self.last_added_version_id.ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Cannot set current version id to last added version: no 
version has been added.",
+                )
+            })?;
+        }
+        let version_id = version_id; // make immutable
+
+        if version_id == self.metadata.current_version_id {
+            return Ok(self);
+        }
+
+        let version = self.metadata.versions.get(&version_id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot set current version to unknown version with id: 
{}",
+                    version_id
+                ),
+            )
+        })?;
+
+        self.metadata.current_version_id = version_id;
+
+        if self.last_added_version_id == Some(version_id) {
+            self.changes.push(ViewUpdate::SetCurrentViewVersion {
+                view_version_id: Self::LAST_ADDED,
+            });
+        } else {
+            self.changes.push(ViewUpdate::SetCurrentViewVersion {
+                view_version_id: version_id,
+            });
+        }
+
+        self.history_entry = Some(version.log());
+
+        Ok(self)
+    }
+
+    /// Add a new view version and set it as current.
+    pub fn set_current_version(
+        mut self,
+        view_version: ViewVersion,
+        schema: Schema,
+    ) -> Result<Self> {
+        let schema_id = self.add_schema_internal(schema);
+        let view_version = view_version.with_schema_id(schema_id);
+        let view_version_id = self.add_version_internal(view_version)?;
+        self.set_current_version_id(view_version_id)
+    }
+
+    /// Add a new version to the view.
+    ///
+    /// # Errors
+    /// - The schema ID of the version is set to `-1`, but no schema has been 
added.
+    /// - The schema ID of the specified version is unknown.
+    /// - Multiple queries for the same dialect are added.
+    pub fn add_version(mut self, view_version: ViewVersion) -> Result<Self> {
+        self.add_version_internal(view_version)?;
+
+        Ok(self)
+    }
+
+    fn add_version_internal(&mut self, view_version: ViewVersion) -> 
Result<i32> {
+        let version_id = 
self.reuse_or_create_new_view_version_id(&view_version);
+        let view_version = view_version.with_version_id(version_id);
+
+        if self.metadata.versions.contains_key(&version_id) {
+            // ToDo Discuss: Similar to TableMetadata sort-order, Java does 
not add changes
+            // in this case. I prefer to add changes as the state of the 
builder is
+            // potentially mutated (`last_added_version_id`), thus we should 
record the change.
+            if self.last_added_version_id != Some(version_id) {
+                self.changes.push(ViewUpdate::AddViewVersion {
+                    view_version: view_version.with_version_id(version_id),
+                });
+                self.last_added_version_id = Some(version_id);
+            }
+            return Ok(version_id);
+        }
+
+        let view_version = if view_version.schema_id() == Self::LAST_ADDED {
+            let last_added_schema_id = self.last_added_schema_id.ok_or_else(|| 
{
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Cannot set last added schema: no schema has been added",
+                )
+            })?;
+            view_version.with_schema_id(last_added_schema_id)
+        } else {
+            view_version
+        };
+
+        if !self
+            .metadata
+            .schemas
+            .contains_key(&view_version.schema_id())
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add version with unknown schema: {}",
+                    view_version.schema_id()
+                ),
+            ));
+        }
+
+        require_unique_dialects(&view_version)?;
+
+        // ToDo Discuss: This check is not present in Java.
+        // The `TableMetadataBuilder` uses these checks in multiple places - 
also in Java.
+        // If we think delayed requests are a problem, I think we should also 
add it here.
+        if let Some(last) = self.metadata.version_log.last() {
+            // commits can happen concurrently from different machines.
+            // A tolerance helps us avoid failure for small clock skew
+            if view_version.timestamp_ms() - last.timestamp_ms() < 
-ONE_MINUTE_MS {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Invalid snapshot timestamp {}: before last snapshot 
timestamp {}",
+                        view_version.timestamp_ms(),
+                        last.timestamp_ms()
+                    ),
+                ));
+            }
+        }
+
+        self.metadata
+            .versions
+            .insert(version_id, Arc::new(view_version.clone()));
+
+        let view_version = if let Some(last_added_schema_id) = 
self.last_added_schema_id {
+            if view_version.schema_id() == last_added_schema_id {
+                view_version.with_schema_id(Self::LAST_ADDED)
+            } else {
+                view_version
+            }
+        } else {
+            view_version
+        };
+        self.changes
+            .push(ViewUpdate::AddViewVersion { view_version });
+
+        self.last_added_version_id = Some(version_id);
+
+        Ok(version_id)
+    }
+
+    fn reuse_or_create_new_view_version_id(&self, new_view_version: 
&ViewVersion) -> i32 {
+        self.metadata
+            .versions
+            .iter()
+            .find_map(|(id, other_version)| {
+                new_view_version
+                    .behaves_identical_to(other_version)
+                    .then_some(*id)
+            })
+            .unwrap_or_else(|| {
+                self.get_highest_view_version_id()
+                    .map(|id| id + 1)
+                    .unwrap_or(INITIAL_VIEW_VERSION_ID)
+            })
+    }
+
+    fn get_highest_view_version_id(&self) -> Option<i32> {
+        self.metadata.versions.keys().max().copied()
+    }
+
+    /// Add a new schema to the view.
+    pub fn add_schema(mut self, schema: Schema) -> Self {
+        self.add_schema_internal(schema);
+
+        self
+    }
+
+    fn add_schema_internal(&mut self, schema: Schema) -> SchemaId {
+        let schema_id = self.reuse_or_create_new_schema_id(&schema);
+
+        if self.metadata.schemas.contains_key(&schema_id) {
+            // ToDo Discuss: Java does not add changes in this case. I prefer 
to add changes
+            // as the state of the builder is potentially mutated 
(`last_added_schema_id`),
+            // thus we should record the change.
+            if self.last_added_schema_id != Some(schema_id) {
+                self.changes.push(ViewUpdate::AddSchema {
+                    schema: schema.clone().with_schema_id(schema_id),
+                    last_column_id: None,
+                });
+                self.last_added_schema_id = Some(schema_id);
+            }
+            return schema_id;
+        }
+
+        let schema = schema.with_schema_id(schema_id);
+
+        self.metadata
+            .schemas
+            .insert(schema_id, Arc::new(schema.clone()));
+        let last_column_id = schema.highest_field_id();
+        self.changes.push(ViewUpdate::AddSchema {
+            schema,
+            last_column_id: Some(last_column_id),
+        });
+
+        self.last_added_schema_id = Some(schema_id);
+
+        schema_id
+    }
+
+    fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> SchemaId {
+        self.metadata
+            .schemas
+            .iter()
+            .find_map(|(id, schema)| 
new_schema.is_same_schema(schema).then_some(*id))
+            .unwrap_or_else(|| {
+                self.get_highest_schema_id()
+                    .map(|id| id + 1)
+                    .unwrap_or(DEFAULT_SCHEMA_ID)
+            })
+    }
+
+    fn get_highest_schema_id(&self) -> Option<SchemaId> {
+        self.metadata.schemas.keys().max().copied()
+    }
+
+    /// Update properties of the view.
+    pub fn set_properties(mut self, updates: HashMap<String, String>) -> 
Result<Self> {
+        if updates.is_empty() {
+            return Ok(self);
+        }
+
+        let num_versions_to_keep = updates
+            .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
+            .and_then(|v| v.parse::<i64>().ok())
+            .unwrap_or(1);
+        if num_versions_to_keep < 0 {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "{} must be positive but was {}",
+                    VIEW_PROPERTY_VERSION_HISTORY_SIZE, num_versions_to_keep
+                ),
+            ));
+        }
+
+        self.metadata.properties.extend(updates.clone());
+        self.changes.push(ViewUpdate::SetProperties { updates });
+
+        Ok(self)
+    }
+
+    /// Remove properties from the view
+    pub fn remove_properties(mut self, removals: &[String]) -> Self {
+        if removals.is_empty() {
+            return self;
+        }
+
+        for property in removals {
+            self.metadata.properties.remove(property);
+        }
+
+        self.changes.push(ViewUpdate::RemoveProperties {
+            removals: removals.to_vec(),
+        });
+
+        self
+    }
+
+    /// Assign a new UUID to the view.
+    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
+        if self.metadata.view_uuid != uuid {
+            self.metadata.view_uuid = uuid;
+            self.changes.push(ViewUpdate::AssignUuid { uuid });
+        }
+
+        self
+    }
+
+    /// Build the `ViewMetadata` from the changes.
+    pub fn build(mut self) -> Result<ViewMetadataBuildResult> {
+        if let Some(history_entry) = self.history_entry.take() {
+            self.metadata.version_log.push(history_entry);
+        }
+
+        // We should run validate before `self.metadata.current_version()` 
below,
+        // as it might panic if the metadata is invalid.
+        self.metadata.validate()?;
+
+        if let Some(previous) = self.previous_view_version.take() {
+            if !allow_replace_drop_dialects(&self.metadata.properties) {
+                require_no_dialect_dropped(&previous, 
self.metadata.current_version())?;
+            }
+        }
+
+        let _expired_versions = self.expire_versions();
+        self.metadata.version_log = update_version_log(
+            self.metadata.version_log,
+            self.metadata.versions.keys().copied().collect(),
+        );
+
+        Ok(ViewMetadataBuildResult {
+            metadata: self.metadata,
+            changes: self.changes,
+        })
+    }
+
+    /// Removes expired versions from the view and returns them.
+    fn expire_versions(&mut self) -> Vec<ViewVersionRef> {
+        let num_versions_to_keep = self
+            .metadata
+            .properties
+            .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
+            .and_then(|v| v.parse::<usize>().ok())
+            .unwrap_or(VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT)
+            .max(1);
+
+        // expire old versions, but keep at least the versions added in this 
builder
+        let num_added_versions = self
+            .changes
+            .iter()
+            .filter(|update| matches!(update, ViewUpdate::AddViewVersion { .. 
}))
+            .count();
+        let num_versions_to_keep = 
num_added_versions.max(num_versions_to_keep);
+
+        if self.metadata.versions.len() > num_versions_to_keep {
+            // version ids are assigned sequentially. keep the latest versions 
by ID.
+            let mut versions_to_keep = self
+                .metadata
+                .versions
+                .keys()
+                .copied()
+                .sorted()
+                .rev()
+                .take(num_versions_to_keep)
+                .collect::<HashSet<_>>();
+
+            // always retain current version
+            if !versions_to_keep.contains(&self.metadata.current_version_id) {
+                // Remove the lowest ID
+                if num_versions_to_keep > num_added_versions {
+                    let lowest_id = versions_to_keep.iter().min().copied();
+                    lowest_id.map(|id| versions_to_keep.remove(&id));
+                }
+                // Add the current version ID
+                versions_to_keep.insert(self.metadata.current_version_id);
+            }
+
+            let mut expired_versions = Vec::new();
+            // remove all versions which are not in versions_to_keep from the 
metadata
+            // and add them to the expired_versions list
+            self.metadata.versions.retain(|id, version| {
+                if versions_to_keep.contains(id) {
+                    true
+                } else {
+                    expired_versions.push(version.clone());
+                    false
+                }
+            });
+
+            expired_versions
+        } else {
+            Vec::new()
+        }
+    }
+}
+
+/// Expire version log entries that are no longer relevant.
+/// Returns the history entries to retain.
+fn update_version_log(
+    version_log: Vec<ViewVersionLog>,
+    ids_to_keep: HashSet<i32>,
+) -> Vec<ViewVersionLog> {
+    let mut retained_history = Vec::new();
+    for log_entry in version_log {
+        if ids_to_keep.contains(&log_entry.version_id()) {
+            retained_history.push(log_entry);
+        } else {
+            retained_history.clear();
+        }
+    }
+    retained_history
+}
+
+fn allow_replace_drop_dialects(properties: &HashMap<String, String>) -> bool {
+    properties
+        .get(VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED)
+        .map_or(
+            VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT,
+            |value| is_truthy(value),
+        )
+}
+
+fn require_no_dialect_dropped(previous: &ViewVersion, current: &ViewVersion) 
-> Result<()> {
+    let base_dialects = lowercase_sql_dialects_for(previous);
+    let updated_dialects = lowercase_sql_dialects_for(current);
+
+    if !updated_dialects.is_superset(&base_dialects) {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Cannot replace view due to loss of view dialects: \nPrevious 
dialects: {:?}\nNew dialects: {:?}\nSet {} to true to allow dropping dialects.",
+                Vec::from_iter(base_dialects), 
Vec::from_iter(updated_dialects), VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED
+            ),
+        ));
+    }
+
+    Ok(())
+}
+
+fn lowercase_sql_dialects_for(view_version: &ViewVersion) -> HashSet<String> {
+    view_version
+        .representations()
+        .iter()
+        .map(|repr| match repr {
+            ViewRepresentation::Sql(sql_repr) => 
sql_repr.dialect.to_lowercase(),
+        })
+        .collect()
+}
+
+pub(super) fn require_unique_dialects(view_version: &ViewVersion) -> 
Result<()> {
+    let mut seen_dialects = 
HashSet::with_capacity(view_version.representations().len());
+    for repr in view_version.representations().iter() {
+        match repr {
+            ViewRepresentation::Sql(sql_repr) => {
+                if !seen_dialects.insert(sql_repr.dialect.to_lowercase()) {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Invalid view version: Cannot add multiple queries 
for dialect {}",
+                            sql_repr.dialect
+                        ),
+                    ));
+                }
+            }
+        }
+    }
+    Ok(())
+}
+
+#[cfg(test)]
+mod test {
+    use super::super::view_metadata::tests::get_test_view_metadata;
+    use super::*;
+    use crate::spec::{
+        NestedField, PrimitiveType, SqlViewRepresentation, Type, 
ViewRepresentations,
+    };
+    use crate::NamespaceIdent;
+
+    fn new_view_version(id: usize, schema_id: SchemaId, sql: &str) -> 
ViewVersion {
+        new_view_version_with_dialect(id, schema_id, sql, vec!["spark"])
+    }
+
+    fn new_view_version_with_dialect(
+        id: usize,
+        schema_id: SchemaId,
+        sql: &str,
+        dialects: Vec<&str>,
+    ) -> ViewVersion {
+        ViewVersion::builder()
+            .with_version_id(id as i32)
+            .with_schema_id(schema_id)
+            .with_timestamp_ms(1573518431300)
+            .with_default_catalog(Some("prod".to_string()))
+            .with_summary(HashMap::from_iter(vec![(
+                "user".to_string(),
+                "some-user".to_string(),
+            )]))
+            .with_representations(ViewRepresentations(
+                dialects
+                    .iter()
+                    .map(|dialect| {
+                        ViewRepresentation::Sql(SqlViewRepresentation {
+                            dialect: dialect.to_string(),
+                            sql: sql.to_string(),
+                        })
+                    })
+                    .collect(),
+            ))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build()
+    }
+
+    fn builder_without_changes() -> ViewMetadataBuilder {
+        
ViewMetadataBuilder::new_from_metadata(get_test_view_metadata("ViewMetadataV1Valid.json"))
+    }
+
+    #[test]
+    fn test_minimal_builder() {
+        let location = "s3://bucket/table".to_string();
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+        // Version ID and schema should be re-assigned
+        let version = new_view_version(20, 21, "select 1 as count");
+        let format_version = ViewFormatVersion::V1;
+        let properties = HashMap::from_iter(vec![("key".to_string(), 
"value".to_string())]);
+
+        let build_result = ViewMetadataBuilder::new(
+            location.clone(),
+            schema.clone(),
+            version.clone(),
+            format_version,
+            properties.clone(),
+        )
+        .unwrap()
+        .build()
+        .unwrap();
+
+        let metadata = build_result.metadata;
+        assert_eq!(metadata.location, location);
+        assert_eq!(metadata.current_version_id, INITIAL_VIEW_VERSION_ID);
+        assert_eq!(metadata.format_version, format_version);
+        assert_eq!(metadata.properties, properties);
+        assert_eq!(metadata.versions.len(), 1);
+        assert_eq!(metadata.schemas.len(), 1);
+        assert_eq!(metadata.version_log.len(), 1);
+        assert_eq!(
+            
Arc::unwrap_or_clone(metadata.versions[&INITIAL_VIEW_VERSION_ID].clone()),
+            version
+                .clone()
+                .with_version_id(INITIAL_VIEW_VERSION_ID)
+                .with_schema_id(0)
+        );
+
+        let changes = build_result.changes;
+        assert_eq!(changes.len(), 5);
+        assert!(changes.contains(&ViewUpdate::SetLocation { location }));
+        assert!(changes.contains(&ViewUpdate::AddViewVersion {
+            view_version: version
+                .with_version_id(INITIAL_VIEW_VERSION_ID)
+                .with_schema_id(-1)
+        }));
+        assert!(changes.contains(&ViewUpdate::SetCurrentViewVersion {
+            view_version_id: -1
+        }));
+        assert!(changes.contains(&ViewUpdate::AddSchema {
+            schema: schema.clone().with_schema_id(0),
+            last_column_id: Some(0)
+        }));
+        assert!(changes.contains(&ViewUpdate::SetProperties {
+            updates: properties
+        }));
+    }
+
+    #[test]
+    fn test_version_expiration() {
+        let v1 = new_view_version(0, 1, "select 1 as count");
+        let v2 = new_view_version(0, 1, "select count(1) as count from t2");
+        let v3 = new_view_version(0, 1, "select count from t1");
+
+        let builder = builder_without_changes()
+            .add_version(v1)
+            .unwrap()
+            .add_version(v2)
+            .unwrap()
+            .add_version(v3)
+            .unwrap();
+        let builder_without_changes = 
builder.clone().build().unwrap().metadata.into_builder();
+
+        // No limit on versions
+        let metadata = builder.clone().build().unwrap().metadata;
+        assert_eq!(
+            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
+            HashSet::from_iter(vec![1, 2, 3, 4])
+        );
+
+        // Limit to 2 versions, we still want to keep 3 versions as 3 where 
added during this build
+        // Plus the current version
+        let metadata = builder
+            .clone()
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
+                "2".to_string(),
+            )]))
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+        assert_eq!(
+            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
+            HashSet::from_iter(vec![1, 2, 3, 4])
+        );
+        assert_eq!(metadata.version_log.len(), 1);
+
+        // Limit to 2 versions in new build, only keep 2.
+        // One of them should be the current
+        let metadata = builder_without_changes
+            .clone()
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
+                "2".to_string(),
+            )]))
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+        assert_eq!(
+            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
+            HashSet::from_iter(vec![1, 4])
+        );
+
+        // Keep at least 1 version irrespective of the limit.
+        // This is the current version
+        let metadata = builder_without_changes
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
+                "0".to_string(),
+            )]))
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+        assert_eq!(
+            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
+            HashSet::from_iter(vec![1])
+        );
+    }
+
+    #[test]
+    fn test_update_version_log() {
+        let v1 = new_view_version(1, 1, "select 1 as count");
+        let v2 = new_view_version(2, 1, "select count(1) as count from t2");
+        let v3 = new_view_version(3, 1, "select count from t1");
+
+        let one = ViewVersionLog::new(1, v1.timestamp_ms());
+        let two = ViewVersionLog::new(2, v2.timestamp_ms());
+        let three = ViewVersionLog::new(3, v3.timestamp_ms());
+
+        assert_eq!(
+            update_version_log(
+                vec![one.clone(), two.clone(), three.clone()],
+                HashSet::from_iter(vec![1, 2, 3])
+            ),
+            vec![one.clone(), two.clone(), three.clone()]
+        );
+
+        // one was an invalid entry in the history, so all previous elements 
are removed
+        assert_eq!(
+            update_version_log(
+                vec![
+                    three.clone(),
+                    two.clone(),
+                    one.clone(),
+                    two.clone(),
+                    three.clone()
+                ],
+                HashSet::from_iter(vec![2, 3])
+            ),
+            vec![two.clone(), three.clone()]
+        );
+
+        // two was an invalid entry in the history, so all previous elements 
are removed
+        assert_eq!(
+            update_version_log(
+                vec![
+                    one.clone(),
+                    two.clone(),
+                    three.clone(),
+                    one.clone(),
+                    three.clone()
+                ],
+                HashSet::from_iter(vec![1, 3])
+            ),
+            vec![three.clone(), one.clone(), three.clone()]
+        );
+    }
+
+    #[test]
+    fn test_assign_uuid() {
+        let builder = builder_without_changes();
+        let uuid = Uuid::now_v7();
+        let build_result = builder.clone().assign_uuid(uuid).build().unwrap();
+        assert_eq!(build_result.metadata.view_uuid, uuid);
+        assert_eq!(build_result.changes, vec![ViewUpdate::AssignUuid { uuid 
}]);
+    }
+
+    #[test]
+    fn test_set_location() {
+        let builder = builder_without_changes();
+        let location = "s3://bucket/table".to_string();
+        let build_result = builder
+            .clone()
+            .set_location(location.clone())
+            .build()
+            .unwrap();
+        assert_eq!(build_result.metadata.location, location);
+        assert_eq!(build_result.changes, vec![ViewUpdate::SetLocation {
+            location
+        }]);
+    }
+
+    #[test]
+    fn test_set_and_remove_properties() {
+        let builder = builder_without_changes();
+        let properties = HashMap::from_iter(vec![
+            ("key1".to_string(), "value1".to_string()),
+            ("key2".to_string(), "value2".to_string()),
+        ]);
+        let build_result = builder
+            .clone()
+            .set_properties(properties.clone())
+            .unwrap()
+            .remove_properties(&["key2".to_string(), "key3".to_string()])
+            .build()
+            .unwrap();
+        assert_eq!(
+            build_result.metadata.properties.get("key1"),
+            Some(&"value1".to_string())
+        );
+        assert_eq!(build_result.metadata.properties.get("key2"), None);
+        assert_eq!(build_result.changes, vec![
+            ViewUpdate::SetProperties {
+                updates: properties
+            },
+            ViewUpdate::RemoveProperties {
+                removals: vec!["key2".to_string(), "key3".to_string()]
+            }
+        ]);
+    }
+
+    #[test]
+    fn test_add_schema() {
+        let builder = builder_without_changes();
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+        let build_result = 
builder.clone().add_schema(schema.clone()).build().unwrap();
+        assert_eq!(build_result.metadata.schemas.len(), 2);
+        assert_eq!(build_result.changes, vec![ViewUpdate::AddSchema {
+            schema: schema.clone().with_schema_id(2),
+            last_column_id: Some(0)
+        }]);
+
+        // Add schema again - id is reused
+        let build_result = 
builder.clone().add_schema(schema.clone()).build().unwrap();
+        assert_eq!(build_result.metadata.schemas.len(), 2);
+        assert_eq!(build_result.changes, vec![ViewUpdate::AddSchema {
+            schema: schema.clone().with_schema_id(2),
+            last_column_id: Some(0)
+        }]);
+    }
+
+    #[test]
+    fn test_add_and_set_current_version() {
+        let builder = builder_without_changes();
+        let v1 = new_view_version(2, 1, "select 1 as count");
+        let v2 = new_view_version(3, 2, "select count(1) as count from t2");
+        let v2_schema = Schema::builder()
+            .with_schema_id(2)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let build_result = builder
+            .clone()
+            .add_version(v1.clone())
+            .unwrap()
+            .add_schema(v2_schema.clone())
+            .add_version(v2.clone())
+            .unwrap()
+            .set_current_version_id(3)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(build_result.metadata.current_version_id, 3);
+        assert_eq!(build_result.metadata.versions.len(), 3);
+        assert_eq!(build_result.metadata.schemas.len(), 2);
+        assert_eq!(build_result.metadata.version_log.len(), 2);
+        assert_eq!(
+            Arc::unwrap_or_clone(build_result.metadata.versions[&2].clone()),
+            v1.clone().with_version_id(2).with_schema_id(1)
+        );
+        assert_eq!(
+            Arc::unwrap_or_clone(build_result.metadata.versions[&3].clone()),
+            v2.clone().with_version_id(3).with_schema_id(2)
+        );
+        assert_eq!(build_result.changes.len(), 4);
+        assert_eq!(build_result.changes, vec![
+            ViewUpdate::AddViewVersion {
+                view_version: v1.clone().with_version_id(2).with_schema_id(1)
+            },
+            ViewUpdate::AddSchema {
+                schema: v2_schema.clone().with_schema_id(2),
+                last_column_id: Some(0)
+            },
+            ViewUpdate::AddViewVersion {
+                view_version: v2.clone().with_version_id(3).with_schema_id(-1)
+            },
+            ViewUpdate::SetCurrentViewVersion {
+                view_version_id: -1
+            }
+        ]);
+        assert_eq!(
+            build_result
+                .metadata
+                .version_log
+                .iter()
+                .map(|v| v.version_id())
+                .collect::<Vec<_>>(),
+            vec![1, 3]
+        );
+    }
+
+    #[test]
+    fn test_schema_and_version_id_reassignment() {
+        let builder = builder_without_changes();
+        let v1 = new_view_version(0, 1, "select 1 as count");
+        let v2 = new_view_version(0, 2, "select count(1) as count from t2");
+        let v2_schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let build_result = builder
+            .clone()
+            .add_version(v1.clone())
+            .unwrap()
+            .set_current_version(v2.clone(), v2_schema.clone())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(build_result.metadata.current_version_id, 3);
+        assert_eq!(build_result.metadata.versions.len(), 3);
+        assert_eq!(build_result.metadata.schemas.len(), 2);
+        assert_eq!(build_result.metadata.version_log.len(), 2);
+        assert_eq!(
+            Arc::unwrap_or_clone(build_result.metadata.versions[&2].clone()),
+            v1.clone().with_version_id(2).with_schema_id(1)
+        );
+        assert_eq!(
+            Arc::unwrap_or_clone(build_result.metadata.versions[&3].clone()),
+            v2.clone().with_version_id(3).with_schema_id(2)
+        );
+        assert_eq!(build_result.changes.len(), 4);
+        assert_eq!(build_result.changes, vec![
+            ViewUpdate::AddViewVersion {
+                view_version: v1.clone().with_version_id(2).with_schema_id(1)
+            },
+            ViewUpdate::AddSchema {
+                schema: v2_schema.clone().with_schema_id(2),
+                last_column_id: Some(0)
+            },
+            ViewUpdate::AddViewVersion {
+                view_version: v2.clone().with_version_id(3).with_schema_id(-1)
+            },
+            ViewUpdate::SetCurrentViewVersion {
+                view_version_id: -1
+            }
+        ]);
+        assert_eq!(
+            build_result
+                .metadata
+                .version_log
+                .iter()
+                .map(|v| v.version_id())
+                .collect::<Vec<_>>(),
+            vec![1, 3]
+        );
+    }
+
+    #[test]
+    fn test_view_version_deduplication() {
+        let builder = builder_without_changes();
+        let v1 = new_view_version(0, 1, "select * from ns.tbl");
+
+        assert_eq!(builder.metadata.versions.len(), 1);
+        let build_result = builder
+            .clone()
+            .add_version(v1.clone())
+            .unwrap()
+            .add_version(v1)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(build_result.metadata.versions.len(), 2);
+        assert_eq!(build_result.metadata.schemas.len(), 1);
+    }
+
+    #[test]
+    fn test_view_version_and_schema_deduplication() {
+        let schema_one = Schema::builder()
+            .with_schema_id(5)
+            .with_fields(vec![NestedField::required(
+                1,
+                "x",
+                Type::Primitive(PrimitiveType::Long),
+            )
+            .into()])
+            .build()
+            .unwrap();
+        let schema_two = Schema::builder()
+            .with_schema_id(7)
+            .with_fields(vec![NestedField::required(
+                1,
+                "y",
+                Type::Primitive(PrimitiveType::Long),
+            )
+            .into()])
+            .build()
+            .unwrap();
+        let schema_three = Schema::builder()
+            .with_schema_id(9)
+            .with_fields(vec![NestedField::required(
+                1,
+                "z",
+                Type::Primitive(PrimitiveType::Long),
+            )
+            .into()])
+            .build()
+            .unwrap();
+
+        let v1 = new_view_version(1, 5, "select * from ns.tbl");
+        let v2 = new_view_version(1, 7, "select count(*) from ns.tbl");
+        let v3 = new_view_version(1, 9, "select count(*) as count from 
ns.tbl");
+
+        let build_result = builder_without_changes()
+            .add_schema(schema_one.clone())
+            .add_schema(schema_two.clone())
+            .add_schema(schema_three.clone())
+            .set_current_version(v1.clone(), schema_one.clone())
+            .unwrap()
+            .set_current_version(v2.clone(), schema_two.clone())
+            .unwrap()
+            .set_current_version(v3.clone(), schema_three.clone())
+            .unwrap()
+            .set_current_version(v3.clone(), schema_three.clone())
+            .unwrap()
+            .set_current_version(v2.clone(), schema_two.clone())
+            .unwrap()
+            .set_current_version(v1.clone(), schema_one.clone())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(
+            
Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
+            v1.clone().with_version_id(2).with_schema_id(2)
+        );
+        assert_eq!(build_result.metadata.versions.len(), 4);
+        assert_eq!(
+            build_result.metadata.versions[&2],
+            Arc::new(v1.clone().with_version_id(2).with_schema_id(2))
+        );
+        assert_eq!(
+            build_result.metadata.versions[&3],
+            Arc::new(v2.clone().with_version_id(3).with_schema_id(3))
+        );
+        assert_eq!(
+            build_result.metadata.versions[&4],
+            Arc::new(v3.clone().with_version_id(4).with_schema_id(4))
+        );
+        assert_eq!(
+            // Remove schema_id 1 and get struct only
+            build_result
+                .metadata
+                .schemas_iter()
+                .filter(|s| s.schema_id() != 1)
+                .sorted_by_key(|s| s.schema_id())
+                .map(|s| s.as_struct())
+                .collect::<Vec<_>>(),
+            vec![
+                schema_one.as_struct(),
+                schema_two.as_struct(),
+                schema_three.as_struct()
+            ]
+        )
+    }
+
+    #[test]
+    fn test_error_on_missing_schema() {
+        let builder = builder_without_changes();
+        // Missing schema
+        assert!(builder
+            .clone()
+            .add_version(new_view_version(0, 10, "SELECT * FROM foo"))
+            .unwrap_err()
+            .to_string()
+            .contains("Cannot add version with unknown schema: 10"));
+
+        // Missing last added schema
+        assert!(builder
+            .clone()
+            .add_version(new_view_version(0, -1, "SELECT * FROM foo"))
+            .unwrap_err()
+            .to_string()
+            .contains("Cannot set last added schema: no schema has been 
added"));
+    }
+
+    #[test]
+    fn test_error_on_missing_current_version() {
+        let builder = builder_without_changes();
+        assert!(builder
+            .clone()
+            .set_current_version_id(-1)
+            .unwrap_err()
+            .to_string()
+            .contains(
+                "Cannot set current version id to last added version: no 
version has been added."
+            ));
+        assert!(builder
+            .clone()
+            .set_current_version_id(10)
+            .unwrap_err()
+            .to_string()
+            .contains("Cannot set current version to unknown version with id: 
10"));
+    }
+
+    #[test]
+    fn test_error_when_setting_negative_version_history_size() {
+        let builder = builder_without_changes();
+        assert!(builder
+            .clone()
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
+                "-1".to_string(),
+            )]))
+            .unwrap_err()
+            .to_string()
+            .contains("version.history.num-entries must be positive but was 
-1"));
+    }
+
+    #[test]
+    fn test_view_version_changes() {
+        let builder = builder_without_changes();
+
+        let v1 = new_view_version(2, 1, "select 1 as count");
+        let v2 = new_view_version(3, 1, "select count(1) as count from t2");
+
+        let changes = builder
+            .clone()
+            .add_version(v1.clone())
+            .unwrap()
+            .add_version(v2.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .changes;
+
+        assert_eq!(changes.len(), 2);
+        assert_eq!(changes, vec![
+            ViewUpdate::AddViewVersion {
+                view_version: v1.clone()
+            },
+            ViewUpdate::AddViewVersion {
+                view_version: v2.clone()
+            }
+        ]);
+    }
+
+    #[test]
+    fn test_dropping_dialect_fails_by_default() {
+        let builder = builder_without_changes();
+
+        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", 
vec!["spark"]);
+        let spark_trino =
+            new_view_version_with_dialect(0, 0, "SELECT * FROM foo", 
vec!["spark", "trino"]);
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let err = builder
+            .set_current_version(spark_trino, schema.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata
+            .into_builder()
+            .set_current_version(spark, schema)
+            .unwrap()
+            .build()
+            .unwrap_err();
+
+        assert!(err
+            .to_string()
+            .contains("Cannot replace view due to loss of view dialects"));
+    }
+
+    #[test]
+    fn test_dropping_dialects_does_not_fail_when_allowed() {
+        let builder = builder_without_changes();
+
+        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", 
vec!["spark"]);
+        let spark_trino =
+            new_view_version_with_dialect(0, 0, "SELECT * FROM foo", 
vec!["spark", "trino"]);
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let build_result = builder
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
+                "true".to_string(),
+            )]))
+            .unwrap()
+            .set_current_version(spark_trino, schema.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata
+            .into_builder()
+            .set_current_version(spark.clone(), schema)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(
+            
Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
+            spark.with_version_id(3).with_schema_id(2)
+        );
+    }
+
+    #[test]
+    fn test_can_add_dialects_by_default() {
+        let builder = builder_without_changes();
+
+        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", 
vec!["spark"]);
+        let spark_trino =
+            new_view_version_with_dialect(0, 0, "SELECT * FROM foo", 
vec!["spark", "trino"]);
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let build_result = builder
+            .set_current_version(spark.clone(), schema.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata
+            .into_builder()
+            .set_current_version(spark_trino.clone(), schema.clone())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(
+            
Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
+            spark_trino.with_version_id(3).with_schema_id(2)
+        );
+    }
+
+    #[test]
+    fn test_can_update_dialect_by_default() {
+        let builder = builder_without_changes();
+
+        let spark_v1 = new_view_version_with_dialect(0, 0, "SELECT * FROM 
foo", vec!["spark"]);
+        let spark_v2 = new_view_version_with_dialect(0, 0, "SELECT * FROM 
bar", vec!["spark"]);
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let build_result = builder
+            .set_current_version(spark_v1.clone(), schema.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata
+            .into_builder()
+            .set_current_version(spark_v2.clone(), schema.clone())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(
+            
Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
+            spark_v2.with_version_id(3).with_schema_id(2)
+        );
+    }
+
+    #[test]
+    fn test_dropping_dialects_allowed_and_then_disallowed() {
+        let builder = builder_without_changes();
+
+        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", 
vec!["spark"]);
+        let trino = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", 
vec!["trino"]);
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+
+        let updated = builder
+            .set_current_version(spark.clone(), schema.clone())
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata
+            .into_builder()
+            .set_current_version(trino.clone(), schema.clone())
+            .unwrap()
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
+                "true".to_string(),
+            )]))
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(
+            Arc::unwrap_or_clone(updated.metadata.current_version().clone()),
+            trino.with_version_id(3).with_schema_id(2)
+        );
+
+        let err = updated
+            .metadata
+            .into_builder()
+            .set_current_version(spark.clone(), schema.clone())
+            .unwrap()
+            .set_properties(HashMap::from_iter(vec![(
+                VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
+                "false".to_string(),
+            )]))
+            .unwrap()
+            .build()
+            .unwrap_err();
+
+        assert!(err
+            .to_string()
+            .contains("Cannot replace view due to loss of view dialects"));
+    }
+
+    #[test]
+    fn test_require_no_dialect_dropped() {
+        let previous = ViewVersion::builder()
+            .with_version_id(0)
+            .with_schema_id(0)
+            .with_timestamp_ms(0)
+            .with_representations(ViewRepresentations(vec![
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "trino".to_string(),
+                    sql: "SELECT * FROM foo".to_string(),
+                }),
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "spark".to_string(),
+                    sql: "SELECT * FROM bar".to_string(),
+                }),
+            ]))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        let current = ViewVersion::builder()
+            .with_version_id(0)
+            .with_schema_id(0)
+            .with_timestamp_ms(0)
+            
.with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(
+                SqlViewRepresentation {
+                    dialect: "trino".to_string(),
+                    sql: "SELECT * FROM foo".to_string(),
+                },
+            )]))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        assert!(require_no_dialect_dropped(&previous, &current).is_err());
+
+        let current = ViewVersion::builder()
+            .with_version_id(0)
+            .with_schema_id(0)
+            .with_timestamp_ms(0)
+            .with_representations(ViewRepresentations(vec![
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "spark".to_string(),
+                    sql: "SELECT * FROM bar".to_string(),
+                }),
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "trino".to_string(),
+                    sql: "SELECT * FROM foo".to_string(),
+                }),
+            ]))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        assert!(require_no_dialect_dropped(&previous, &current).is_ok());
+    }
+
+    #[test]
+    fn test_allow_replace_drop_dialects() {
+        use std::collections::HashMap;
+
+        use super::allow_replace_drop_dialects;
+
+        let mut properties = HashMap::new();
+        assert!(!allow_replace_drop_dialects(&properties));
+
+        properties.insert(
+            "replace.drop-dialect.allowed".to_string(),
+            "true".to_string(),
+        );
+        assert!(allow_replace_drop_dialects(&properties));
+
+        properties.insert(
+            "replace.drop-dialect.allowed".to_string(),
+            "false".to_string(),
+        );
+        assert!(!allow_replace_drop_dialects(&properties));
+
+        properties.insert(
+            "replace.drop-dialect.allowed".to_string(),
+            "TRUE".to_string(),
+        );
+        assert!(allow_replace_drop_dialects(&properties));
+
+        properties.insert(
+            "replace.drop-dialect.allowed".to_string(),
+            "FALSE".to_string(),
+        );
+        assert!(!allow_replace_drop_dialects(&properties));
+    }
+
+    #[test]
+    fn test_lowercase_sql_dialects_for() {
+        let view_version = ViewVersion::builder()
+            .with_version_id(0)
+            .with_schema_id(0)
+            .with_timestamp_ms(0)
+            .with_representations(ViewRepresentations(vec![
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "STARROCKS".to_string(),
+                    sql: "SELECT * FROM foo".to_string(),
+                }),
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "trino".to_string(),
+                    sql: "SELECT * FROM bar".to_string(),
+                }),
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "Spark".to_string(),
+                    sql: "SELECT * FROM bar".to_string(),
+                }),
+            ]))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        let dialects = lowercase_sql_dialects_for(&view_version);
+        assert_eq!(dialects.len(), 3);
+        assert!(dialects.contains("trino"));
+        assert!(dialects.contains("spark"));
+        assert!(dialects.contains("starrocks"));
+    }
+
+    #[test]
+    fn test_require_unique_dialects() {
+        let view_version = ViewVersion::builder()
+            .with_version_id(0)
+            .with_schema_id(0)
+            .with_timestamp_ms(0)
+            .with_representations(ViewRepresentations(vec![
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "trino".to_string(),
+                    sql: "SELECT * FROM foo".to_string(),
+                }),
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "trino".to_string(),
+                    sql: "SELECT * FROM bar".to_string(),
+                }),
+            ]))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        assert!(require_unique_dialects(&view_version).is_err());
+
+        let view_version = ViewVersion::builder()
+            .with_version_id(0)
+            .with_schema_id(0)
+            .with_timestamp_ms(0)
+            .with_representations(ViewRepresentations(vec![
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "trino".to_string(),
+                    sql: "SELECT * FROM foo".to_string(),
+                }),
+                ViewRepresentation::Sql(SqlViewRepresentation {
+                    dialect: "spark".to_string(),
+                    sql: "SELECT * FROM bar".to_string(),
+                }),
+            ]))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        assert!(require_unique_dialects(&view_version).is_ok());
+    }
+}
diff --git a/crates/iceberg/src/spec/view_version.rs 
b/crates/iceberg/src/spec/view_version.rs
index 30686b5a..b13d87a9 100644
--- a/crates/iceberg/src/spec/view_version.rs
+++ b/crates/iceberg/src/spec/view_version.rs
@@ -27,6 +27,7 @@ use serde::{Deserialize, Serialize};
 use typed_builder::TypedBuilder;
 
 use super::view_metadata::ViewVersionLog;
+use super::INITIAL_VIEW_VERSION_ID;
 use crate::catalog::NamespaceIdent;
 use crate::error::{timestamp_ms_to_utc, Result};
 use crate::spec::{SchemaId, SchemaRef, ViewMetadata};
@@ -44,12 +45,14 @@ pub type ViewVersionId = i32;
 /// A view versions represents the definition of a view at a specific point in 
time.
 pub struct ViewVersion {
     /// A unique long ID
+    #[builder(default = INITIAL_VIEW_VERSION_ID)]
     version_id: ViewVersionId,
     /// ID of the schema for the view version
     schema_id: SchemaId,
     /// Timestamp when the version was created (ms from epoch)
     timestamp_ms: i64,
     /// A string to string map of summary metadata about the version
+    #[builder(default = HashMap::new())]
     summary: HashMap<String, String>,
     /// A list of representations for the view definition.
     representations: ViewRepresentations,
@@ -124,10 +127,36 @@ impl ViewVersion {
     }
 
     /// Retrieve the history log entry for this view version.
-    #[allow(dead_code)]
     pub(crate) fn log(&self) -> ViewVersionLog {
         ViewVersionLog::new(self.version_id, self.timestamp_ms)
     }
+
+    /// Check if this view version behaves the same as another view spec.
+    ///
+    /// Returns true if the view version is equal to the other view version
+    /// with `timestamp_ms` and `version_id` ignored. The following must be 
identical:
+    /// * Summary (all of them)
+    /// * Representations
+    /// * Default Catalog
+    /// * Default Namespace
+    /// * The Schema ID
+    pub(crate) fn behaves_identical_to(&self, other: &Self) -> bool {
+        self.summary == other.summary
+            && self.representations == other.representations
+            && self.default_catalog == other.default_catalog
+            && self.default_namespace == other.default_namespace
+            && self.schema_id == other.schema_id
+    }
+
+    /// Change the version id of this view version.
+    pub fn with_version_id(self, version_id: i32) -> Self {
+        Self { version_id, ..self }
+    }
+
+    /// Change the schema id of this view version.
+    pub fn with_schema_id(self, schema_id: SchemaId) -> Self {
+        Self { schema_id, ..self }
+    }
 }
 
 /// A list of view representations.
@@ -148,7 +177,7 @@ impl ViewRepresentations {
     }
 
     /// Get an iterator over the representations
-    pub fn iter(&self) -> impl Iterator<Item = &'_ ViewRepresentation> {
+    pub fn iter(&self) -> impl ExactSizeIterator<Item = &'_ 
ViewRepresentation> {
         self.0.iter()
     }
 }
@@ -253,6 +282,7 @@ mod tests {
     use crate::spec::view_version::ViewVersion;
     use crate::spec::view_version::_serde::ViewVersionV1;
     use crate::spec::ViewRepresentations;
+    use crate::NamespaceIdent;
 
     #[test]
     fn view_version() {
@@ -310,4 +340,50 @@ mod tests {
             vec!["default".to_string()]
         );
     }
+
+    #[test]
+    fn test_behaves_identical_to() {
+        let view_version = ViewVersion::builder()
+            .with_version_id(1)
+            .with_schema_id(1)
+            .with_timestamp_ms(1573518431292)
+            .with_summary({
+                let mut map = std::collections::HashMap::new();
+                map.insert("engine-name".to_string(), "Spark".to_string());
+                map.insert("engineVersion".to_string(), "3.3.2".to_string());
+                map
+            })
+            
.with_representations(ViewRepresentations(vec![super::ViewRepresentation::Sql(
+                super::SqlViewRepresentation {
+                    sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2"
+                        .to_string(),
+                    dialect: "spark".to_string(),
+                },
+            )]))
+            .with_default_catalog(Some("prod".to_string()))
+            .with_default_namespace(NamespaceIdent::new("default".to_string()))
+            .build();
+
+        let mut identical_view_version = view_version.clone();
+        identical_view_version.version_id = 2;
+        identical_view_version.timestamp_ms = 1573518431293;
+
+        let different_view_version = ViewVersion::builder()
+            .with_version_id(view_version.version_id())
+            .with_schema_id(view_version.schema_id())
+            .with_timestamp_ms(view_version.timestamp_ms())
+            .with_summary(view_version.summary().clone())
+            
.with_representations(ViewRepresentations(vec![super::ViewRepresentation::Sql(
+                super::SqlViewRepresentation {
+                    sql: "SELECT * from events".to_string(),
+                    dialect: "spark".to_string(),
+                },
+            )]))
+            .with_default_catalog(view_version.default_catalog().cloned())
+            .with_default_namespace(view_version.default_namespace().clone())
+            .build();
+
+        assert!(view_version.behaves_identical_to(&identical_view_version));
+        assert!(!view_version.behaves_identical_to(&different_view_version));
+    }
 }

Reply via email to