jeqo commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1612458345


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
         return current.get(lastStep());
     }
 
+    /**
+     * Access {@code Map} fields and apply functions to update field values.
+     *
+     * @param originalValue schema-based data value
+     * @param whenFound     function to apply when path is found
+     * @param whenNotFound  function to apply when path is not found
+     * @param whenOther     function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Map<String, Object> updateValueFrom(
+        Map<String, Object> originalValue,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> updateValue(
+        Map<String, Object> originalValue,
+        int step,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        if (originalValue == null) return null;
+        Map<String, Object> updatedParent = new HashMap<>(originalValue.size());
+        boolean found = false;
+        for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+            String fieldName = entry.getKey();
+            Object fieldValue = entry.getValue();
+            if (steps.get(step).equals(fieldName)) {
+                found = true;
+                if (step < lastStepIndex()) {
+                    if (fieldValue instanceof Map) {
+                        Map<String, Object> updatedField = updateValue(
+                            (Map<String, Object>) fieldValue,
+                            step + 1,
+                            whenFound,
+                            whenNotFound,
+                            whenOther);
+                        updatedParent.put(fieldName, updatedField);
+                    } else {
+                        // add back to not found and apply others, as only leaf values are updated
+                        found = false;
+                        whenOther.apply(originalValue, updatedParent, null, fieldName);
+                    }
+                } else {
+                    whenFound.apply(originalValue, updatedParent, this, fieldName);
+                }
+            } else {
+                whenOther.apply(originalValue, updatedParent, null, fieldName);
+            }
+        }
+
+        if (!found) {
+            whenNotFound.apply(originalValue, updatedParent, this, steps.get(step));
+        }
+
+        return updatedParent;
+    }

Review Comment:
   Thanks! I like the proposed approach. The only change I'm considering is to
return the map intact when the value is not found (the proposal returns null).
   This would keep the behavior consistent with the Struct and Schema updates,
which do not make this distinction.
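
   To make the "intact map" idea concrete, here is a minimal sketch. It is not the code in this PR: `IntactMapSketch`, `updateLeaf`, and the `UnaryOperator` parameter are hypothetical names used only for illustration, and it assumes "intact" means handing back the same map instance (returning an equal copy would be another reasonable reading).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical helper for illustration only; not part of SingleFieldPath.
final class IntactMapSketch {

    // Applies `update` to the leaf addressed by `steps`. When the path cannot be
    // resolved, the original map is returned untouched instead of null.
    @SuppressWarnings("unchecked")
    static Map<String, Object> updateLeaf(
        Map<String, Object> original,
        List<String> steps,
        int step,
        UnaryOperator<Object> update
    ) {
        if (original == null) return null;
        String name = steps.get(step);
        // Path not found at this level: return the map intact.
        if (!original.containsKey(name)) return original;

        Object value = original.get(name);
        Object newValue;
        if (step == steps.size() - 1) {
            // Leaf reached: apply the update.
            newValue = update.apply(value);
        } else if (value instanceof Map) {
            Map<String, Object> child = (Map<String, Object>) value;
            Map<String, Object> updatedChild = updateLeaf(child, steps, step + 1, update);
            // Same instance back means the path was not found further down.
            if (updatedChild == child) return original;
            newValue = updatedChild;
        } else {
            // Intermediate step is not a Map, so the path cannot match.
            return original;
        }

        // Copy only on the found path so the caller's map is never mutated.
        Map<String, Object> copy = new HashMap<>(original);
        copy.put(name, newValue);
        return copy;
    }
}
```

   With this shape, callers do not need a null check to tell "not found" apart from "updated": a miss simply yields the input map.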


