jeqo commented on code in PR #15893: URL: https://github.com/apache/kafka/pull/15893#discussion_r1612458345
########## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ########## @@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) { return current.get(lastStep()); } + /** + * Access {@code Map} fields and apply functions to update field values. + * + * @param originalValue schema-based data value + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ + public Map<String, Object> updateValueFrom( + Map<String, Object> originalValue, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther); + } + + @SuppressWarnings("unchecked") + private Map<String, Object> updateValue( + Map<String, Object> originalValue, + int step, + MapValueUpdater whenFound, + MapValueUpdater whenNotFound, + MapValueUpdater whenOther + ) { + if (originalValue == null) return null; + Map<String, Object> updatedParent = new HashMap<>(originalValue.size()); + boolean found = false; + for (Map.Entry<String, Object> entry : originalValue.entrySet()) { + String fieldName = entry.getKey(); + Object fieldValue = entry.getValue(); + if (steps.get(step).equals(fieldName)) { + found = true; + if (step < lastStepIndex()) { + if (fieldValue instanceof Map) { + Map<String, Object> updatedField = updateValue( + (Map<String, Object>) fieldValue, + step + 1, + whenFound, + whenNotFound, + whenOther); + updatedParent.put(fieldName, updatedField); + } else { + // add back to not found and apply others, as only leaf values are updated + found = false; + whenOther.apply(originalValue, updatedParent, null, fieldName); + } + } else { + whenFound.apply(originalValue, updatedParent, this, fieldName); + } + } else { + whenOther.apply(originalValue, updatedParent, null, fieldName); + } + } + + if (!found) { + whenNotFound.apply(originalValue, updatedParent, this, steps.get(step)); + } + + return updatedParent; + } Review Comment: Thanks! I like the proposed approach. The only change I'm considering to add is to return an intact map if value is not found (the proposal is to return null). This should make the behavior consistent with update Struct and Schema that do not make this distinction. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org