This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cad36ab feat(connectors): Extend JSON field transformations in 
connectors run… (#1863)
9cad36ab is described below

commit 9cad36abdd64e0a8e137164148e3f80736650927
Author: Jagadesh Adireddi <[email protected]>
AuthorDate: Mon Jun 16 21:31:45 2025 +0530

    feat(connectors): Extend JSON field transformations in connectors run… 
(#1863)
    
    Fixes: https://github.com/apache/iggy/issues/1848
    
    This PR introduces a message transformation system for connectors,
    enabling modification of JSON messages. The implementation provides four
    essential transforms, one of which (FilterFields) offers advanced
    pattern-matching capabilities.
    
    ### Transforms Added
    
    **AddFields Transform**
    - Dynamically adds fields to JSON messages
    - Static values (strings, numbers, booleans, objects)
    - Computed values (timestamps, UUIDs, datetime strings)
    - Support for multiple timestamp formats (nanos, micros, millis,
    seconds)
    - UUID v4 and v7 generation
    
    **DeleteFields Transform**
    - Removes specified fields from JSON messages
    - Simple field name-based deletion
    - Ignores non-existent fields
    
    **UpdateFields Transform**
    - Conditionally updates existing fields in JSON messages
    - Conditional logic: Always, KeyExists, KeyNotExists
    - Static and computed value updates
    - Field creation when conditions are met
    - Same computed value support as AddFields
    
    **FilterFields Transform**
    - Filtering based on complex key-value patterns
    - Key pattern matching (exact, prefix, suffix, contains, regex)
    - Value pattern matching (equality, type checks, numeric comparisons,
    regex)
    - Combined key and value pattern matching
    - Include or exclude matching behavior
    
    ---------
    
    Co-authored-by: Piotr Gankiewicz <[email protected]>
---
 Cargo.lock                                         |   1 +
 core/connectors/runtime/src/transform.rs           |   2 +-
 core/connectors/sdk/Cargo.toml                     |   1 +
 core/connectors/sdk/src/transforms/add_fields.rs   | 101 ++-----
 .../connectors/sdk/src/transforms/delete_fields.rs |  37 +--
 .../connectors/sdk/src/transforms/filter_fields.rs | 258 ++++++++++++++++++
 .../sdk/src/transforms/json/add_fields.rs          | 187 +++++++++++++
 .../sdk/src/transforms/json/delete_fields.rs       | 149 +++++++++++
 .../sdk/src/transforms/json/filter_fields.rs       | 290 +++++++++++++++++++++
 core/connectors/sdk/src/transforms/json/mod.rs     |  45 ++++
 .../sdk/src/transforms/json/test_utils.rs          |  93 +++++++
 .../sdk/src/transforms/json/update_fields.rs       | 224 ++++++++++++++++
 core/connectors/sdk/src/transforms/mod.rs          |  95 ++++---
 .../connectors/sdk/src/transforms/update_fields.rs |  78 ++++++
 14 files changed, 1435 insertions(+), 126 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 76292c4b..ac8cfa25 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3887,6 +3887,7 @@ dependencies = [
  "iggy",
  "once_cell",
  "postcard",
+ "regex",
  "serde",
  "serde_json",
  "simd-json",
diff --git a/core/connectors/runtime/src/transform.rs 
b/core/connectors/runtime/src/transform.rs
index 0cee67cc..303153cb 100644
--- a/core/connectors/runtime/src/transform.rs
+++ b/core/connectors/runtime/src/transform.rs
@@ -42,7 +42,7 @@ pub fn load(config: TransformsConfig) -> Result<Vec<Arc<dyn 
Transform>>, Runtime
             continue;
         }
 
-        let transform = iggy_connector_sdk::transforms::load(r#type, 
transform_config)?;
+        let transform = iggy_connector_sdk::transforms::from_config(r#type, 
&transform_config)?;
         transforms.push(transform);
     }
 
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
index 7c96ed3c..54e07c77 100644
--- a/core/connectors/sdk/Cargo.toml
+++ b/core/connectors/sdk/Cargo.toml
@@ -35,6 +35,7 @@ dashmap = { workspace = true }
 iggy = { workspace = true }
 once_cell = { workspace = true }
 postcard = { workspace = true }
+regex = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 simd-json = { workspace = true }
diff --git a/core/connectors/sdk/src/transforms/add_fields.rs 
b/core/connectors/sdk/src/transforms/add_fields.rs
index 569a16b8..591959aa 100644
--- a/core/connectors/sdk/src/transforms/add_fields.rs
+++ b/core/connectors/sdk/src/transforms/add_fields.rs
@@ -16,60 +16,32 @@
  * under the License.
  */
 
-use serde::{Deserialize, Serialize};
-use simd_json::OwnedValue;
-use strum_macros::{Display, IntoStaticStr};
-
+use super::{FieldValue, Transform, TransformType};
 use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+use serde::{Deserialize, Serialize};
 
-use super::{Transform, TransformType};
-
+/// A field to be added to messages
 #[derive(Debug, Serialize, Deserialize)]
-pub struct AddFieldsConfig {
-    fields: Vec<Field>,
-}
-
-pub struct AddFields {
-    fields: Vec<Field>,
+pub struct Field {
+    pub key: String,
+    pub value: FieldValue,
 }
 
+/// Configuration for the AddFields transform
 #[derive(Debug, Serialize, Deserialize)]
-struct Field {
-    key: String,
-    value: FieldValue,
-}
-
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-#[serde(rename_all = "snake_case")]
-enum FieldValue {
-    Static(simd_json::OwnedValue),
-    Computed(ComputedValue),
+pub struct AddFieldsConfig {
+    #[serde(default)]
+    pub fields: Vec<Field>,
 }
 
-#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Display, 
IntoStaticStr)]
-#[serde(rename_all = "snake_case")]
-enum ComputedValue {
-    #[strum(to_string = "date_time")]
-    DateTime,
-    #[strum(to_string = "timestamp_nanos")]
-    TimestampNanos,
-    #[strum(to_string = "timestamp_micros")]
-    TimestampMicros,
-    #[strum(to_string = "timestamp_millis")]
-    TimestampMillis,
-    #[strum(to_string = "timestamp_seconds")]
-    TimestampSeconds,
-    #[strum(to_string = "uuid_v4")]
-    UuidV4,
-    #[strum(to_string = "uuid_v7")]
-    UuidV7,
+/// Transform that adds fields to JSON messages
+pub struct AddFields {
+    pub fields: Vec<Field>,
 }
 
 impl AddFields {
-    pub fn new(config: AddFieldsConfig) -> Self {
-        Self {
-            fields: config.fields,
-        }
+    pub fn new(cfg: AddFieldsConfig) -> Self {
+        Self { fields: cfg.fields }
     }
 }
 
@@ -80,49 +52,16 @@ impl Transform for AddFields {
 
     fn transform(
         &self,
-        _metadata: &TopicMetadata,
-        mut message: DecodedMessage,
+        metadata: &TopicMetadata,
+        message: DecodedMessage,
     ) -> Result<Option<DecodedMessage>, Error> {
         if self.fields.is_empty() {
             return Ok(Some(message));
         }
 
-        let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload 
else {
-            return Ok(Some(message));
-        };
-
-        for field in &self.fields {
-            match &field.value {
-                FieldValue::Static(value) => map.insert(field.key.clone(), 
value.clone()),
-                FieldValue::Computed(value) => match value {
-                    ComputedValue::DateTime => {
-                        map.insert(field.key.clone(), 
chrono::Utc::now().to_rfc3339().into())
-                    }
-                    ComputedValue::TimestampMillis => map.insert(
-                        field.key.clone(),
-                        chrono::Utc::now().timestamp_millis().into(),
-                    ),
-                    ComputedValue::TimestampMicros => map.insert(
-                        field.key.clone(),
-                        chrono::Utc::now().timestamp_micros().into(),
-                    ),
-                    ComputedValue::TimestampNanos => map.insert(
-                        field.key.clone(),
-                        chrono::Utc::now().timestamp_nanos_opt().into(),
-                    ),
-                    ComputedValue::TimestampSeconds => {
-                        map.insert(field.key.clone(), 
chrono::Utc::now().timestamp().into())
-                    }
-                    ComputedValue::UuidV4 => {
-                        map.insert(field.key.clone(), 
uuid::Uuid::new_v4().to_string().into())
-                    }
-                    ComputedValue::UuidV7 => {
-                        map.insert(field.key.clone(), 
uuid::Uuid::now_v7().to_string().into())
-                    }
-                },
-            };
+        match &message.payload {
+            Payload::Json(_) => self.transform_json(metadata, message),
+            _ => Ok(Some(message)),
         }
-
-        Ok(Some(message))
     }
 }
diff --git a/core/connectors/sdk/src/transforms/delete_fields.rs 
b/core/connectors/sdk/src/transforms/delete_fields.rs
index d7dd755a..465c3719 100644
--- a/core/connectors/sdk/src/transforms/delete_fields.rs
+++ b/core/connectors/sdk/src/transforms/delete_fields.rs
@@ -16,30 +16,34 @@
  * under the License.
  */
 
-use std::collections::HashSet;
-
+use super::{Transform, TransformType};
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
 use serde::{Deserialize, Serialize};
 use simd_json::OwnedValue;
+use std::collections::HashSet;
 
-use crate::{DecodedMessage, Error, Payload, TopicMetadata};
-
-use super::{Transform, TransformType};
-
+/// Configuration for the DeleteFields transform
 #[derive(Debug, Serialize, Deserialize)]
 pub struct DeleteFieldsConfig {
-    fields: Vec<String>,
+    #[serde(default)]
+    pub fields: Vec<String>,
 }
 
+/// Transform that removes specified fields from JSON messages
 pub struct DeleteFields {
-    fields: HashSet<String>,
+    pub fields: HashSet<String>,
 }
 
 impl DeleteFields {
-    pub fn new(config: DeleteFieldsConfig) -> Self {
+    pub fn new(cfg: DeleteFieldsConfig) -> Self {
         Self {
-            fields: config.fields.into_iter().collect(),
+            fields: cfg.fields.into_iter().collect(),
         }
     }
+
+    pub fn should_remove(&self, k: &str, _v: &OwnedValue) -> bool {
+        self.fields.contains(k)
+    }
 }
 
 impl Transform for DeleteFields {
@@ -49,13 +53,16 @@ impl Transform for DeleteFields {
 
     fn transform(
         &self,
-        _metadata: &TopicMetadata,
-        mut message: DecodedMessage,
+        metadata: &TopicMetadata,
+        message: DecodedMessage,
     ) -> Result<Option<DecodedMessage>, Error> {
-        if let Payload::Json(OwnedValue::Object(ref mut map)) = 
message.payload {
-            map.retain(|key, _| !self.fields.contains(key));
+        if self.fields.is_empty() {
+            return Ok(Some(message));
         }
 
-        Ok(Some(message))
+        match &message.payload {
+            Payload::Json(_) => self.transform_json(metadata, message),
+            _ => Ok(Some(message)),
+        }
     }
 }
diff --git a/core/connectors/sdk/src/transforms/filter_fields.rs 
b/core/connectors/sdk/src/transforms/filter_fields.rs
new file mode 100644
index 00000000..2b70c209
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/filter_fields.rs
@@ -0,0 +1,258 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{Transform, TransformType};
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+use regex::Regex;
+use serde::{Deserialize, Serialize};
+use simd_json::OwnedValue;
+use simd_json::prelude::{TypedArrayValue, TypedObjectValue, TypedScalarValue, 
ValueAsScalar};
+use std::collections::HashSet;
+
+/// Pattern matching for field keys with various string matching strategies
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum KeyPattern<T = String> {
+    /// Exact string match
+    Exact(String),
+    /// Key starts with the given string
+    StartsWith(String),
+    /// Key ends with the given string
+    EndsWith(String),
+    /// Key contains the given string
+    Contains(String),
+    /// Key matches the given regular expression
+    Regex(T),
+}
+
+impl KeyPattern<String> {
+    /// Compiles string patterns into regex patterns for efficient matching
+    pub fn compile(self) -> Result<KeyPattern<Regex>, Error> {
+        Ok(match self {
+            KeyPattern::Regex(pattern) => {
+                KeyPattern::Regex(Regex::new(&pattern).map_err(|_| 
Error::InvalidConfig)?)
+            }
+            KeyPattern::Exact(s) => KeyPattern::Exact(s),
+            KeyPattern::StartsWith(s) => KeyPattern::StartsWith(s),
+            KeyPattern::EndsWith(s) => KeyPattern::EndsWith(s),
+            KeyPattern::Contains(s) => KeyPattern::Contains(s),
+        })
+    }
+}
+
+impl KeyPattern<Regex> {
+    /// Checks if a key matches this pattern
+    pub fn matches(&self, k: &str) -> bool {
+        match self {
+            KeyPattern::Exact(s) => k == s,
+            KeyPattern::StartsWith(s) => k.starts_with(s),
+            KeyPattern::EndsWith(s) => k.ends_with(s),
+            KeyPattern::Contains(s) => k.contains(s),
+            KeyPattern::Regex(re) => re.is_match(k),
+        }
+    }
+}
+
+/// Pattern matching for field values with type checking and content matching
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum ValuePattern<T = String> {
+    /// Value equals the specified JSON value
+    Equals(OwnedValue),
+    /// String value contains the specified substring
+    Contains(String),
+    /// String value matches the given regular expression
+    Regex(T),
+    /// Numeric value is greater than the specified number
+    GreaterThan(f64),
+    /// Numeric value is less than the specified number
+    LessThan(f64),
+    /// Numeric value is between the specified range (inclusive)
+    Between(f64, f64),
+    /// Value is null
+    IsNull,
+    /// Value is not null
+    IsNotNull,
+    /// Value is a string
+    IsString,
+    /// Value is a number
+    IsNumber,
+    /// Value is a boolean
+    IsBoolean,
+    /// Value is an object
+    IsObject,
+    /// Value is an array
+    IsArray,
+}
+
+impl ValuePattern<String> {
+    /// Compiles string patterns into regex patterns for efficient matching
+    pub fn compile(self) -> Result<ValuePattern<Regex>, Error> {
+        use ValuePattern::*;
+        Ok(match self {
+            Regex(pattern) => Regex(regex::Regex::new(&pattern).map_err(|_| 
Error::InvalidConfig)?),
+            Equals(v) => Equals(v),
+            Contains(s) => Contains(s),
+            GreaterThan(n) => GreaterThan(n),
+            LessThan(n) => LessThan(n),
+            Between(a, b) => Between(a, b),
+            IsNull => IsNull,
+            IsNotNull => IsNotNull,
+            IsString => IsString,
+            IsNumber => IsNumber,
+            IsBoolean => IsBoolean,
+            IsObject => IsObject,
+            IsArray => IsArray,
+        })
+    }
+}
+
+impl ValuePattern<Regex> {
+    /// Checks if a value matches this pattern
+    pub fn matches(&self, v: &OwnedValue) -> bool {
+        use ValuePattern::*;
+        match self {
+            Equals(x) => v == x,
+            Contains(s) => v.as_str().is_some_and(|x| x.contains(s)),
+            Regex(re) => v.as_str().is_some_and(|x| re.is_match(x)),
+            GreaterThan(t) => v.as_f64().is_some_and(|n| n > *t),
+            LessThan(t) => v.as_f64().is_some_and(|n| n < *t),
+            Between(a, b) => v.as_f64().is_some_and(|n| n >= *a && n <= *b),
+            IsNull => v.is_null(),
+            IsNotNull => !v.is_null(),
+            IsString => v.is_str(),
+            IsNumber => v.is_number(),
+            IsBoolean => v.is_bool(),
+            IsObject => v.is_object(),
+            IsArray => v.is_array(),
+        }
+    }
+}
+
+/// Configuration for the FilterFields transform
+#[derive(Debug, Serialize, Deserialize)]
+pub struct FilterFieldsConfig {
+    /// Fields to always keep regardless of pattern matching
+    #[serde(default)]
+    pub keep_fields: Vec<String>,
+    /// Patterns to match against field keys and values
+    #[serde(default)]
+    pub patterns: Vec<FilterPattern>,
+    /// Whether to include (true) or exclude (false) fields that match patterns
+    #[serde(default = "default_include")]
+    pub include_matching: bool,
+}
+
+/// Default value for include_matching field
+fn default_include() -> bool {
+    true
+}
+
+/// A pattern that can match both field keys and values
+#[derive(Debug, Serialize, Deserialize)]
+pub struct FilterPattern {
+    /// Optional pattern to match against field keys
+    #[serde(default)]
+    pub key_pattern: Option<KeyPattern<String>>,
+    /// Optional pattern to match against field values
+    #[serde(default)]
+    pub value_pattern: Option<ValuePattern<String>>,
+}
+
+/// A compiled pattern with regex patterns ready for efficient matching
+pub struct CompiledPattern {
+    /// Compiled key pattern
+    pub key_pattern: Option<KeyPattern<Regex>>,
+    /// Compiled value pattern
+    pub value_pattern: Option<ValuePattern<Regex>>,
+}
+
+/// Transform that filters JSON message fields based on complex patterns
+///
+/// This transform supports sophisticated filtering based on:
+/// - Field names (exact, prefix, suffix, contains, regex)
+/// - Field values (equality, type checking, numeric comparisons, regex)
+/// - Combination of key and value patterns
+/// - Include or exclude matching behavior
+pub struct FilterFields {
+    /// Whether to include or exclude matching fields
+    pub include_matching: bool,
+    /// Set of field names to always keep
+    pub keep_set: HashSet<String>,
+    /// Compiled patterns for efficient matching
+    pub patterns: Vec<CompiledPattern>,
+}
+
+impl FilterFields {
+    /// Creates a new FilterFields transform from configuration
+    ///
+    /// This compiles all regex patterns for efficient matching during 
transformation.
+    /// Returns an error if any regex patterns are invalid.
+    pub fn new(cfg: FilterFieldsConfig) -> Result<Self, Error> {
+        let keep_set = cfg.keep_fields.into_iter().collect();
+
+        let mut patterns = Vec::with_capacity(cfg.patterns.len());
+        for p in cfg.patterns {
+            patterns.push(CompiledPattern {
+                key_pattern: p.key_pattern.map(|kp| kp.compile()).transpose()?,
+                value_pattern: p.value_pattern.map(|vp| 
vp.compile()).transpose()?,
+            });
+        }
+
+        Ok(Self {
+            include_matching: cfg.include_matching,
+            keep_set,
+            patterns,
+        })
+    }
+
+    /// Checks if a field key-value pair matches any of the configured patterns
+    ///
+    /// A field matches if both the key pattern (if specified) and value 
pattern
+    /// (if specified) match. If only one pattern type is specified, only that
+    /// pattern needs to match.
+    #[inline]
+    pub fn matches_patterns(&self, k: &str, v: &OwnedValue) -> bool {
+        self.patterns.iter().any(|pat| {
+            let key_ok = pat.key_pattern.as_ref().is_none_or(|kp| 
kp.matches(k));
+            let value_ok = pat.value_pattern.as_ref().is_none_or(|vp| 
vp.matches(v));
+            key_ok && value_ok
+        })
+    }
+}
+
+impl Transform for FilterFields {
+    fn r#type(&self) -> TransformType {
+        TransformType::FilterFields
+    }
+
+    fn transform(
+        &self,
+        metadata: &TopicMetadata,
+        message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        if self.keep_set.is_empty() && self.patterns.is_empty() {
+            return Ok(Some(message)); // nothing to do
+        }
+
+        match &message.payload {
+            Payload::Json(_) => self.transform_json(metadata, message),
+            _ => Ok(Some(message)),
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/json/add_fields.rs 
b/core/connectors/sdk/src/transforms/json/add_fields.rs
new file mode 100644
index 00000000..43fd4328
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/add_fields.rs
@@ -0,0 +1,187 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::compute_value;
+use crate::{
+    DecodedMessage, Error, Payload, TopicMetadata,
+    transforms::{AddFields, FieldValue},
+};
+use simd_json::OwnedValue;
+
+impl AddFields {
+    pub(crate) fn transform_json(
+        &self,
+        _metadata: &TopicMetadata,
+        mut message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload 
else {
+            return Ok(Some(message));
+        };
+
+        for field in &self.fields {
+            let new_val = match &field.value {
+                FieldValue::Static(v) => v.clone(),
+                FieldValue::Computed(c) => compute_value(c),
+            };
+            map.insert(field.key.clone(), new_val);
+        }
+
+        Ok(Some(message))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::transforms::json::test_utils::{
+        assert_is_number, assert_is_uuid, create_raw_test_message, 
create_test_message,
+        create_test_topic_metadata, extract_json_object,
+    };
+    use crate::transforms::{
+        ComputedValue, FieldValue, Transform,
+        add_fields::{AddFields, Field},
+    };
+    use simd_json::OwnedValue;
+
+    #[test]
+    fn should_return_message_unchanged_when_no_fields() {
+        let transform = AddFields { fields: vec![] };
+        let msg = create_test_message(r#"{"existing": "field"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 1);
+        assert_eq!(json_obj["existing"], "field");
+    }
+
+    #[test]
+    fn should_add_static_field_to_json_message() {
+        let transform = AddFields {
+            fields: vec![Field {
+                key: "new_field".to_string(),
+                value: FieldValue::Static(OwnedValue::from("new_value")),
+            }],
+        };
+        let msg = create_test_message(r#"{"existing": "field"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["existing"], "field");
+        assert_eq!(json_obj["new_field"], "new_value");
+    }
+
+    #[test]
+    fn should_add_multiple_static_fields_with_different_types() {
+        let transform = AddFields {
+            fields: vec![
+                Field {
+                    key: "string_field".to_string(),
+                    value: 
FieldValue::Static(OwnedValue::from("string_value")),
+                },
+                Field {
+                    key: "number_field".to_string(),
+                    value: FieldValue::Static(OwnedValue::from(42)),
+                },
+                Field {
+                    key: "boolean_field".to_string(),
+                    value: FieldValue::Static(OwnedValue::from(true)),
+                },
+            ],
+        };
+        let msg = create_test_message(r#"{"existing": "field"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 4);
+        assert_eq!(json_obj["existing"], "field");
+        assert_eq!(json_obj["string_field"], "string_value");
+        assert_eq!(json_obj["number_field"], 42);
+        assert_eq!(json_obj["boolean_field"], true);
+    }
+
+    #[test]
+    fn should_add_computed_fields_with_dynamic_values() {
+        let transform = AddFields {
+            fields: vec![
+                Field {
+                    key: "timestamp_ms".to_string(),
+                    value: 
FieldValue::Computed(ComputedValue::TimestampMillis),
+                },
+                Field {
+                    key: "uuid".to_string(),
+                    value: FieldValue::Computed(ComputedValue::UuidV4),
+                },
+            ],
+        };
+        let msg = create_test_message(r#"{"existing": "field"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 3);
+        assert_eq!(json_obj["existing"], "field");
+        assert_is_number(&json_obj["timestamp_ms"], "timestamp_ms");
+        assert_is_uuid(&json_obj["uuid"], "uuid");
+    }
+
+    #[test]
+    fn should_overwrite_existing_field_when_same_key_specified() {
+        let transform = AddFields {
+            fields: vec![Field {
+                key: "existing".to_string(),
+                value: FieldValue::Static(OwnedValue::from("new_value")),
+            }],
+        };
+        let msg = create_test_message(r#"{"existing": "field"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 1);
+        assert_eq!(json_obj["existing"], "new_value");
+    }
+
+    #[test]
+    fn should_pass_through_non_json_payload_unchanged() {
+        let transform = AddFields {
+            fields: vec![Field {
+                key: "new_field".to_string(),
+                value: FieldValue::Static(OwnedValue::from("new_value")),
+            }],
+        };
+        let msg = create_raw_test_message(vec![1, 2, 3, 4]);
+        let result = transform
+            .transform(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        if let Payload::Raw(bytes) = &result.payload {
+            assert_eq!(*bytes, vec![1u8, 2, 3, 4]);
+        } else {
+            panic!("Expected Raw payload");
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/json/delete_fields.rs 
b/core/connectors/sdk/src/transforms/json/delete_fields.rs
new file mode 100644
index 00000000..747490d0
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/delete_fields.rs
@@ -0,0 +1,149 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{
+    DecodedMessage, Error, Payload, TopicMetadata, 
transforms::delete_fields::DeleteFields,
+};
+use simd_json::OwnedValue;
+
+impl DeleteFields {
+    pub(crate) fn transform_json(
+        &self,
+        _metadata: &TopicMetadata,
+        mut message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload 
else {
+            return Ok(Some(message));
+        };
+
+        map.retain(|k, v| !self.should_remove(k, v));
+        Ok(Some(message))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::transforms::json::test_utils::{
+        create_raw_test_message, create_test_message, 
create_test_topic_metadata,
+        extract_json_object,
+    };
+    use crate::transforms::{
+        Transform,
+        delete_fields::{DeleteFields, DeleteFieldsConfig},
+    };
+
+    #[test]
+    fn should_remove_specified_fields_from_json_message() {
+        let transform = DeleteFields::new(DeleteFieldsConfig {
+            fields: vec!["field1".to_string(), "field3".to_string()],
+        });
+        let msg = create_test_message(
+            r#"{"field1": "value1", "field2": "value2", "field3": 42, 
"field4": true}"#,
+        );
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["field2"], "value2");
+        assert_eq!(json_obj["field4"], true);
+        assert!(!json_obj.contains_key("field1"));
+        assert!(!json_obj.contains_key("field3"));
+    }
+
+    #[test]
+    fn should_remove_multiple_fields_while_preserving_others() {
+        let transform = DeleteFields::new(DeleteFieldsConfig {
+            fields: vec![
+                "remove1".to_string(),
+                "remove2".to_string(),
+                "remove3".to_string(),
+            ],
+        });
+        let msg = create_test_message(
+            r#"{
+            "keep1": "value1", 
+            "remove1": 100, 
+            "keep2": true, 
+            "remove2": null,
+            "remove3": "delete me"
+        }"#,
+        );
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["keep1"], "value1");
+        assert_eq!(json_obj["keep2"], true);
+        assert!(!json_obj.contains_key("remove1"));
+        assert!(!json_obj.contains_key("remove2"));
+        assert!(!json_obj.contains_key("remove3"));
+    }
+
+    #[test]
+    fn should_ignore_nonexistent_fields_when_deleting() {
+        let transform = DeleteFields::new(DeleteFieldsConfig {
+            fields: vec!["nonexistent1".to_string(), "field2".to_string()],
+        });
+        let msg = create_test_message(r#"{"field1": "value1", "field2": 
"value2", "field3": 42}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["field1"], "value1");
+        assert_eq!(json_obj["field3"], 42);
+        assert!(!json_obj.contains_key("field2"));
+    }
+
+    #[test]
+    fn should_return_message_unchanged_when_no_fields() {
+        let transform = DeleteFields::new(DeleteFieldsConfig { fields: vec![] 
});
+        let msg = create_test_message(r#"{"field1": "value1", "field2": 
"value2"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["field1"], "value1");
+        assert_eq!(json_obj["field2"], "value2");
+    }
+
+    #[test]
+    fn should_pass_through_non_json_payload_unchanged() {
+        let transform = DeleteFields::new(DeleteFieldsConfig {
+            fields: vec!["field1".to_string()],
+        });
+        let msg = create_raw_test_message(vec![1, 2, 3, 4]);
+        let result = transform
+            .transform(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        if let Payload::Raw(bytes) = &result.payload {
+            assert_eq!(*bytes, vec![1u8, 2, 3, 4]);
+        } else {
+            panic!("Expected Raw payload");
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/json/filter_fields.rs 
b/core/connectors/sdk/src/transforms/json/filter_fields.rs
new file mode 100644
index 00000000..39543238
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/filter_fields.rs
@@ -0,0 +1,290 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{
+    DecodedMessage, Error, Payload, TopicMetadata, transforms::filter_fields::FilterFields,
+};
+use simd_json::OwnedValue;
+
+impl FilterFields {
+    pub(crate) fn transform_json(
+        &self,
+        _metadata: &TopicMetadata,
+        mut message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        if self.keep_set.is_empty() && self.patterns.is_empty() {
+            return Ok(Some(message)); // nothing to do
+        }
+
+        let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else {
+            return Ok(Some(message));
+        };
+
+        let include = self.include_matching;
+        map.retain(|k, v| {
+            let explicit_keep = self.keep_set.contains(k);
+            if explicit_keep {
+                return true; // never drop an explicitly kept key
+            }
+
+            let matched = self.matches_patterns(k, v);
+            include ^ !matched // xor gives us include / exclude in one line
+        });
+
+        Ok(Some(message))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::transforms::json::test_utils::{
+        create_raw_test_message, create_test_message, create_test_topic_metadata,
+        extract_json_object,
+    };
+    use crate::transforms::{
+        Transform,
+        filter_fields::{
+            FilterFields, FilterFieldsConfig, FilterPattern, KeyPattern, ValuePattern,
+        },
+    };
+
+    #[test]
+    fn should_keep_only_specified_fields_when_keep_list_provided() {
+        let transform = FilterFields::new(FilterFieldsConfig {
+            keep_fields: vec!["id".to_string(), "name".to_string()],
+            patterns: vec![],
+            include_matching: true,
+        })
+        .unwrap();
+        let msg = create_test_message(
+            r#"{
+            "id": 1,
+            "name": "test",
+            "description": "should be removed",
+            "created_at": "2023-01-01"
+        }"#,
+        );
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["id"], 1);
+        assert_eq!(json_obj["name"], "test");
+        assert!(!json_obj.contains_key("description"));
+        assert!(!json_obj.contains_key("created_at"));
+    }
+
+    #[test]
+    fn should_include_fields_matching_key_pattern() {
+        let transform = FilterFields::new(FilterFieldsConfig {
+            keep_fields: vec![],
+            patterns: vec![FilterPattern {
+                key_pattern: Some(KeyPattern::StartsWith("meta_".to_string())),
+                value_pattern: None,
+            }],
+            include_matching: true,
+        })
+        .unwrap();
+        let msg = create_test_message(
+            r#"{
+            "id": 1,
+            "meta_created": "2023-01-01",
+            "meta_updated": "2023-01-02",
+            "content": "test content"
+        }"#,
+        );
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["meta_created"], "2023-01-01");
+        assert_eq!(json_obj["meta_updated"], "2023-01-02");
+        assert!(!json_obj.contains_key("id"));
+        assert!(!json_obj.contains_key("content"));
+    }
+
+    #[test]
+    fn should_exclude_fields_matching_pattern_when_include_false() {
+        let transform = FilterFields::new(FilterFieldsConfig {
+            keep_fields: vec![],
+            patterns: vec![FilterPattern {
+                key_pattern: Some(KeyPattern::StartsWith("temp_".to_string())),
+                value_pattern: None,
+            }],
+            include_matching: false,
+        })
+        .unwrap();
+        let msg = create_test_message(
+            r#"{
+            "id": 1,
+            "name": "test",
+            "temp_value": 100,
+            "temp_flag": true
+        }"#,
+        );
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["id"], 1);
+        assert_eq!(json_obj["name"], "test");
+        assert!(!json_obj.contains_key("temp_value"));
+        assert!(!json_obj.contains_key("temp_flag"));
+    }
+
+    #[test]
+    fn should_filter_fields_based_on_value_pattern_matching() {
+        let transform = FilterFields::new(FilterFieldsConfig {
+            keep_fields: vec![],
+            patterns: vec![FilterPattern {
+                key_pattern: None,
+                value_pattern: Some(ValuePattern::IsNumber),
+            }],
+            include_matching: true,
+        })
+        .unwrap();
+        let msg = create_test_message(
+            r#"{
+            "id": 1,
+            "count": 42,
+            "name": "test",
+            "active": true
+        }"#,
+        );
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["id"], 1);
+        assert_eq!(json_obj["count"], 42);
+        assert!(!json_obj.contains_key("name"));
+        assert!(!json_obj.contains_key("active"));
+    }
+
+    #[test]
+    fn should_filter_using_combined_key_and_value_patterns() {
+        let transform = FilterFields::new(FilterFieldsConfig {
+            keep_fields: vec![],
+            patterns: vec![FilterPattern {
+                key_pattern: Some(KeyPattern::Contains("date".to_string())),
+                value_pattern: Some(ValuePattern::IsString),
+            }],
+            include_matching: true,
+        })
+        .unwrap();
+        let msg = create_test_message(
+            r#"{
+            "id": 1,
+            "created_date": "2023-01-01",
+            "updated_date": "2023-01-02",
+            "expired_date": null,
+            "version": "1.0"
+        }"#,
+        );
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["created_date"], "2023-01-01");
+        assert_eq!(json_obj["updated_date"], "2023-01-02");
+        assert!(!json_obj.contains_key("id"));
+        assert!(!json_obj.contains_key("expired_date")); // null, not a string
+        assert!(!json_obj.contains_key("version"));
+    }
+
+    #[test]
+    fn should_combine_keep_fields_list_with_pattern_filtering() {
+        let transform = FilterFields::new(FilterFieldsConfig {
+            keep_fields: vec!["id".to_string()],
+            patterns: vec![FilterPattern {
+                key_pattern: Some(KeyPattern::StartsWith("meta_".to_string())),
+                value_pattern: None,
+            }],
+            include_matching: true,
+        })
+        .unwrap();
+        let msg = create_test_message(
+            r#"{
+            "id": 1,
+            "name": "test",
+            "meta_created": "2023-01-01",
+            "description": "should be removed"
+        }"#,
+        );
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["id"], 1); // explicitly kept
+        assert_eq!(json_obj["meta_created"], "2023-01-01"); // matched pattern
+        assert!(!json_obj.contains_key("name"));
+        assert!(!json_obj.contains_key("description"));
+    }
+
+    #[test]
+    fn should_return_message_unchanged_when_no_filters() {
+        let transform = FilterFields::new(FilterFieldsConfig {
+            keep_fields: vec![],
+            patterns: vec![],
+            include_matching: true,
+        })
+        .unwrap();
+        let msg = create_test_message(r#"{"id": 1, "name": "test"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2); // should keep everything
+        assert_eq!(json_obj["id"], 1);
+        assert_eq!(json_obj["name"], "test");
+    }
+
+    #[test]
+    fn should_pass_through_non_json_payload_unchanged() {
+        let transform = FilterFields::new(FilterFieldsConfig {
+            keep_fields: vec!["field1".to_string()],
+            patterns: vec![],
+            include_matching: true,
+        })
+        .unwrap();
+        let msg = create_raw_test_message(vec![1, 2, 3, 4]);
+        let result = transform
+            .transform(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        if let Payload::Raw(bytes) = &result.payload {
+            assert_eq!(*bytes, vec![1u8, 2, 3, 4]);
+        } else {
+            panic!("Expected Raw payload");
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/json/mod.rs b/core/connectors/sdk/src/transforms/json/mod.rs
new file mode 100644
index 00000000..7b56c4d5
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/mod.rs
@@ -0,0 +1,45 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use chrono::Utc;
+use simd_json::OwnedValue;
+
+use super::ComputedValue;
+
+// JSON-specific implementations for transforms
+pub mod add_fields;
+pub mod delete_fields;
+pub mod filter_fields;
+pub mod update_fields;
+
+/// Computes a JSON value based on the specified computed value type
+pub fn compute_value(kind: &ComputedValue) -> OwnedValue {
+    let now = Utc::now();
+    match kind {
+        ComputedValue::DateTime => now.to_rfc3339().into(),
+        ComputedValue::TimestampNanos => now.timestamp_nanos_opt().unwrap().into(),
+        ComputedValue::TimestampMicros => now.timestamp_micros().into(),
+        ComputedValue::TimestampMillis => now.timestamp_millis().into(),
+        ComputedValue::TimestampSeconds => now.timestamp().into(),
+        ComputedValue::UuidV4 => uuid::Uuid::new_v4().to_string().into(),
+        ComputedValue::UuidV7 => uuid::Uuid::now_v7().to_string().into(),
+    }
+}
+
+#[cfg(test)]
+pub mod test_utils;
diff --git a/core/connectors/sdk/src/transforms/json/test_utils.rs b/core/connectors/sdk/src/transforms/json/test_utils.rs
new file mode 100644
index 00000000..f3f1575c
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/test_utils.rs
@@ -0,0 +1,93 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{DecodedMessage, Payload, TopicMetadata};
+use simd_json::OwnedValue;
+use simd_json::prelude::{TypedScalarValue, ValueAsScalar};
+use uuid;
+
+/// Helper function to create a test message with the given JSON payload
+pub fn create_test_message(json: &str) -> DecodedMessage {
+    let mut payload = json.to_string().into_bytes();
+    let value = simd_json::to_owned_value(&mut payload).unwrap();
+    DecodedMessage {
+        id: None,
+        offset: None,
+        checksum: None,
+        timestamp: None,
+        origin_timestamp: None,
+        headers: None,
+        payload: Payload::Json(value),
+    }
+}
+
+/// Helper function to create a non-JSON message with raw bytes
+pub fn create_raw_test_message(bytes: Vec<u8>) -> DecodedMessage {
+    DecodedMessage {
+        id: None,
+        offset: None,
+        checksum: None,
+        timestamp: None,
+        origin_timestamp: None,
+        headers: None,
+        payload: Payload::Raw(bytes),
+    }
+}
+
+/// Helper function to create a topic metadata for testing
+pub fn create_test_topic_metadata() -> TopicMetadata {
+    TopicMetadata {
+        stream: "test-stream".to_string(),
+        topic: "test-topic".to_string(),
+    }
+}
+
+/// Helper function to extract the JSON object from a message
+pub fn extract_json_object(msg: &DecodedMessage) -> Option<&simd_json::owned::Object> {
+    if let Payload::Json(OwnedValue::Object(map)) = &msg.payload {
+        Some(map)
+    } else {
+        None
+    }
+}
+
+/// Helper function to assert that a JSON value is a number
+pub fn assert_is_number(value: &OwnedValue, field_name: &str) {
+    if !value.is_number() {
+        panic!("{} should be a number", field_name);
+    }
+}
+
+/// Helper function to assert that a JSON value is a string
+pub fn assert_is_string(value: &OwnedValue, field_name: &str) {
+    if !value.is_str() {
+        panic!("{} should be a string", field_name);
+    }
+}
+
+/// Helper function to assert that a JSON value is a string and validates as a UUID
+pub fn assert_is_uuid(value: &OwnedValue, field_name: &str) {
+    if !value.is_str() {
+        panic!("{} should be a string", field_name);
+    }
+
+    let string_value = value.as_str().unwrap();
+    if uuid::Uuid::parse_str(string_value).is_err() {
+        panic!("{} is not a valid UUID", field_name);
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/json/update_fields.rs b/core/connectors/sdk/src/transforms/json/update_fields.rs
new file mode 100644
index 00000000..22fff623
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/update_fields.rs
@@ -0,0 +1,224 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::compute_value;
+use crate::{
+    DecodedMessage, Error, Payload, TopicMetadata,
+    transforms::{FieldValue, UpdateFields, update_fields::UpdateCondition},
+};
+use simd_json::OwnedValue;
+
+impl UpdateFields {
+    pub(crate) fn transform_json(
+        &self,
+        _metadata: &TopicMetadata,
+        mut message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else {
+            return Ok(Some(message));
+        };
+
+        for field in &self.fields {
+            let present = map.contains_key(&field.key);
+            let pass = match &field.condition {
+                None | Some(UpdateCondition::Always) => true,
+                Some(UpdateCondition::KeyExists) => present,
+                Some(UpdateCondition::KeyNotExists) => !present,
+            };
+            if pass {
+                let val = match &field.value {
+                    FieldValue::Static(v) => v.clone(),
+                    FieldValue::Computed(c) => compute_value(c),
+                };
+                map.insert(field.key.clone(), val);
+            }
+        }
+
+        Ok(Some(message))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::transforms::json::test_utils::{
+        assert_is_uuid, create_raw_test_message, create_test_message, create_test_topic_metadata,
+        extract_json_object,
+    };
+    use crate::transforms::{
+        ComputedValue, FieldValue, Transform,
+        update_fields::{Field, UpdateCondition, UpdateFields, UpdateFieldsConfig},
+    };
+    use simd_json::OwnedValue;
+    use simd_json::prelude::TypedScalarValue;
+
+    #[test]
+    fn should_always_update_field_when_no_condition() {
+        let transform = UpdateFields::new(UpdateFieldsConfig {
+            fields: vec![Field {
+                key: "status".to_string(),
+                value: FieldValue::Static(OwnedValue::from("updated")),
+                condition: None,
+            }],
+        });
+        let msg = create_test_message(r#"{"id": 1, "status": "pending"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["id"], 1);
+        assert_eq!(json_obj["status"], "updated");
+    }
+
+    #[test]
+    fn should_update_only_existing_keys_when_key_exists() {
+        let transform = UpdateFields::new(UpdateFieldsConfig {
+            fields: vec![
+                Field {
+                    key: "status".to_string(),
+                    value: FieldValue::Static(OwnedValue::from("updated")),
+                    condition: Some(UpdateCondition::KeyExists),
+                },
+                Field {
+                    key: "missing_field".to_string(),
+                    value: FieldValue::Static(OwnedValue::from("should not be added")),
+                    condition: Some(UpdateCondition::KeyExists),
+                },
+            ],
+        });
+        let msg = create_test_message(r#"{"id": 1, "status": "pending"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 2);
+        assert_eq!(json_obj["id"], 1);
+        assert_eq!(json_obj["status"], "updated");
+        assert!(!json_obj.contains_key("missing_field"));
+    }
+
+    #[test]
+    fn should_conditionally_add_fields_when_key_not_exists() {
+        let transform = UpdateFields::new(UpdateFieldsConfig {
+            fields: vec![
+                Field {
+                    key: "status".to_string(),
+                    value: FieldValue::Static(OwnedValue::from("should not update")),
+                    condition: Some(UpdateCondition::KeyNotExists),
+                },
+                Field {
+                    key: "created_at".to_string(),
+                    value: FieldValue::Static(OwnedValue::from("2023-01-01")),
+                    condition: Some(UpdateCondition::KeyNotExists),
+                },
+            ],
+        });
+        let msg = create_test_message(r#"{"id": 1, "status": "pending"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 3);
+        assert_eq!(json_obj["id"], 1);
+        assert_eq!(json_obj["status"], "pending"); // Should remain unchanged
+        assert_eq!(json_obj["created_at"], "2023-01-01"); // Should be added
+    }
+
+    #[test]
+    fn should_update_or_add_fields_when_always_condition() {
+        let transform = UpdateFields::new(UpdateFieldsConfig {
+            fields: vec![
+                Field {
+                    key: "status".to_string(),
+                    value: FieldValue::Static(OwnedValue::from("updated")),
+                    condition: Some(UpdateCondition::Always),
+                },
+                Field {
+                    key: "new_field".to_string(),
+                    value: FieldValue::Static(OwnedValue::from("new_value")),
+                    condition: Some(UpdateCondition::Always),
+                },
+            ],
+        });
+        let msg = create_test_message(r#"{"id": 1, "status": "pending"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 3);
+        assert_eq!(json_obj["id"], 1);
+        assert_eq!(json_obj["status"], "updated"); // Should be updated
+        assert_eq!(json_obj["new_field"], "new_value"); // Should be added
+    }
+
+    #[test]
+    fn should_update_fields_with_dynamically_computed_values() {
+        let transform = UpdateFields::new(UpdateFieldsConfig {
+            fields: vec![
+                Field {
+                    key: "updated_at".to_string(),
+                    value: FieldValue::Computed(ComputedValue::DateTime),
+                    condition: None,
+                },
+                Field {
+                    key: "request_id".to_string(),
+                    value: FieldValue::Computed(ComputedValue::UuidV4),
+                    condition: Some(UpdateCondition::KeyNotExists),
+                },
+            ],
+        });
+        let msg = create_test_message(r#"{"id": 1, "status": "pending"}"#);
+        let result = transform
+            .transform_json(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        let json_obj = extract_json_object(&result).unwrap();
+        assert_eq!(json_obj.len(), 4);
+        assert_eq!(json_obj["id"], 1);
+        assert_eq!(json_obj["status"], "pending");
+        assert_is_uuid(&json_obj["request_id"], "request_id");
+        // Note: We can't easily test DateTime format here, but we know it's a string
+        assert!(json_obj["updated_at"].is_str());
+    }
+
+    #[test]
+    fn should_pass_through_non_json_payload_unchanged() {
+        let transform = UpdateFields::new(UpdateFieldsConfig {
+            fields: vec![Field {
+                key: "new_field".to_string(),
+                value: FieldValue::Static(OwnedValue::from("new_value")),
+                condition: None,
+            }],
+        });
+        let msg = create_raw_test_message(vec![1, 2, 3, 4]);
+        let result = transform
+            .transform(&create_test_topic_metadata(), msg)
+            .unwrap()
+            .unwrap();
+        if let Payload::Raw(bytes) = &result.payload {
+            assert_eq!(*bytes, vec![1u8, 2, 3, 4]);
+        } else {
+            panic!("Expected Raw payload");
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/mod.rs b/core/connectors/sdk/src/transforms/mod.rs
index bbe67122..a8f2a46b 100644
--- a/core/connectors/sdk/src/transforms/mod.rs
+++ b/core/connectors/sdk/src/transforms/mod.rs
@@ -16,25 +16,54 @@
  * under the License.
  */
 
+mod add_fields;
+mod delete_fields;
+mod filter_fields;
+pub mod json;
+mod update_fields;
+use crate::{DecodedMessage, Error, TopicMetadata};
+pub use add_fields::{AddFields, AddFieldsConfig, Field as AddField};
+pub use delete_fields::{DeleteFields, DeleteFieldsConfig};
+pub use filter_fields::{
+    FilterFields, FilterFieldsConfig, FilterPattern, KeyPattern as FilterKeyPattern,
+    ValuePattern as FilterValuePattern,
+};
+use serde::{Deserialize, Serialize};
+use simd_json::OwnedValue;
 use std::sync::Arc;
-
-use add_fields::AddFields;
-use delete_fields::DeleteFields;
-use serde::{Deserialize, Serialize, de::DeserializeOwned};
 use strum_macros::{Display, IntoStaticStr};
-use tracing::error;
+pub use update_fields::{Field as UpdateField, UpdateCondition, UpdateFields, UpdateFieldsConfig};
 
-use crate::{DecodedMessage, Error, TopicMetadata};
+/// The value of a field, either static or computed at runtime
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum FieldValue {
+    Static(OwnedValue),
+    Computed(ComputedValue),
+}
 
-pub mod add_fields;
-pub mod delete_fields;
+/// Types of computed values that can be generated at runtime
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Display, IntoStaticStr)]
+#[serde(rename_all = "snake_case")]
+pub enum ComputedValue {
+    #[strum(to_string = "date_time")]
+    DateTime,
+    #[strum(to_string = "timestamp_nanos")]
+    TimestampNanos,
+    #[strum(to_string = "timestamp_micros")]
+    TimestampMicros,
+    #[strum(to_string = "timestamp_millis")]
+    TimestampMillis,
+    #[strum(to_string = "timestamp_seconds")]
+    TimestampSeconds,
+    #[strum(to_string = "uuid_v4")]
+    UuidV4,
+    #[strum(to_string = "uuid_v7")]
+    UuidV7,
+}
 
-/// Fields transformation trait.
 pub trait Transform: Send + Sync {
-    /// Returns the type of the transform.
     fn r#type(&self) -> TransformType;
-
-    /// Transforms the message, given the format is supported and the transform is applicable.
     fn transform(
         &self,
         metadata: &TopicMetadata,
@@ -47,25 +76,33 @@ pub trait Transform: Send + Sync {
 )]
 #[serde(rename_all = "snake_case")]
 pub enum TransformType {
-    #[strum(to_string = "add_fields")]
     AddFields,
-    #[strum(to_string = "delete_fields")]
     DeleteFields,
+    FilterFields,
+    UpdateFields,
 }
 
-pub fn load(r#type: TransformType, config: serde_json::Value) -> Result<Arc<dyn Transform>, Error> {
-    Ok(match r#type {
-        TransformType::AddFields => Arc::new(AddFields::new(as_config(r#type, config)?)),
-        TransformType::DeleteFields => Arc::new(DeleteFields::new(as_config(r#type, config)?)),
-    })
-}
-
-fn as_config<T: DeserializeOwned>(
-    r#type: TransformType,
-    config: serde_json::Value,
-) -> Result<T, Error> {
-    serde_json::from_value::<T>(config).map_err(|error| {
-        error!("Failed to deserialize config for transform: {type}. {error}");
-        Error::InvalidConfig
-    })
+pub fn from_config(t: TransformType, raw: &serde_json::Value) -> Result<Arc<dyn Transform>, Error> {
+    match t {
+        TransformType::AddFields => {
+            let cfg: AddFieldsConfig =
+                serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?;
+            Ok(Arc::new(AddFields::new(cfg)))
+        }
+        TransformType::DeleteFields => {
+            let cfg: DeleteFieldsConfig =
+                serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?;
+            Ok(Arc::new(DeleteFields::new(cfg)))
+        }
+        TransformType::FilterFields => {
+            let cfg: FilterFieldsConfig =
+                serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?;
+            Ok(Arc::new(FilterFields::new(cfg)?))
+        }
+        TransformType::UpdateFields => {
+            let cfg: UpdateFieldsConfig =
+                serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?;
+            Ok(Arc::new(UpdateFields::new(cfg)))
+        }
+    }
 }
diff --git a/core/connectors/sdk/src/transforms/update_fields.rs b/core/connectors/sdk/src/transforms/update_fields.rs
new file mode 100644
index 00000000..4bec96c1
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/update_fields.rs
@@ -0,0 +1,78 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{FieldValue, Transform, TransformType};
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+use serde::{Deserialize, Serialize};
+
+/// A field to be updated in messages
+///
+/// One entry of [`UpdateFieldsConfig::fields`]: it names a key, carries the
+/// value associated with that key, and optionally a condition for the update.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Field {
+    /// Name of the field this entry targets.
+    pub key: String,
+    /// Value for the field, expressed as a [`FieldValue`] from the parent module.
+    pub value: FieldValue,
+    /// Optional update condition; `#[serde(default)]` makes it `None` when the
+    /// key is missing from the config.
+    // NOTE(review): how `None` is interpreted is decided in `transform_json`,
+    // which is defined outside this file — presumably like `Always`; confirm.
+    #[serde(default)]
+    pub condition: Option<UpdateCondition>,
+}
+
+/// Configuration for the UpdateFields transform
+///
+/// Consumed by [`UpdateFields::new`], which moves `fields` into the transform.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct UpdateFieldsConfig {
+    /// Fields to update. `#[serde(default)]` yields an empty list when the key
+    /// is omitted, in which case `UpdateFields::transform` returns every
+    /// message unchanged.
+    #[serde(default)]
+    pub fields: Vec<Field>,
+}
+
+/// Conditions that determine when a field should be updated
+///
+/// Because of `rename_all = "snake_case"` the variants are (de)serialized as
+/// `always`, `key_exists` and `key_not_exists`.
+// NOTE(review): the conditions are evaluated in `transform_json`, which is not
+// part of this file; the per-variant notes below follow the variant names and
+// should be confirmed against that implementation.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum UpdateCondition {
+    /// Presumably: apply the update whether or not the key is present.
+    Always,
+    /// Presumably: apply the update only when the key is already present.
+    KeyExists,
+    /// Presumably: apply the update only when the key is absent.
+    KeyNotExists,
+}
+
+/// Transform that updates fields in JSON messages based on conditions
+///
+/// Built from an [`UpdateFieldsConfig`] via [`UpdateFields::new`]. Its
+/// `Transform` implementation leaves non-JSON payloads untouched.
+pub struct UpdateFields {
+    /// Update rules, taken verbatim from [`UpdateFieldsConfig::fields`].
+    pub fields: Vec<Field>,
+}
+
+impl UpdateFields {
+    /// Creates the transform, taking ownership of the configured field list.
+    pub fn new(cfg: UpdateFieldsConfig) -> Self {
+        let UpdateFieldsConfig { fields } = cfg;
+        Self { fields }
+    }
+}
+
+impl Transform for UpdateFields {
+    /// Identifies this transform as [`TransformType::UpdateFields`].
+    fn r#type(&self) -> TransformType {
+        TransformType::UpdateFields
+    }
+
+    /// Applies the configured field updates to `message`.
+    ///
+    /// The message is handed back untouched when no fields are configured or
+    /// when its payload is not JSON; otherwise the work is delegated to
+    /// `transform_json`.
+    fn transform(
+        &self,
+        metadata: &TopicMetadata,
+        message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        let has_rules = !self.fields.is_empty();
+        let is_json = matches!(&message.payload, Payload::Json(_));
+
+        if has_rules && is_json {
+            self.transform_json(metadata, message)
+        } else {
+            Ok(Some(message))
+        }
+    }
+}

Reply via email to