jeqo commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1612459168


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
         return current.get(lastStep());
     }
 
+    /**
+     * Access {@code Map} fields and apply functions to update field values.
+     *
+     * @param originalValue schema-based data value
+     * @param whenFound     function to apply when path is found
+     * @param whenNotFound  function to apply when path is not found
+     * @param whenOther     function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Map<String, Object> updateValueFrom(
+        Map<String, Object> originalValue,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> updateValue(
+        Map<String, Object> originalValue,
+        int step,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        if (originalValue == null) return null;
+        Map<String, Object> updatedParent = new HashMap<>(originalValue.size());
+        boolean found = false;
+        for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+            String fieldName = entry.getKey();
+            Object fieldValue = entry.getValue();
+            if (steps.get(step).equals(fieldName)) {
+                found = true;
+                if (step < lastStepIndex()) {
+                    if (fieldValue instanceof Map) {
+                        Map<String, Object> updatedField = updateValue(
+                            (Map<String, Object>) fieldValue,
+                            step + 1,
+                            whenFound,
+                            whenNotFound,
+                            whenOther);
+                        updatedParent.put(fieldName, updatedField);
+                    } else {
+                        // add back to not found and apply others, as only leaf values are updated
+                        found = false;
+                        whenOther.apply(originalValue, updatedParent, null, fieldName);
+                    }
+                } else {
+                    whenFound.apply(originalValue, updatedParent, this, fieldName);
+                }
+            } else {
+                whenOther.apply(originalValue, updatedParent, null, fieldName);
+            }
+        }
+
+        if (!found) {
+            whenNotFound.apply(originalValue, updatedParent, this, steps.get(step));
+        }
+
+        return updatedParent;
+    }
+
+    /**
+     * Access {@code Struct} fields and apply functions to update field values.
+     *
+     * @param originalSchema original struct schema
+     * @param originalValue  schema-based data value
+     * @param updatedSchema  updated struct schema
+     * @param whenFound      function to apply when path is found
+     * @param whenNotFound   function to apply when path is not found
+     * @param whenOther      function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Struct updateValueFrom(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updatedSchema,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        return updateValue(originalSchema, originalValue, updatedSchema, 0, whenFound, whenNotFound, whenOther);
+    }
+
+    private Struct updateValue(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updateSchema,
+        int step,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        Struct updated = new Struct(updateSchema);
+        boolean found = false;
+        for (Field field : originalSchema.fields()) {
+            if (step < steps.size()) {
+                if (steps.get(step).equals(field.name())) {
+                    found = true;
+                    if (step == lastStepIndex()) {
+                        whenFound.apply(
+                            originalValue,
+                            field,
+                            updated,
+                            updateSchema.field(field.name()),
+                            this
+                        );
+                    } else {
+                        if (field.schema().type() == Schema.Type.STRUCT) {
+                            Struct fieldValue = updateValue(
+                                field.schema(),
+                                originalValue.getStruct(field.name()),
+                                updateSchema.field(field.name()).schema(),
+                                step + 1,
+                                whenFound,
+                                whenNotFound,
+                                whenOther
+                            );
+                            updated.put(field.name(), fieldValue);
+                        } else {
+                            // add back to not found and apply others, as only leaf values are updated
+                            found = false;
+                            whenOther.apply(originalValue, field, updated, null, this);
+                        }
+                    }
+                } else {
+                    whenOther.apply(originalValue, field, updated, null, this);
+                }
+            }
+        }
+        if (!found) {
+            whenNotFound.apply(
+                originalValue,
+                null,
+                updated,
+                updateSchema.field(steps.get(step)),
+                this);
+        }
+        return updated;
+    }
+
+    /**
+     * Prepares a new schema based on an original one, and applies an update function
+     * when the current path(s) is found.
+     *
+     * <p>If path is not found, no function is applied, and the path is ignored.
+     *
+     * <p>Other fields are copied from original schema.
+     *
+     * @param originalSchema        baseline schema
+     * @param baselineSchemaBuilder baseline schema build, if changes to the baseline
+     *                              are required before copying original
+     * @param whenFound             function to apply when current path(s) is/are found.
+     * @return an updated schema. Resulting schemas are usually cached for further access.
+     */
+    public Schema updateSchemaFrom(
+        Schema originalSchema,
+        SchemaBuilder baselineSchemaBuilder,
+        StructSchemaUpdater whenFound,
+        StructSchemaUpdater whenNotFound,
+        StructSchemaUpdater whenOther
+    ) {
+        return updateSchema(originalSchema, baselineSchemaBuilder, 0, whenFound, whenNotFound, whenOther);
+    }
+
+    // Recursive implementation to update schema at different steps.
+    // Consider that resulting schemas are usually cached.
+    private Schema updateSchema(
+        Schema operatingSchema,
+        SchemaBuilder builder,
+        int step,
+        StructSchemaUpdater matching,
+        StructSchemaUpdater notFound,
+        StructSchemaUpdater others
+    ) {
+        if (operatingSchema.isOptional()) {
+            builder.optional();
+        }
+        if (operatingSchema.defaultValue() != null) {
+            builder.defaultValue(operatingSchema.defaultValue());
+        }

Review Comment:
   Good catch. I'm removing this from the current PR -- we can evaluate whether
it is needed in follow-up PRs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to