This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 12d464e  feat: Implement JsonSerde for all datatypes (#53)
12d464e is described below

commit 12d464ea6b5a754f054b6b394c185d9c29c95fb9
Author: Pavlos-Petros Tournaris <[email protected]>
AuthorDate: Fri Dec 5 06:26:24 2025 +0200

    feat: Implement JsonSerde for all datatypes (#53)
---
 crates/fluss/src/metadata/json_serde.rs | 285 ++++++++++++++++++++++++++++----
 1 file changed, 251 insertions(+), 34 deletions(-)

diff --git a/crates/fluss/src/metadata/json_serde.rs b/crates/fluss/src/metadata/json_serde.rs
index 1c7604c..447b0f9 100644
--- a/crates/fluss/src/metadata/json_serde.rs
+++ b/crates/fluss/src/metadata/json_serde.rs
@@ -17,7 +17,7 @@
 
 use crate::error::Error::{InvalidTableError, JsonSerdeError};
 use crate::error::Result;
-use crate::metadata::datatype::{DataType, DataTypes};
+use crate::metadata::datatype::{DataField, DataType, DataTypes};
 use crate::metadata::table::{Column, Schema, TableDescriptor};
 use serde_json::{Value, json};
 use std::collections::HashMap;
@@ -58,10 +58,8 @@ impl DataType {
     const FIELD_NAME_TYPE_NAME: &'static str = "type";
     const FIELD_NAME_NULLABLE: &'static str = "nullable";
     const FIELD_NAME_LENGTH: &'static str = "length";
-    #[allow(dead_code)]
     const FIELD_NAME_PRECISION: &'static str = "precision";
-    #[allow(dead_code)]
-    const FILED_NAME_SCALE: &'static str = "scale";
+    const FIELD_NAME_SCALE: &'static str = "scale";
     #[allow(dead_code)]
     const FIELD_NAME_ELEMENT_TYPE: &'static str = "element_type";
     #[allow(dead_code)]
@@ -111,21 +109,54 @@ impl JsonSerde for DataType {
                 obj.insert(Self::FIELD_NAME_LENGTH.to_string(), json!(_type.length()));
             }
             DataType::Decimal(_type) => {
-                todo!()
+                obj.insert(
+                    Self::FIELD_NAME_PRECISION.to_string(),
+                    json!(_type.precision()),
+                );
+                obj.insert(Self::FIELD_NAME_SCALE.to_string(), json!(_type.scale()));
             }
-
             DataType::Time(_type) => {
-                todo!()
+                obj.insert(
+                    Self::FIELD_NAME_PRECISION.to_string(),
+                    json!(_type.precision()),
+                );
             }
             DataType::Timestamp(_type) => {
-                todo!()
+                obj.insert(
+                    Self::FIELD_NAME_PRECISION.to_string(),
+                    json!(_type.precision()),
+                );
             }
             DataType::TimestampLTz(_type) => {
-                todo!()
+                obj.insert(
+                    Self::FIELD_NAME_PRECISION.to_string(),
+                    json!(_type.precision()),
+                );
+            }
+            DataType::Array(_type) => {
+                obj.insert(
+                    Self::FIELD_NAME_ELEMENT_TYPE.to_string(),
+                    _type.get_element_type().serialize_json()?,
+                );
+            }
+            DataType::Map(_type) => {
+                obj.insert(
+                    Self::FIELD_NAME_KEY_TYPE.to_string(),
+                    _type.key_type().serialize_json()?,
+                );
+                obj.insert(
+                    Self::FIELD_NAME_VALUE_TYPE.to_string(),
+                    _type.value_type().serialize_json()?,
+                );
+            }
+            DataType::Row(_type) => {
+                let fields: Vec<Value> = _type
+                    .fields()
+                    .iter()
+                    .map(|field| field.serialize_json())
+                    .collect::<Result<_>>()?;
+                obj.insert(Self::FIELD_NAME_FIELDS.to_string(), json!(fields));
             }
-            DataType::Array(_type) => todo!(),
-            DataType::Map(_type) => todo!(),
-            DataType::Row(_type) => todo!(),
         }
         Ok(Value::Object(obj))
     }
@@ -150,18 +181,112 @@ impl JsonSerde for DataType {
             "BIGINT" => DataTypes::bigint(),
             "FLOAT" => DataTypes::float(),
             "DOUBLE" => DataTypes::double(),
-            "CHAR" => todo!(),
+            "CHAR" => {
+                let length = node
+                    .get(Self::FIELD_NAME_LENGTH)
+                    .and_then(|v| v.as_u64())
+                    .ok_or_else(|| {
+                        JsonSerdeError(format!(
+                            "Missing required field: {}",
+                            Self::FIELD_NAME_LENGTH
+                        ))
+                    })? as u32;
+                DataTypes::char(length)
+            }
             "STRING" => DataTypes::string(),
-            "DECIMAL" => todo!(),
+            "DECIMAL" => {
+                let precision = node
+                    .get(Self::FIELD_NAME_PRECISION)
+                    .and_then(|v| v.as_u64())
+                    .ok_or_else(|| {
+                        JsonSerdeError(format!(
+                            "Missing required field: {}",
+                            Self::FIELD_NAME_PRECISION
+                        ))
+                    })? as u32;
+                let scale = node
+                    .get(Self::FIELD_NAME_SCALE)
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(0) as u32;
+                DataTypes::decimal(precision, scale)
+            }
             "DATE" => DataTypes::date(),
-            "TIME_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
-            "TIMESTAMP_WITHOUT_TIME_ZONE" => todo!(), // Precision set separately
-            "TIMESTAMP_WITH_LOCAL_TIME_ZONE" => todo!(), // Precision set separately
+            "TIME_WITHOUT_TIME_ZONE" => {
+                let precision = node
+                    .get(Self::FIELD_NAME_PRECISION)
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(0) as u32;
+                DataTypes::time_with_precision(precision)
+            }
+            "TIMESTAMP_WITHOUT_TIME_ZONE" => {
+                let precision = node
+                    .get(Self::FIELD_NAME_PRECISION)
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(6) as u32;
+                DataTypes::timestamp_with_precision(precision)
+            }
+            "TIMESTAMP_WITH_LOCAL_TIME_ZONE" => {
+                let precision = node
+                    .get(Self::FIELD_NAME_PRECISION)
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(6) as u32;
+                DataTypes::timestamp_ltz_with_precision(precision)
+            }
             "BYTES" => DataTypes::bytes(),
-            "BINARY" => todo!(),
-            "ARRAY" => todo!(),
-            "MAP" => todo!(),
-            "ROW" => todo!(),
+            "BINARY" => {
+                let length = node
+                    .get(Self::FIELD_NAME_LENGTH)
+                    .and_then(|v| v.as_u64())
+                    .unwrap_or(1) as usize;
+                DataTypes::binary(length)
+            }
+            "ARRAY" => {
+                let element_type_node =
+                    node.get(Self::FIELD_NAME_ELEMENT_TYPE).ok_or_else(|| {
+                        JsonSerdeError(format!(
+                            "Missing required field: {}",
+                            Self::FIELD_NAME_ELEMENT_TYPE
+                        ))
+                    })?;
+                let element_type = DataType::deserialize_json(element_type_node)?;
+                DataTypes::array(element_type)
+            }
+            "MAP" => {
+                let key_type_node = node.get(Self::FIELD_NAME_KEY_TYPE).ok_or_else(|| {
+                    JsonSerdeError(format!(
+                        "Missing required field: {}",
+                        Self::FIELD_NAME_KEY_TYPE
+                    ))
+                })?;
+                let key_type = DataType::deserialize_json(key_type_node)?;
+                let value_type_node = node.get(Self::FIELD_NAME_VALUE_TYPE).ok_or_else(|| {
+                    JsonSerdeError(format!(
+                        "Missing required field: {}",
+                        Self::FIELD_NAME_VALUE_TYPE
+                    ))
+                })?;
+                let value_type = DataType::deserialize_json(value_type_node)?;
+                DataTypes::map(key_type, value_type)
+            }
+            "ROW" => {
+                let fields_node = node
+                    .get(Self::FIELD_NAME_FIELDS)
+                    .ok_or_else(|| {
+                        JsonSerdeError(format!(
+                            "Missing required field: {}",
+                            Self::FIELD_NAME_FIELDS
+                        ))
+                    })?
+                    .as_array()
+                    .ok_or_else(|| {
+                        JsonSerdeError(format!("{} must be an array", Self::FIELD_NAME_FIELDS))
+                    })?;
+                let mut fields = Vec::with_capacity(fields_node.len());
+                for field_node in fields_node {
+                    fields.push(DataField::deserialize_json(field_node)?);
+                }
+                DataTypes::row(fields)
+            }
             _ => return Err(JsonSerdeError(format!("Unknown type root: {type_root}"))),
         };
 
@@ -175,6 +300,51 @@ impl JsonSerde for DataType {
     }
 }
 
+impl DataField {
+    const NAME: &'static str = "name";
+    const FIELD_TYPE: &'static str = "field_type";
+    const DESCRIPTION: &'static str = "description";
+}
+
+impl JsonSerde for DataField {
+    fn serialize_json(&self) -> Result<Value> {
+        let mut obj = serde_json::Map::new();
+
+        obj.insert(Self::NAME.to_string(), json!(self.name()));
+        obj.insert(
+            Self::FIELD_TYPE.to_string(),
+            self.data_type.serialize_json()?,
+        );
+
+        if let Some(description) = &self.description {
+            obj.insert(Self::DESCRIPTION.to_string(), json!(description));
+        }
+
+        Ok(Value::Object(obj))
+    }
+
+    fn deserialize_json(node: &Value) -> Result<DataField> {
+        let name = node
+            .get(Self::NAME)
+            .and_then(|v| v.as_str())
+            .ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
+            .to_string();
+
+        let field_type_node = node.get(Self::FIELD_TYPE).ok_or_else(|| {
+            JsonSerdeError(format!("Missing required field: {}", Self::FIELD_TYPE))
+        })?;
+
+        let data_type = DataType::deserialize_json(field_type_node)?;
+
+        let description = node
+            .get(Self::DESCRIPTION)
+            .and_then(|v| v.as_str())
+            .map(|s| s.to_string());
+
+        Ok(DataField::new(name, data_type, description))
+    }
+}
+
 impl Column {
     const NAME: &'static str = "name";
     const DATA_TYPE: &'static str = "data_type";
@@ -203,7 +373,7 @@ impl JsonSerde for Column {
         let name = node
             .get(Self::NAME)
             .and_then(|v| v.as_str())
-            .unwrap_or_else(|| panic!("{}", format!("Missing required field: {}", Self::NAME)))
+            .ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
             .to_string();
 
         let data_type_node = node.get(Self::DATA_TYPE).ok_or_else(|| {
@@ -263,7 +433,7 @@ impl JsonSerde for Schema {
                 JsonSerdeError(format!("Missing required field: {}", Self::COLUMNS_NAME))
             })?
             .as_array()
-            .ok_or_else(|| JsonSerdeError(format!("{} should be an array", Self::COLUMNS_NAME)))?;
+            .ok_or_else(|| JsonSerdeError(format!("{} must be an array", Self::COLUMNS_NAME)))?;
 
         let mut columns = Vec::with_capacity(columns_node.len());
         for col_node in columns_node {
@@ -275,14 +445,16 @@ impl JsonSerde for Schema {
         if let Some(pk_node) = node.get(Self::PRIMARY_KEY_NAME) {
             let pk_array = pk_node
                 .as_array()
-                .ok_or_else(|| InvalidTableError("Primary key is not an array".to_string()))?;
+                .ok_or_else(|| InvalidTableError("Primary key must be an array".to_string()))?;
 
             let mut primary_keys = Vec::with_capacity(pk_array.len());
             for name_node in pk_array {
                 primary_keys.push(
                     name_node
                         .as_str()
-                        .ok_or_else(|| InvalidTableError("Primary key is not string".to_string()))?
+                        .ok_or_else(|| {
+                            InvalidTableError("Primary key element must be a string".to_string())
+                        })?
                         .to_string(),
                 );
             }
@@ -308,7 +480,7 @@ impl TableDescriptor {
     fn deserialize_properties(node: &Value) -> Result<HashMap<String, String>> {
         let obj = node
             .as_object()
-            .ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?;
+            .ok_or_else(|| JsonSerdeError("Properties must be an object".to_string()))?;
 
         let mut properties = HashMap::with_capacity(obj.len());
         for (key, value) in obj {
@@ -316,7 +488,7 @@ impl TableDescriptor {
                 key.clone(),
                 value
                     .as_str()
-                    .ok_or_else(|| JsonSerdeError("Properties should be an object".to_string()))?
+                    .ok_or_else(|| JsonSerdeError("Property value must be a string".to_string()))?
                     .to_owned(),
             );
         }
@@ -383,9 +555,7 @@ impl JsonSerde for TableDescriptor {
         if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
             let comment = comment_node
                 .as_str()
-                .ok_or_else(|| {
-                    JsonSerdeError(format!("{} should be a string", Self::COMMENT_NAME))
-                })?
+                .ok_or_else(|| JsonSerdeError(format!("{} must be a string", Self::COMMENT_NAME)))?
                 .to_owned();
             builder = builder.comment(comment.as_str());
         }
@@ -400,7 +570,7 @@ impl JsonSerde for TableDescriptor {
             })?
             .as_array()
             .ok_or_else(|| {
-                JsonSerdeError(format!("{} should be an array", Self::PARTITION_KEY_NAME))
+                JsonSerdeError(format!("{} must be an array", Self::PARTITION_KEY_NAME))
             })?;
 
         let mut partition_keys = Vec::with_capacity(partition_node.len());
@@ -409,7 +579,10 @@ impl JsonSerde for TableDescriptor {
                 key_node
                     .as_str()
                     .ok_or_else(|| {
-                        JsonSerdeError(format!("{} should be a string", Self::PARTITION_KEY_NAME))
+                        JsonSerdeError(format!(
+                            "{} element must be a string",
+                            Self::PARTITION_KEY_NAME
+                        ))
                     })?
                     .to_owned(),
             );
@@ -420,14 +593,14 @@ impl JsonSerde for TableDescriptor {
         let mut bucket_keys = vec![];
         if let Some(bucket_key_node) = node.get(Self::BUCKET_KEY_NAME) {
             let bucket_key_node = bucket_key_node.as_array().ok_or_else(|| {
-                JsonSerdeError(format!("{} should be an array", Self::BUCKET_COUNT_NAME))
+                JsonSerdeError(format!("{} must be an array", Self::BUCKET_KEY_NAME))
             })?;
 
             for key_node in bucket_key_node {
                 bucket_keys.push(
                     key_node
                         .as_str()
-                        .ok_or_else(|| JsonSerdeError("Bucket key should be a string".to_string()))?
+                        .ok_or_else(|| JsonSerdeError("Bucket key must be a string".to_string()))?
                         .to_owned(),
                 );
             }
@@ -462,3 +635,47 @@ impl JsonSerde for TableDescriptor {
         builder.build()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::DataTypes;
+
+    #[test]
+    fn test_datatype_json_serde() {
+        let data_types = vec![
+            DataTypes::boolean(),
+            DataTypes::tinyint(),
+            DataTypes::smallint(),
+            DataTypes::int().as_non_nullable(),
+            DataTypes::bigint(),
+            DataTypes::float(),
+            DataTypes::double(),
+            DataTypes::char(10),
+            DataTypes::string(),
+            DataTypes::decimal(10, 2),
+            DataTypes::date(),
+            DataTypes::time(),
+            DataTypes::timestamp(),
+            DataTypes::timestamp_ltz(),
+            DataTypes::bytes(),
+            DataTypes::binary(100),
+            DataTypes::array(DataTypes::int()),
+            DataTypes::map(DataTypes::string(), DataTypes::int()),
+            DataTypes::row(vec![
+                DataField::new("f1".to_string(), DataTypes::int(), None),
+                DataField::new(
+                    "f2".to_string(),
+                    DataTypes::string(),
+                    Some("desc".to_string()),
+                ),
+            ]),
+        ];
+
+        for dt in data_types {
+            let json = dt.serialize_json().unwrap();
+            let deserialized = DataType::deserialize_json(&json).unwrap();
+            assert_eq!(dt, deserialized);
+        }
+    }
+}

Reply via email to