This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 9cad36ab feat(connectors): Extend JSON field transformations in
connectors run… (#1863)
9cad36ab is described below
commit 9cad36abdd64e0a8e137164148e3f80736650927
Author: Jagadesh Adireddi <[email protected]>
AuthorDate: Mon Jun 16 21:31:45 2025 +0530
feat(connectors): Extend JSON field transformations in connectors run…
(#1863)
Fixes: https://github.com/apache/iggy/issues/1848
This PR extends the message transformation system for connectors,
enabling modification of JSON messages. The implementation provides four
transforms, one of which (FilterFields) offers advanced pattern-matching
capabilities.
### Transforms Added
**AddFields Transform**
- Dynamically adds fields to JSON messages
- Static values (strings, numbers, booleans, objects)
- Computed values (timestamps, UUIDs, datetime strings)
- Support for multiple timestamp formats (nanos, micros, millis,
seconds)
- UUID v4 and v7 generation
**DeleteFields Transform**
- Removes specified fields from JSON messages
- Simple field name-based deletion
- Ignores non-existent fields
**UpdateFields Transform**
- Conditionally updates (or creates) fields in JSON messages
- Conditional logic: Always, KeyExists, KeyNotExists
- Static and computed value updates
- Field creation when conditions are met
- Same computed value support as AddFields
**FilterFields Transform**
- Filtering based on complex key-value patterns
- Key pattern matching (exact, prefix, suffix, contains, regex)
- Value pattern matching (equality, type checks, numeric comparisons,
regex)
- Combined key and value pattern matching
- Include or exclude matching behavior
---------
Co-authored-by: Piotr Gankiewicz <[email protected]>
---
Cargo.lock | 1 +
core/connectors/runtime/src/transform.rs | 2 +-
core/connectors/sdk/Cargo.toml | 1 +
core/connectors/sdk/src/transforms/add_fields.rs | 101 ++-----
.../connectors/sdk/src/transforms/delete_fields.rs | 37 +--
.../connectors/sdk/src/transforms/filter_fields.rs | 258 ++++++++++++++++++
.../sdk/src/transforms/json/add_fields.rs | 187 +++++++++++++
.../sdk/src/transforms/json/delete_fields.rs | 149 +++++++++++
.../sdk/src/transforms/json/filter_fields.rs | 290 +++++++++++++++++++++
core/connectors/sdk/src/transforms/json/mod.rs | 45 ++++
.../sdk/src/transforms/json/test_utils.rs | 93 +++++++
.../sdk/src/transforms/json/update_fields.rs | 224 ++++++++++++++++
core/connectors/sdk/src/transforms/mod.rs | 95 ++++---
.../connectors/sdk/src/transforms/update_fields.rs | 78 ++++++
14 files changed, 1435 insertions(+), 126 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 76292c4b..ac8cfa25 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3887,6 +3887,7 @@ dependencies = [
"iggy",
"once_cell",
"postcard",
+ "regex",
"serde",
"serde_json",
"simd-json",
diff --git a/core/connectors/runtime/src/transform.rs
b/core/connectors/runtime/src/transform.rs
index 0cee67cc..303153cb 100644
--- a/core/connectors/runtime/src/transform.rs
+++ b/core/connectors/runtime/src/transform.rs
@@ -42,7 +42,7 @@ pub fn load(config: TransformsConfig) -> Result<Vec<Arc<dyn
Transform>>, Runtime
continue;
}
- let transform = iggy_connector_sdk::transforms::load(r#type,
transform_config)?;
+ let transform = iggy_connector_sdk::transforms::from_config(r#type,
&transform_config)?;
transforms.push(transform);
}
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
index 7c96ed3c..54e07c77 100644
--- a/core/connectors/sdk/Cargo.toml
+++ b/core/connectors/sdk/Cargo.toml
@@ -35,6 +35,7 @@ dashmap = { workspace = true }
iggy = { workspace = true }
once_cell = { workspace = true }
postcard = { workspace = true }
+regex = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
simd-json = { workspace = true }
diff --git a/core/connectors/sdk/src/transforms/add_fields.rs
b/core/connectors/sdk/src/transforms/add_fields.rs
index 569a16b8..591959aa 100644
--- a/core/connectors/sdk/src/transforms/add_fields.rs
+++ b/core/connectors/sdk/src/transforms/add_fields.rs
@@ -16,60 +16,32 @@
* under the License.
*/
-use serde::{Deserialize, Serialize};
-use simd_json::OwnedValue;
-use strum_macros::{Display, IntoStaticStr};
-
+use super::{FieldValue, Transform, TransformType};
use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+use serde::{Deserialize, Serialize};
-use super::{Transform, TransformType};
-
+/// A field to be added to messages
#[derive(Debug, Serialize, Deserialize)]
-pub struct AddFieldsConfig {
- fields: Vec<Field>,
-}
-
-pub struct AddFields {
- fields: Vec<Field>,
+pub struct Field {
+ pub key: String,
+ pub value: FieldValue,
}
+/// Configuration for the AddFields transform
#[derive(Debug, Serialize, Deserialize)]
-struct Field {
- key: String,
- value: FieldValue,
-}
-
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-#[serde(rename_all = "snake_case")]
-enum FieldValue {
- Static(simd_json::OwnedValue),
- Computed(ComputedValue),
+pub struct AddFieldsConfig {
+ #[serde(default)]
+ pub fields: Vec<Field>,
}
-#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Display,
IntoStaticStr)]
-#[serde(rename_all = "snake_case")]
-enum ComputedValue {
- #[strum(to_string = "date_time")]
- DateTime,
- #[strum(to_string = "timestamp_nanos")]
- TimestampNanos,
- #[strum(to_string = "timestamp_micros")]
- TimestampMicros,
- #[strum(to_string = "timestamp_millis")]
- TimestampMillis,
- #[strum(to_string = "timestamp_seconds")]
- TimestampSeconds,
- #[strum(to_string = "uuid_v4")]
- UuidV4,
- #[strum(to_string = "uuid_v7")]
- UuidV7,
+/// Transform that adds fields to JSON messages
+pub struct AddFields {
+ pub fields: Vec<Field>,
}
impl AddFields {
- pub fn new(config: AddFieldsConfig) -> Self {
- Self {
- fields: config.fields,
- }
+ pub fn new(cfg: AddFieldsConfig) -> Self {
+ Self { fields: cfg.fields }
}
}
@@ -80,49 +52,16 @@ impl Transform for AddFields {
fn transform(
&self,
- _metadata: &TopicMetadata,
- mut message: DecodedMessage,
+ metadata: &TopicMetadata,
+ message: DecodedMessage,
) -> Result<Option<DecodedMessage>, Error> {
if self.fields.is_empty() {
return Ok(Some(message));
}
- let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload
else {
- return Ok(Some(message));
- };
-
- for field in &self.fields {
- match &field.value {
- FieldValue::Static(value) => map.insert(field.key.clone(),
value.clone()),
- FieldValue::Computed(value) => match value {
- ComputedValue::DateTime => {
- map.insert(field.key.clone(),
chrono::Utc::now().to_rfc3339().into())
- }
- ComputedValue::TimestampMillis => map.insert(
- field.key.clone(),
- chrono::Utc::now().timestamp_millis().into(),
- ),
- ComputedValue::TimestampMicros => map.insert(
- field.key.clone(),
- chrono::Utc::now().timestamp_micros().into(),
- ),
- ComputedValue::TimestampNanos => map.insert(
- field.key.clone(),
- chrono::Utc::now().timestamp_nanos_opt().into(),
- ),
- ComputedValue::TimestampSeconds => {
- map.insert(field.key.clone(),
chrono::Utc::now().timestamp().into())
- }
- ComputedValue::UuidV4 => {
- map.insert(field.key.clone(),
uuid::Uuid::new_v4().to_string().into())
- }
- ComputedValue::UuidV7 => {
- map.insert(field.key.clone(),
uuid::Uuid::now_v7().to_string().into())
- }
- },
- };
+ match &message.payload {
+ Payload::Json(_) => self.transform_json(metadata, message),
+ _ => Ok(Some(message)),
}
-
- Ok(Some(message))
}
}
diff --git a/core/connectors/sdk/src/transforms/delete_fields.rs
b/core/connectors/sdk/src/transforms/delete_fields.rs
index d7dd755a..465c3719 100644
--- a/core/connectors/sdk/src/transforms/delete_fields.rs
+++ b/core/connectors/sdk/src/transforms/delete_fields.rs
@@ -16,30 +16,34 @@
* under the License.
*/
-use std::collections::HashSet;
-
+use super::{Transform, TransformType};
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
use serde::{Deserialize, Serialize};
use simd_json::OwnedValue;
+use std::collections::HashSet;
-use crate::{DecodedMessage, Error, Payload, TopicMetadata};
-
-use super::{Transform, TransformType};
-
+/// Configuration for the DeleteFields transform
#[derive(Debug, Serialize, Deserialize)]
pub struct DeleteFieldsConfig {
- fields: Vec<String>,
+ #[serde(default)]
+ pub fields: Vec<String>,
}
+/// Transform that removes specified fields from JSON messages
pub struct DeleteFields {
- fields: HashSet<String>,
+ pub fields: HashSet<String>,
}
impl DeleteFields {
- pub fn new(config: DeleteFieldsConfig) -> Self {
+ pub fn new(cfg: DeleteFieldsConfig) -> Self {
Self {
- fields: config.fields.into_iter().collect(),
+ fields: cfg.fields.into_iter().collect(),
}
}
+
+ pub fn should_remove(&self, k: &str, _v: &OwnedValue) -> bool {
+ self.fields.contains(k)
+ }
}
impl Transform for DeleteFields {
@@ -49,13 +53,16 @@ impl Transform for DeleteFields {
fn transform(
&self,
- _metadata: &TopicMetadata,
- mut message: DecodedMessage,
+ metadata: &TopicMetadata,
+ message: DecodedMessage,
) -> Result<Option<DecodedMessage>, Error> {
- if let Payload::Json(OwnedValue::Object(ref mut map)) =
message.payload {
- map.retain(|key, _| !self.fields.contains(key));
+ if self.fields.is_empty() {
+ return Ok(Some(message));
}
- Ok(Some(message))
+ match &message.payload {
+ Payload::Json(_) => self.transform_json(metadata, message),
+ _ => Ok(Some(message)),
+ }
}
}
diff --git a/core/connectors/sdk/src/transforms/filter_fields.rs
b/core/connectors/sdk/src/transforms/filter_fields.rs
new file mode 100644
index 00000000..2b70c209
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/filter_fields.rs
@@ -0,0 +1,258 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{Transform, TransformType};
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+use regex::Regex;
+use serde::{Deserialize, Serialize};
+use simd_json::OwnedValue;
+use simd_json::prelude::{TypedArrayValue, TypedObjectValue, TypedScalarValue,
ValueAsScalar};
+use std::collections::HashSet;
+
+/// Pattern matching for field keys with various string matching strategies
+///
+/// All string comparisons are case-sensitive. Variants are (de)serialized
+/// with snake_case tags (`exact`, `starts_with`, `ends_with`, `contains`,
+/// `regex`). The `Regex` variant holds the pattern source (`T = String`)
+/// until `KeyPattern::compile` turns it into a compiled `Regex`.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum KeyPattern<T = String> {
+ /// Exact string match
+ Exact(String),
+ /// Key starts with the given string
+ StartsWith(String),
+ /// Key ends with the given string
+ EndsWith(String),
+ /// Key contains the given string
+ Contains(String),
+ /// Key matches the given regular expression (unanchored: a match anywhere
+ /// in the key counts; use `^...$` to require a full match)
+ Regex(T),
+}
+
+impl KeyPattern<String> {
+ /// Compiles string patterns into regex patterns for efficient matching
+ ///
+ /// Only the `Regex` variant is compiled; every other variant is moved
+ /// through unchanged.
+ ///
+ /// # Errors
+ ///
+ /// Returns `Error::InvalidConfig` if the regex source does not compile.
+ /// The underlying `regex::Error` (and its message) is discarded.
+ pub fn compile(self) -> Result<KeyPattern<Regex>, Error> {
+ Ok(match self {
+ KeyPattern::Regex(pattern) => {
+ KeyPattern::Regex(Regex::new(&pattern).map_err(|_|
Error::InvalidConfig)?)
+ }
+ KeyPattern::Exact(s) => KeyPattern::Exact(s),
+ KeyPattern::StartsWith(s) => KeyPattern::StartsWith(s),
+ KeyPattern::EndsWith(s) => KeyPattern::EndsWith(s),
+ KeyPattern::Contains(s) => KeyPattern::Contains(s),
+ })
+ }
+}
+
+impl KeyPattern<Regex> {
+ /// Checks if a key matches this pattern
+ ///
+ /// The string variants compare case-sensitively. `Regex` uses
+ /// `Regex::is_match`, which succeeds on a match anywhere in `k`.
+ pub fn matches(&self, k: &str) -> bool {
+ match self {
+ KeyPattern::Exact(s) => k == s,
+ KeyPattern::StartsWith(s) => k.starts_with(s),
+ KeyPattern::EndsWith(s) => k.ends_with(s),
+ KeyPattern::Contains(s) => k.contains(s),
+ KeyPattern::Regex(re) => re.is_match(k),
+ }
+ }
+}
+
+/// Pattern matching for field values with type checking and content matching
+///
+/// Variants are (de)serialized with snake_case tags (e.g. `equals`,
+/// `greater_than`, `between`, `is_null`). The `Regex` variant holds the
+/// pattern source (`T = String`) until `ValuePattern::compile` turns it into
+/// a compiled `Regex`. `Contains` and `Regex` only ever match string values.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum ValuePattern<T = String> {
+ /// Value equals the specified JSON value
+ Equals(OwnedValue),
+ /// String value contains the specified substring
+ Contains(String),
+ /// String value matches the given regular expression (unanchored)
+ Regex(T),
+ /// Numeric value is greater than the specified number
+ GreaterThan(f64),
+ /// Numeric value is less than the specified number
+ LessThan(f64),
+ /// Numeric value is between the specified range (inclusive)
+ ///
+ /// The first number is the lower bound and the second the upper bound;
+ /// if the first is greater than the second, nothing matches.
+ Between(f64, f64),
+ /// Value is null
+ IsNull,
+ /// Value is not null
+ IsNotNull,
+ /// Value is a string
+ IsString,
+ /// Value is a number
+ IsNumber,
+ /// Value is a boolean
+ IsBoolean,
+ /// Value is an object
+ IsObject,
+ /// Value is an array
+ IsArray,
+}
+
+impl ValuePattern<String> {
+ /// Compiles string patterns into regex patterns for efficient matching
+ ///
+ /// Only the `Regex` variant is compiled; every other variant is moved
+ /// through unchanged.
+ ///
+ /// # Errors
+ ///
+ /// Returns `Error::InvalidConfig` if the regex source does not compile.
+ /// The underlying `regex::Error` (and its message) is discarded.
+ pub fn compile(self) -> Result<ValuePattern<Regex>, Error> {
+ // The glob import brings the variants into scope, so `Regex(..)` below
+ // is the `ValuePattern::Regex` variant; the regex type is therefore
+ // spelled out as `regex::Regex`.
+ use ValuePattern::*;
+ Ok(match self {
+ Regex(pattern) => Regex(regex::Regex::new(&pattern).map_err(|_|
Error::InvalidConfig)?),
+ Equals(v) => Equals(v),
+ Contains(s) => Contains(s),
+ GreaterThan(n) => GreaterThan(n),
+ LessThan(n) => LessThan(n),
+ Between(a, b) => Between(a, b),
+ IsNull => IsNull,
+ IsNotNull => IsNotNull,
+ IsString => IsString,
+ IsNumber => IsNumber,
+ IsBoolean => IsBoolean,
+ IsObject => IsObject,
+ IsArray => IsArray,
+ })
+ }
+}
+
+impl ValuePattern<Regex> {
+ /// Checks if a value matches this pattern
+ ///
+ /// `Contains` and `Regex` only match string values. The numeric patterns
+ /// (`GreaterThan`, `LessThan`, `Between`) read the value with `cast_f64`,
+ /// which converts integer as well as float JSON numbers. `as_f64` only
+ /// yields `Some` for float nodes, so integers such as `42` would never
+ /// satisfy a numeric comparison (while `IsNumber` accepts them).
+ /// Non-numeric values never match the numeric patterns. Very large 64-bit
+ /// integers are rounded to the nearest `f64` before comparison.
+ pub fn matches(&self, v: &OwnedValue) -> bool {
+ use ValuePattern::*;
+ match self {
+ Equals(x) => v == x,
+ Contains(s) => v.as_str().is_some_and(|x| x.contains(s)),
+ Regex(re) => v.as_str().is_some_and(|x| re.is_match(x)),
+ GreaterThan(t) => v.cast_f64().is_some_and(|n| n > *t),
+ LessThan(t) => v.cast_f64().is_some_and(|n| n < *t),
+ // Inclusive on both ends; a reversed range (a > b) matches nothing.
+ Between(a, b) => v.cast_f64().is_some_and(|n| n >= *a && n <= *b),
+ IsNull => v.is_null(),
+ IsNotNull => !v.is_null(),
+ IsString => v.is_str(),
+ IsNumber => v.is_number(),
+ IsBoolean => v.is_bool(),
+ IsObject => v.is_object(),
+ IsArray => v.is_array(),
+ }
+ }
+}
+
+/// Configuration for the FilterFields transform
+///
+/// If both `keep_fields` and `patterns` are empty, messages pass through
+/// unchanged. With `include_matching = true` and no patterns, every top-level
+/// field of a JSON object that is not listed in `keep_fields` is removed.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct FilterFieldsConfig {
+ /// Fields to always keep regardless of pattern matching
+ #[serde(default)]
+ pub keep_fields: Vec<String>,
+ /// Patterns to match against field keys and values
+ #[serde(default)]
+ pub patterns: Vec<FilterPattern>,
+ /// Whether to include (true) or exclude (false) fields that match patterns
+ ///
+ /// Defaults to `true` when omitted from the config.
+ #[serde(default = "default_include")]
+ pub include_matching: bool,
+}
+
+/// Default value for include_matching field
+///
+/// Referenced by `#[serde(default = "default_include")]` on
+/// `FilterFieldsConfig::include_matching`, so omitting the key means "include".
+fn default_include() -> bool {
+ true
+}
+
+/// A pattern that can match both field keys and values
+///
+/// An omitted (`None`) part matches any key or value, so a `FilterPattern`
+/// with neither `key_pattern` nor `value_pattern` matches every field.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct FilterPattern {
+ /// Optional pattern to match against field keys
+ #[serde(default)]
+ pub key_pattern: Option<KeyPattern<String>>,
+ /// Optional pattern to match against field values
+ #[serde(default)]
+ pub value_pattern: Option<ValuePattern<String>>,
+}
+
+/// A compiled pattern with regex patterns ready for efficient matching
+///
+/// Built from a `FilterPattern` by `FilterFields::new`.
+pub struct CompiledPattern {
+ /// Compiled key pattern (`None` matches any key)
+ pub key_pattern: Option<KeyPattern<Regex>>,
+ /// Compiled value pattern (`None` matches any value)
+ pub value_pattern: Option<ValuePattern<Regex>>,
+}
+
+/// Transform that filters JSON message fields based on complex patterns
+///
+/// This transform supports sophisticated filtering based on:
+/// - Field names (exact, prefix, suffix, contains, regex)
+/// - Field values (equality, type checking, numeric comparisons, regex)
+/// - Combination of key and value patterns
+/// - Include or exclude matching behavior
+///
+/// Only the top-level fields of a JSON object payload are filtered; nested
+/// objects are not traversed. Fields named in `keep_set` are always kept and
+/// are never tested against the patterns.
+pub struct FilterFields {
+ /// Whether to include or exclude matching fields
+ pub include_matching: bool,
+ /// Set of field names to always keep
+ pub keep_set: HashSet<String>,
+ /// Compiled patterns for efficient matching
+ pub patterns: Vec<CompiledPattern>,
+}
+
+impl FilterFields {
+ /// Creates a new FilterFields transform from configuration
+ ///
+ /// This compiles all regex patterns for efficient matching during
transformation.
+ /// Returns an error if any regex patterns are invalid.
+ ///
+ /// # Errors
+ ///
+ /// Returns `Error::InvalidConfig` (from `KeyPattern::compile` or
+ /// `ValuePattern::compile`) for the first regex that fails to compile.
+ pub fn new(cfg: FilterFieldsConfig) -> Result<Self, Error> {
+ let keep_set = cfg.keep_fields.into_iter().collect();
+
+ let mut patterns = Vec::with_capacity(cfg.patterns.len());
+ for p in cfg.patterns {
+ // `transpose` turns `Option<Result<_, _>>` into `Result<Option<_>, _>`
+ // so `?` propagates compile errors while `None` stays "match any".
+ patterns.push(CompiledPattern {
+ key_pattern: p.key_pattern.map(|kp| kp.compile()).transpose()?,
+ value_pattern: p.value_pattern.map(|vp|
vp.compile()).transpose()?,
+ });
+ }
+
+ Ok(Self {
+ include_matching: cfg.include_matching,
+ keep_set,
+ patterns,
+ })
+ }
+
+ /// Checks if a field key-value pair matches any of the configured patterns
+ ///
+ /// A field matches if both the key pattern (if specified) and value
pattern
+ /// (if specified) match. If only one pattern type is specified, only that
+ /// pattern needs to match.
+ ///
+ /// Returns `false` when no patterns are configured.
+ #[inline]
+ pub fn matches_patterns(&self, k: &str, v: &OwnedValue) -> bool {
+ self.patterns.iter().any(|pat| {
+ let key_ok = pat.key_pattern.as_ref().is_none_or(|kp|
kp.matches(k));
+ let value_ok = pat.value_pattern.as_ref().is_none_or(|vp|
vp.matches(v));
+ key_ok && value_ok
+ })
+ }
+}
+
+impl Transform for FilterFields {
+ fn r#type(&self) -> TransformType {
+ TransformType::FilterFields
+ }
+
+ /// Filters JSON payloads via `transform_json`; any other payload type is
+ /// returned unchanged. The message itself is never dropped.
+ fn transform(
+ &self,
+ metadata: &TopicMetadata,
+ message: DecodedMessage,
+ ) -> Result<Option<DecodedMessage>, Error> {
+ if self.keep_set.is_empty() && self.patterns.is_empty() {
+ return Ok(Some(message)); // nothing to do
+ }
+
+ match &message.payload {
+ Payload::Json(_) => self.transform_json(metadata, message),
+ _ => Ok(Some(message)),
+ }
+ }
+}
diff --git a/core/connectors/sdk/src/transforms/json/add_fields.rs
b/core/connectors/sdk/src/transforms/json/add_fields.rs
new file mode 100644
index 00000000..43fd4328
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/add_fields.rs
@@ -0,0 +1,187 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::compute_value;
+use crate::{
+ DecodedMessage, Error, Payload, TopicMetadata,
+ transforms::{AddFields, FieldValue},
+};
+use simd_json::OwnedValue;
+
+impl AddFields {
+ /// Adds the configured fields to a JSON payload.
+ ///
+ /// Only top-level JSON objects are modified; any other payload (including
+ /// JSON arrays and scalars) is returned unchanged. Each field is inserted
+ /// with `map.insert`, so an existing key with the same name is overwritten.
+ /// Computed values come from `compute_value` and are evaluated afresh for
+ /// every message and field.
+ ///
+ /// Always returns `Ok(Some(..))`; the message is never dropped.
+ pub(crate) fn transform_json(
+ &self,
+ _metadata: &TopicMetadata,
+ mut message: DecodedMessage,
+ ) -> Result<Option<DecodedMessage>, Error> {
+ let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload
else {
+ return Ok(Some(message));
+ };
+
+ for field in &self.fields {
+ let new_val = match &field.value {
+ FieldValue::Static(v) => v.clone(),
+ FieldValue::Computed(c) => compute_value(c),
+ };
+ map.insert(field.key.clone(), new_val);
+ }
+
+ Ok(Some(message))
+ }
+}
+
+// Unit tests for `AddFields::transform_json`; the last test goes through
+// `Transform::transform` to cover non-JSON payloads.
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::transforms::json::test_utils::{
+ assert_is_number, assert_is_uuid, create_raw_test_message,
create_test_message,
+ create_test_topic_metadata, extract_json_object,
+ };
+ use crate::transforms::{
+ ComputedValue, FieldValue, Transform,
+ add_fields::{AddFields, Field},
+ };
+ use simd_json::OwnedValue;
+
+ #[test]
+ fn should_return_message_unchanged_when_no_fields() {
+ let transform = AddFields { fields: vec![] };
+ let msg = create_test_message(r#"{"existing": "field"}"#);
+ let result = transform
+ .transform_json(&create_test_topic_metadata(), msg)
+ .unwrap()
+ .unwrap();
+ let json_obj = extract_json_object(&result).unwrap();
+ assert_eq!(json_obj.len(), 1);
+ assert_eq!(json_obj["existing"], "field");
+ }
+
+ #[test]
+ fn should_add_static_field_to_json_message() {
+ let transform = AddFields {
+ fields: vec![Field {
+ key: "new_field".to_string(),
+ value: FieldValue::Static(OwnedValue::from("new_value")),
+ }],
+ };
+ let msg = create_test_message(r#"{"existing": "field"}"#);
+ let result = transform
+ .transform_json(&create_test_topic_metadata(), msg)
+ .unwrap()
+ .unwrap();
+ let json_obj = extract_json_object(&result).unwrap();
+ assert_eq!(json_obj.len(), 2);
+ assert_eq!(json_obj["existing"], "field");
+ assert_eq!(json_obj["new_field"], "new_value");
+ }
+
+ #[test]
+ fn should_add_multiple_static_fields_with_different_types() {
+ let transform = AddFields {
+ fields: vec![
+ Field {
+ key: "string_field".to_string(),
+ value:
FieldValue::Static(OwnedValue::from("string_value")),
+ },
+ Field {
+ key: "number_field".to_string(),
+ value: FieldValue::Static(OwnedValue::from(42)),
+ },
+ Field {
+ key: "boolean_field".to_string(),
+ value: FieldValue::Static(OwnedValue::from(true)),
+ },
+ ],
+ };
+ let msg = create_test_message(r#"{"existing": "field"}"#);
+ let result = transform
+ .transform_json(&create_test_topic_metadata(), msg)
+ .unwrap()
+ .unwrap();
+ let json_obj = extract_json_object(&result).unwrap();
+ assert_eq!(json_obj.len(), 4);
+ assert_eq!(json_obj["existing"], "field");
+ assert_eq!(json_obj["string_field"], "string_value");
+ assert_eq!(json_obj["number_field"], 42);
+ assert_eq!(json_obj["boolean_field"], true);
+ }
+
+ #[test]
+ fn should_add_computed_fields_with_dynamic_values() {
+ let transform = AddFields {
+ fields: vec![
+ Field {
+ key: "timestamp_ms".to_string(),
+ value:
FieldValue::Computed(ComputedValue::TimestampMillis),
+ },
+ Field {
+ key: "uuid".to_string(),
+ value: FieldValue::Computed(ComputedValue::UuidV4),
+ },
+ ],
+ };
+ let msg = create_test_message(r#"{"existing": "field"}"#);
+ let result = transform
+ .transform_json(&create_test_topic_metadata(), msg)
+ .unwrap()
+ .unwrap();
+ let json_obj = extract_json_object(&result).unwrap();
+ assert_eq!(json_obj.len(), 3);
+ assert_eq!(json_obj["existing"], "field");
+ assert_is_number(&json_obj["timestamp_ms"], "timestamp_ms");
+ assert_is_uuid(&json_obj["uuid"], "uuid");
+ }
+
+ #[test]
+ fn should_overwrite_existing_field_when_same_key_specified() {
+ let transform = AddFields {
+ fields: vec![Field {
+ key: "existing".to_string(),
+ value: FieldValue::Static(OwnedValue::from("new_value")),
+ }],
+ };
+ let msg = create_test_message(r#"{"existing": "field"}"#);
+ let result = transform
+ .transform_json(&create_test_topic_metadata(), msg)
+ .unwrap()
+ .unwrap();
+ let json_obj = extract_json_object(&result).unwrap();
+ assert_eq!(json_obj.len(), 1);
+ assert_eq!(json_obj["existing"], "new_value");
+ }
+
+ #[test]
+ fn should_pass_through_non_json_payload_unchanged() {
+ let transform = AddFields {
+ fields: vec![Field {
+ key: "new_field".to_string(),
+ value: FieldValue::Static(OwnedValue::from("new_value")),
+ }],
+ };
+ let msg = create_raw_test_message(vec![1, 2, 3, 4]);
+ // Calls `transform` (not `transform_json`) so the payload-type dispatch is exercised.
+ let result = transform
+ .transform(&create_test_topic_metadata(), msg)
+ .unwrap()
+ .unwrap();
+ if let Payload::Raw(bytes) = &result.payload {
+ assert_eq!(*bytes, vec![1u8, 2, 3, 4]);
+ } else {
+ panic!("Expected Raw payload");
+ }
+ }
+}
diff --git a/core/connectors/sdk/src/transforms/json/delete_fields.rs
b/core/connectors/sdk/src/transforms/json/delete_fields.rs
new file mode 100644
index 00000000..747490d0
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/delete_fields.rs
@@ -0,0 +1,149 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{
+ DecodedMessage, Error, Payload, TopicMetadata,
transforms::delete_fields::DeleteFields,
+};
+use simd_json::OwnedValue;
+
+impl DeleteFields {
+ /// Removes the configured fields from a JSON payload.
+ ///
+ /// Only top-level keys of a JSON object are considered; a key is removed
+ /// when `should_remove` returns true for it. Any other payload (including
+ /// JSON arrays and scalars) is returned unchanged, and the message is never
+ /// dropped.
+ pub(crate) fn transform_json(
+ &self,
+ _metadata: &TopicMetadata,
+ mut message: DecodedMessage,
+ ) -> Result<Option<DecodedMessage>, Error> {
+ let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload
else {
+ return Ok(Some(message));
+ };
+
+ map.retain(|k, v| !self.should_remove(k, v));
+ Ok(Some(message))
+ }
+}
+
+// Unit tests for `DeleteFields::transform_json`; the last test goes through
+// `Transform::transform` to cover non-JSON payloads.
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::transforms::json::test_utils::{
+ create_raw_test_message, create_test_message,
create_test_topic_metadata,
+ extract_json_object,
+ };
+ use crate::transforms::{
+ Transform,
+ delete_fields::{DeleteFields, DeleteFieldsConfig},
+ };
+
+ #[test]
+ fn should_remove_specified_fields_from_json_message() {
+ let transform = DeleteFields::new(DeleteFieldsConfig {
+ fields: vec!["field1".to_string(), "field3".to_string()],
+ });
+ let msg = create_test_message(
+ r#"{"field1": "value1", "field2": "value2", "field3": 42,
"field4": true}"#,
+ );
+ let result = transform
+ .transform_json(&create_test_topic_metadata(), msg)
+ .unwrap()
+ .unwrap();
+ let json_obj = extract_json_object(&result).unwrap();
+ assert_eq!(json_obj.len(), 2);
+ assert_eq!(json_obj["field2"], "value2");
+ assert_eq!(json_obj["field4"], true);
+ assert!(!json_obj.contains_key("field1"));
+ assert!(!json_obj.contains_key("field3"));
+ }
+
+ #[test]
+ fn should_remove_multiple_fields_while_preserving_others() {
+ let transform = DeleteFields::new(DeleteFieldsConfig {
+ fields: vec![
+ "remove1".to_string(),
+ "remove2".to_string(),
+ "remove3".to_string(),
+ ],
+ });
+ let msg = create_test_message(
+ r#"{
+ "keep1": "value1",
+ "remove1": 100,
+ "keep2": true,
+ "remove2": null,
+ "remove3": "delete me"
+ }"#,
+ );
+ let result = transform
+ .transform_json(&create_test_topic_metadata(), msg)
+ .unwrap()
+ .unwrap();
+ let json_obj = extract_json_object(&result).unwrap();
+ assert_eq!(json_obj.len(), 2);
+ assert_eq!(json_obj["keep1"], "value1");
+ assert_eq!(json_obj["keep2"], true);
+ assert!(!json_obj.contains_key("remove1"));
+ assert!(!json_obj.contains_key("remove2"));
+ assert!(!json_obj.contains_key("remove3"));
+ }
+
+ #[test]
+ fn should_ignore_nonexistent_fields_when_deleting() {
+ let transform = DeleteFields::new(DeleteFieldsConfig {
+ fields: vec!["nonexistent1".to_string(), "field2".to_string()],
+ });
+ let msg = create_test_message(r#"{"field1": "value1", "field2":
"value2", "field3": 42}"#);
+ let result = transform
+ .transform_json(&create_test_topic_metadata(), msg)
+ .unwrap()
+ .unwrap();
+ let json_obj = extract_json_object(&result).unwrap();
+ assert_eq!(json_obj.len(), 2);
+ assert_eq!(json_obj["field1"], "value1");
+ assert_eq!(json_obj["field3"], 42);
+ assert!(!json_obj.contains_key("field2"));
+ }
+
+ #[test]
+ fn should_return_message_unchanged_when_no_fields() {
+ let transform = DeleteFields::new(DeleteFieldsConfig { fields: vec![]
});
+ let msg = create_test_message(r#"{"field1": "value1", "field2":
"value2"}"#);
+ let result = transform
+ .transform_json(&create_test_topic_metadata(), msg)
+ .unwrap()
+ .unwrap();
+ let json_obj = extract_json_object(&result).unwrap();
+ assert_eq!(json_obj.len(), 2);
+ assert_eq!(json_obj["field1"], "value1");
+ assert_eq!(json_obj["field2"], "value2");
+ }
+
+ #[test]
+ fn should_pass_through_non_json_payload_unchanged() {
+ let transform = DeleteFields::new(DeleteFieldsConfig {
+ fields: vec!["field1".to_string()],
+ });
+ let msg = create_raw_test_message(vec![1, 2, 3, 4]);
+ // Calls `transform` (not `transform_json`) so the payload-type dispatch is exercised.
+ let result = transform
+ .transform(&create_test_topic_metadata(), msg)
+ .unwrap()
+ .unwrap();
+ if let Payload::Raw(bytes) = &result.payload {
+ assert_eq!(*bytes, vec![1u8, 2, 3, 4]);
+ } else {
+ panic!("Expected Raw payload");
+ }
+ }
+}
diff --git a/core/connectors/sdk/src/transforms/json/filter_fields.rs
b/core/connectors/sdk/src/transforms/json/filter_fields.rs
new file mode 100644
index 00000000..39543238
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/filter_fields.rs
@@ -0,0 +1,290 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{
+ DecodedMessage, Error, Payload, TopicMetadata,
transforms::filter_fields::FilterFields,
+};
+use simd_json::OwnedValue;
+
+impl FilterFields {
+ pub(crate) fn transform_json(
+ &self,
+ _metadata: &TopicMetadata,
+ mut message: DecodedMessage,
+ ) -> Result<Option<DecodedMessage>, Error> {
+ if self.keep_set.is_empty() && self.patterns.is_empty() {
+ return Ok(Some(message)); // nothing to do
+ }
+
+ let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload
else {
+ return Ok(Some(message));
+ };
+
+ let include = self.include_matching;
+ map.retain(|k, v| {
+ let explicit_keep = self.keep_set.contains(k);
+ if explicit_keep {
+ return true; // never drop an explicitly kept key
+ }
+
+ let matched = self.matches_patterns(k, v);
+ include ^ !matched // xor gives us include / exclude in one line
+ });
+
+ Ok(Some(message))
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transforms::json::test_utils::{
        create_raw_test_message, create_test_message, create_test_topic_metadata,
        extract_json_object,
    };
    use crate::transforms::{
        Transform,
        filter_fields::{
            FilterFields, FilterFieldsConfig, FilterPattern, KeyPattern, ValuePattern,
        },
    };

    /// Builds a `FilterFields` transform, panicking if the config is rejected.
    fn filter(keep: &[&str], patterns: Vec<FilterPattern>, include_matching: bool) -> FilterFields {
        FilterFields::new(FilterFieldsConfig {
            keep_fields: keep.iter().map(|name| name.to_string()).collect(),
            patterns,
            include_matching,
        })
        .unwrap()
    }

    /// A pattern that constrains only the key.
    fn key_only(key: KeyPattern) -> FilterPattern {
        FilterPattern {
            key_pattern: Some(key),
            value_pattern: None,
        }
    }

    /// Runs the JSON path of `transform` over `json` and unwraps the produced message.
    fn run(transform: &FilterFields, json: &str) -> DecodedMessage {
        transform
            .transform_json(&create_test_topic_metadata(), create_test_message(json))
            .unwrap()
            .unwrap()
    }

    #[test]
    fn should_keep_only_specified_fields_when_keep_list_provided() {
        let transform = filter(&["id", "name"], Vec::new(), true);
        let output = run(
            &transform,
            r#"{
                "id": 1,
                "name": "test",
                "description": "should be removed",
                "created_at": "2023-01-01"
            }"#,
        );
        let object = extract_json_object(&output).unwrap();
        assert_eq!(object.len(), 2);
        assert_eq!(object["id"], 1);
        assert_eq!(object["name"], "test");
        assert!(!object.contains_key("description"));
        assert!(!object.contains_key("created_at"));
    }

    #[test]
    fn should_include_fields_matching_key_pattern() {
        let transform = filter(
            &[],
            vec![key_only(KeyPattern::StartsWith("meta_".to_string()))],
            true,
        );
        let output = run(
            &transform,
            r#"{
                "id": 1,
                "meta_created": "2023-01-01",
                "meta_updated": "2023-01-02",
                "content": "test content"
            }"#,
        );
        let object = extract_json_object(&output).unwrap();
        assert_eq!(object.len(), 2);
        assert_eq!(object["meta_created"], "2023-01-01");
        assert_eq!(object["meta_updated"], "2023-01-02");
        assert!(!object.contains_key("id"));
        assert!(!object.contains_key("content"));
    }

    #[test]
    fn should_exclude_fields_matching_pattern_when_include_false() {
        let transform = filter(
            &[],
            vec![key_only(KeyPattern::StartsWith("temp_".to_string()))],
            false,
        );
        let output = run(
            &transform,
            r#"{
                "id": 1,
                "name": "test",
                "temp_value": 100,
                "temp_flag": true
            }"#,
        );
        let object = extract_json_object(&output).unwrap();
        assert_eq!(object.len(), 2);
        assert_eq!(object["id"], 1);
        assert_eq!(object["name"], "test");
        assert!(!object.contains_key("temp_value"));
        assert!(!object.contains_key("temp_flag"));
    }

    #[test]
    fn should_filter_fields_based_on_value_pattern_matching() {
        let numbers_only = FilterPattern {
            key_pattern: None,
            value_pattern: Some(ValuePattern::IsNumber),
        };
        let transform = filter(&[], vec![numbers_only], true);
        let output = run(
            &transform,
            r#"{
                "id": 1,
                "count": 42,
                "name": "test",
                "active": true
            }"#,
        );
        let object = extract_json_object(&output).unwrap();
        assert_eq!(object.len(), 2);
        assert_eq!(object["id"], 1);
        assert_eq!(object["count"], 42);
        assert!(!object.contains_key("name"));
        assert!(!object.contains_key("active"));
    }

    #[test]
    fn should_filter_using_combined_key_and_value_patterns() {
        let string_dates = FilterPattern {
            key_pattern: Some(KeyPattern::Contains("date".to_string())),
            value_pattern: Some(ValuePattern::IsString),
        };
        let transform = filter(&[], vec![string_dates], true);
        let output = run(
            &transform,
            r#"{
                "id": 1,
                "created_date": "2023-01-01",
                "updated_date": "2023-01-02",
                "expired_date": null,
                "version": "1.0"
            }"#,
        );
        let object = extract_json_object(&output).unwrap();
        assert_eq!(object.len(), 2);
        assert_eq!(object["created_date"], "2023-01-01");
        assert_eq!(object["updated_date"], "2023-01-02");
        assert!(!object.contains_key("id"));
        // The key matches but the value is null rather than a string.
        assert!(!object.contains_key("expired_date"));
        assert!(!object.contains_key("version"));
    }

    #[test]
    fn should_combine_keep_fields_list_with_pattern_filtering() {
        let transform = filter(
            &["id"],
            vec![key_only(KeyPattern::StartsWith("meta_".to_string()))],
            true,
        );
        let output = run(
            &transform,
            r#"{
                "id": 1,
                "name": "test",
                "meta_created": "2023-01-01",
                "description": "should be removed"
            }"#,
        );
        let object = extract_json_object(&output).unwrap();
        assert_eq!(object.len(), 2);
        // Survives through the keep list.
        assert_eq!(object["id"], 1);
        // Survives through the key pattern.
        assert_eq!(object["meta_created"], "2023-01-01");
        assert!(!object.contains_key("name"));
        assert!(!object.contains_key("description"));
    }

    #[test]
    fn should_return_message_unchanged_when_no_filters() {
        let transform = filter(&[], Vec::new(), true);
        let output = run(&transform, r#"{"id": 1, "name": "test"}"#);
        let object = extract_json_object(&output).unwrap();
        // With no keep list and no patterns, everything is kept.
        assert_eq!(object.len(), 2);
        assert_eq!(object["id"], 1);
        assert_eq!(object["name"], "test");
    }

    #[test]
    fn should_pass_through_non_json_payload_unchanged() {
        let transform = filter(&["field1"], Vec::new(), true);
        let output = transform
            .transform(
                &create_test_topic_metadata(),
                create_raw_test_message(vec![1, 2, 3, 4]),
            )
            .unwrap()
            .unwrap();
        let Payload::Raw(bytes) = &output.payload else {
            panic!("Expected Raw payload");
        };
        assert_eq!(*bytes, vec![1u8, 2, 3, 4]);
    }
}
diff --git a/core/connectors/sdk/src/transforms/json/mod.rs
b/core/connectors/sdk/src/transforms/json/mod.rs
new file mode 100644
index 00000000..7b56c4d5
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/mod.rs
@@ -0,0 +1,45 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use chrono::Utc;
+use simd_json::OwnedValue;
+
+use super::ComputedValue;
+
+// JSON-specific implementations for transforms
+pub mod add_fields;
+pub mod delete_fields;
+pub mod filter_fields;
+pub mod update_fields;
+
+/// Computes a JSON value based on the specified computed value type
+pub fn compute_value(kind: &ComputedValue) -> OwnedValue {
+ let now = Utc::now();
+ match kind {
+ ComputedValue::DateTime => now.to_rfc3339().into(),
+ ComputedValue::TimestampNanos =>
now.timestamp_nanos_opt().unwrap().into(),
+ ComputedValue::TimestampMicros => now.timestamp_micros().into(),
+ ComputedValue::TimestampMillis => now.timestamp_millis().into(),
+ ComputedValue::TimestampSeconds => now.timestamp().into(),
+ ComputedValue::UuidV4 => uuid::Uuid::new_v4().to_string().into(),
+ ComputedValue::UuidV7 => uuid::Uuid::now_v7().to_string().into(),
+ }
+}
+
+#[cfg(test)]
+pub mod test_utils;
diff --git a/core/connectors/sdk/src/transforms/json/test_utils.rs
b/core/connectors/sdk/src/transforms/json/test_utils.rs
new file mode 100644
index 00000000..f3f1575c
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/test_utils.rs
@@ -0,0 +1,93 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{DecodedMessage, Payload, TopicMetadata};
+use simd_json::OwnedValue;
+use simd_json::prelude::{TypedScalarValue, ValueAsScalar};
+use uuid;
+
+/// Helper function to create a test message with the given JSON payload
+pub fn create_test_message(json: &str) -> DecodedMessage {
+ let mut payload = json.to_string().into_bytes();
+ let value = simd_json::to_owned_value(&mut payload).unwrap();
+ DecodedMessage {
+ id: None,
+ offset: None,
+ checksum: None,
+ timestamp: None,
+ origin_timestamp: None,
+ headers: None,
+ payload: Payload::Json(value),
+ }
+}
+
+/// Helper function to create a non-JSON message with raw bytes
+pub fn create_raw_test_message(bytes: Vec<u8>) -> DecodedMessage {
+ DecodedMessage {
+ id: None,
+ offset: None,
+ checksum: None,
+ timestamp: None,
+ origin_timestamp: None,
+ headers: None,
+ payload: Payload::Raw(bytes),
+ }
+}
+
+/// Helper function to create a topic metadata for testing
+pub fn create_test_topic_metadata() -> TopicMetadata {
+ TopicMetadata {
+ stream: "test-stream".to_string(),
+ topic: "test-topic".to_string(),
+ }
+}
+
+/// Helper function to extract the JSON object from a message
+pub fn extract_json_object(msg: &DecodedMessage) ->
Option<&simd_json::owned::Object> {
+ if let Payload::Json(OwnedValue::Object(map)) = &msg.payload {
+ Some(map)
+ } else {
+ None
+ }
+}
+
+/// Helper function to assert that a JSON value is a number
+pub fn assert_is_number(value: &OwnedValue, field_name: &str) {
+ if !value.is_number() {
+ panic!("{} should be a number", field_name);
+ }
+}
+
+/// Helper function to assert that a JSON value is a string
+pub fn assert_is_string(value: &OwnedValue, field_name: &str) {
+ if !value.is_str() {
+ panic!("{} should be a string", field_name);
+ }
+}
+
+/// Helper function to assert that a JSON value is a string and validates as a
UUID
+pub fn assert_is_uuid(value: &OwnedValue, field_name: &str) {
+ if !value.is_str() {
+ panic!("{} should be a string", field_name);
+ }
+
+ let string_value = value.as_str().unwrap();
+ if uuid::Uuid::parse_str(string_value).is_err() {
+ panic!("{} is not a valid UUID", field_name);
+ }
+}
diff --git a/core/connectors/sdk/src/transforms/json/update_fields.rs
b/core/connectors/sdk/src/transforms/json/update_fields.rs
new file mode 100644
index 00000000..22fff623
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/json/update_fields.rs
@@ -0,0 +1,224 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::compute_value;
+use crate::{
+ DecodedMessage, Error, Payload, TopicMetadata,
+ transforms::{FieldValue, UpdateFields, update_fields::UpdateCondition},
+};
+use simd_json::OwnedValue;
+
+impl UpdateFields {
+ pub(crate) fn transform_json(
+ &self,
+ _metadata: &TopicMetadata,
+ mut message: DecodedMessage,
+ ) -> Result<Option<DecodedMessage>, Error> {
+ let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload
else {
+ return Ok(Some(message));
+ };
+
+ for field in &self.fields {
+ let present = map.contains_key(&field.key);
+ let pass = match &field.condition {
+ None | Some(UpdateCondition::Always) => true,
+ Some(UpdateCondition::KeyExists) => present,
+ Some(UpdateCondition::KeyNotExists) => !present,
+ };
+ if pass {
+ let val = match &field.value {
+ FieldValue::Static(v) => v.clone(),
+ FieldValue::Computed(c) => compute_value(c),
+ };
+ map.insert(field.key.clone(), val);
+ }
+ }
+
+ Ok(Some(message))
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transforms::json::test_utils::{
        assert_is_uuid, create_raw_test_message, create_test_message, create_test_topic_metadata,
        extract_json_object,
    };
    use crate::transforms::{
        ComputedValue, FieldValue, Transform,
        update_fields::{Field, UpdateCondition, UpdateFields, UpdateFieldsConfig},
    };
    use simd_json::OwnedValue;
    use simd_json::prelude::TypedScalarValue;

    /// Input shared by most tests: one untouched key and one updatable key.
    const PENDING: &str = r#"{"id": 1, "status": "pending"}"#;

    /// Builds one config entry.
    fn field(key: &str, value: FieldValue, condition: Option<UpdateCondition>) -> Field {
        Field {
            key: key.to_string(),
            value,
            condition,
        }
    }

    /// Wraps a string literal as a static field value.
    fn text(value: &str) -> FieldValue {
        FieldValue::Static(OwnedValue::from(value))
    }

    /// Runs the JSON path of an `UpdateFields` built from `fields` over `json`.
    fn run(fields: Vec<Field>, json: &str) -> DecodedMessage {
        UpdateFields::new(UpdateFieldsConfig { fields })
            .transform_json(&create_test_topic_metadata(), create_test_message(json))
            .unwrap()
            .unwrap()
    }

    #[test]
    fn should_always_update_field_when_no_condition() {
        let output = run(vec![field("status", text("updated"), None)], PENDING);
        let object = extract_json_object(&output).unwrap();
        assert_eq!(object.len(), 2);
        assert_eq!(object["id"], 1);
        assert_eq!(object["status"], "updated");
    }

    #[test]
    fn should_update_only_existing_keys_when_key_exists() {
        let output = run(
            vec![
                field("status", text("updated"), Some(UpdateCondition::KeyExists)),
                field(
                    "missing_field",
                    text("should not be added"),
                    Some(UpdateCondition::KeyExists),
                ),
            ],
            PENDING,
        );
        let object = extract_json_object(&output).unwrap();
        assert_eq!(object.len(), 2);
        assert_eq!(object["id"], 1);
        assert_eq!(object["status"], "updated");
        assert!(!object.contains_key("missing_field"));
    }

    #[test]
    fn should_conditionally_add_fields_when_key_not_exists() {
        let output = run(
            vec![
                field(
                    "status",
                    text("should not update"),
                    Some(UpdateCondition::KeyNotExists),
                ),
                field(
                    "created_at",
                    text("2023-01-01"),
                    Some(UpdateCondition::KeyNotExists),
                ),
            ],
            PENDING,
        );
        let object = extract_json_object(&output).unwrap();
        assert_eq!(object.len(), 3);
        assert_eq!(object["id"], 1);
        // Already present, so it is left alone.
        assert_eq!(object["status"], "pending");
        // Absent, so it is added.
        assert_eq!(object["created_at"], "2023-01-01");
    }

    #[test]
    fn should_update_or_add_fields_when_always_condition() {
        let output = run(
            vec![
                field("status", text("updated"), Some(UpdateCondition::Always)),
                field("new_field", text("new_value"), Some(UpdateCondition::Always)),
            ],
            PENDING,
        );
        let object = extract_json_object(&output).unwrap();
        assert_eq!(object.len(), 3);
        assert_eq!(object["id"], 1);
        // Overwritten.
        assert_eq!(object["status"], "updated");
        // Added.
        assert_eq!(object["new_field"], "new_value");
    }

    #[test]
    fn should_update_fields_with_dynamically_computed_values() {
        let output = run(
            vec![
                field(
                    "updated_at",
                    FieldValue::Computed(ComputedValue::DateTime),
                    None,
                ),
                field(
                    "request_id",
                    FieldValue::Computed(ComputedValue::UuidV4),
                    Some(UpdateCondition::KeyNotExists),
                ),
            ],
            PENDING,
        );
        let object = extract_json_object(&output).unwrap();
        assert_eq!(object.len(), 4);
        assert_eq!(object["id"], 1);
        assert_eq!(object["status"], "pending");
        assert_is_uuid(&object["request_id"], "request_id");
        // The exact timestamp is unknowable here; only its type is checked.
        assert!(object["updated_at"].is_str());
    }

    #[test]
    fn should_pass_through_non_json_payload_unchanged() {
        let transform = UpdateFields::new(UpdateFieldsConfig {
            fields: vec![field("new_field", text("new_value"), None)],
        });
        let output = transform
            .transform(
                &create_test_topic_metadata(),
                create_raw_test_message(vec![1, 2, 3, 4]),
            )
            .unwrap()
            .unwrap();
        let Payload::Raw(bytes) = &output.payload else {
            panic!("Expected Raw payload");
        };
        assert_eq!(*bytes, vec![1u8, 2, 3, 4]);
    }
}
diff --git a/core/connectors/sdk/src/transforms/mod.rs
b/core/connectors/sdk/src/transforms/mod.rs
index bbe67122..a8f2a46b 100644
--- a/core/connectors/sdk/src/transforms/mod.rs
+++ b/core/connectors/sdk/src/transforms/mod.rs
@@ -16,25 +16,54 @@
* under the License.
*/
+mod add_fields;
+mod delete_fields;
+mod filter_fields;
+pub mod json;
+mod update_fields;
+use crate::{DecodedMessage, Error, TopicMetadata};
+pub use add_fields::{AddFields, AddFieldsConfig, Field as AddField};
+pub use delete_fields::{DeleteFields, DeleteFieldsConfig};
+pub use filter_fields::{
+ FilterFields, FilterFieldsConfig, FilterPattern, KeyPattern as
FilterKeyPattern,
+ ValuePattern as FilterValuePattern,
+};
+use serde::{Deserialize, Serialize};
+use simd_json::OwnedValue;
use std::sync::Arc;
-
-use add_fields::AddFields;
-use delete_fields::DeleteFields;
-use serde::{Deserialize, Serialize, de::DeserializeOwned};
use strum_macros::{Display, IntoStaticStr};
-use tracing::error;
+pub use update_fields::{Field as UpdateField, UpdateCondition, UpdateFields,
UpdateFieldsConfig};
-use crate::{DecodedMessage, Error, TopicMetadata};
+/// The value of a field, either static or computed at runtime
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum FieldValue {
+ Static(OwnedValue),
+ Computed(ComputedValue),
+}
-pub mod add_fields;
-pub mod delete_fields;
+/// Types of computed values that can be generated at runtime
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Display,
IntoStaticStr)]
+#[serde(rename_all = "snake_case")]
+pub enum ComputedValue {
+ #[strum(to_string = "date_time")]
+ DateTime,
+ #[strum(to_string = "timestamp_nanos")]
+ TimestampNanos,
+ #[strum(to_string = "timestamp_micros")]
+ TimestampMicros,
+ #[strum(to_string = "timestamp_millis")]
+ TimestampMillis,
+ #[strum(to_string = "timestamp_seconds")]
+ TimestampSeconds,
+ #[strum(to_string = "uuid_v4")]
+ UuidV4,
+ #[strum(to_string = "uuid_v7")]
+ UuidV7,
+}
-/// Fields transformation trait.
pub trait Transform: Send + Sync {
- /// Returns the type of the transform.
fn r#type(&self) -> TransformType;
-
- /// Transforms the message, given the format is supported and the
transform is applicable.
fn transform(
&self,
metadata: &TopicMetadata,
@@ -47,25 +76,33 @@ pub trait Transform: Send + Sync {
)]
#[serde(rename_all = "snake_case")]
pub enum TransformType {
- #[strum(to_string = "add_fields")]
AddFields,
- #[strum(to_string = "delete_fields")]
DeleteFields,
+ FilterFields,
+ UpdateFields,
}
-pub fn load(r#type: TransformType, config: serde_json::Value) ->
Result<Arc<dyn Transform>, Error> {
- Ok(match r#type {
- TransformType::AddFields => Arc::new(AddFields::new(as_config(r#type,
config)?)),
- TransformType::DeleteFields =>
Arc::new(DeleteFields::new(as_config(r#type, config)?)),
- })
-}
-
-fn as_config<T: DeserializeOwned>(
- r#type: TransformType,
- config: serde_json::Value,
-) -> Result<T, Error> {
- serde_json::from_value::<T>(config).map_err(|error| {
- error!("Failed to deserialize config for transform: {type}. {error}");
- Error::InvalidConfig
- })
+pub fn from_config(t: TransformType, raw: &serde_json::Value) ->
Result<Arc<dyn Transform>, Error> {
+ match t {
+ TransformType::AddFields => {
+ let cfg: AddFieldsConfig =
+ serde_json::from_value(raw.clone()).map_err(|_|
Error::InvalidConfig)?;
+ Ok(Arc::new(AddFields::new(cfg)))
+ }
+ TransformType::DeleteFields => {
+ let cfg: DeleteFieldsConfig =
+ serde_json::from_value(raw.clone()).map_err(|_|
Error::InvalidConfig)?;
+ Ok(Arc::new(DeleteFields::new(cfg)))
+ }
+ TransformType::FilterFields => {
+ let cfg: FilterFieldsConfig =
+ serde_json::from_value(raw.clone()).map_err(|_|
Error::InvalidConfig)?;
+ Ok(Arc::new(FilterFields::new(cfg)?))
+ }
+ TransformType::UpdateFields => {
+ let cfg: UpdateFieldsConfig =
+ serde_json::from_value(raw.clone()).map_err(|_|
Error::InvalidConfig)?;
+ Ok(Arc::new(UpdateFields::new(cfg)))
+ }
+ }
}
diff --git a/core/connectors/sdk/src/transforms/update_fields.rs
b/core/connectors/sdk/src/transforms/update_fields.rs
new file mode 100644
index 00000000..4bec96c1
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/update_fields.rs
@@ -0,0 +1,78 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{FieldValue, Transform, TransformType};
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+use serde::{Deserialize, Serialize};
+
+/// A field to be updated in messages
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Field {
+ pub key: String,
+ pub value: FieldValue,
+ #[serde(default)]
+ pub condition: Option<UpdateCondition>,
+}
+
+/// Configuration for the UpdateFields transform
+#[derive(Debug, Serialize, Deserialize)]
+pub struct UpdateFieldsConfig {
+ #[serde(default)]
+ pub fields: Vec<Field>,
+}
+
+/// Conditions that determine when a field should be updated
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum UpdateCondition {
+ Always,
+ KeyExists,
+ KeyNotExists,
+}
+
+/// Transform that updates fields in JSON messages based on conditions
+pub struct UpdateFields {
+ pub fields: Vec<Field>,
+}
+
+impl UpdateFields {
+ pub fn new(cfg: UpdateFieldsConfig) -> Self {
+ Self { fields: cfg.fields }
+ }
+}
+
+impl Transform for UpdateFields {
+ fn r#type(&self) -> TransformType {
+ TransformType::UpdateFields
+ }
+
+ fn transform(
+ &self,
+ metadata: &TopicMetadata,
+ message: DecodedMessage,
+ ) -> Result<Option<DecodedMessage>, Error> {
+ if self.fields.is_empty() {
+ return Ok(Some(message));
+ }
+
+ match &message.payload {
+ Payload::Json(_) => self.transform_json(metadata, message),
+ _ => Ok(Some(message)),
+ }
+ }
+}